#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
""" This module supports the conversion between Deephaven tables and pandas DataFrames. """
from typing import List, Literal
import jpy
import numpy as np
import pandas as pd
import pyarrow as pa
from deephaven import DHError, new_table, dtypes, arrow
from deephaven.column import ColumnDefinition
from deephaven.constants import NULL_BYTE, NULL_SHORT, NULL_INT, NULL_LONG, NULL_FLOAT, NULL_DOUBLE, NULL_CHAR
from deephaven.jcompat import _j_array_to_series
from deephaven.numpy import _make_input_column
from deephaven.table import Table
_NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE
_JColumnVectors = jpy.get_type("io.deephaven.engine.table.vectors.ColumnVectors")
_is_dtype_backend_supported = pd.__version__ >= "2.0.0"
def _column_to_series(table: Table, col_def: ColumnDefinition, conv_null: bool) -> pd.Series:
"""Produce a copy of the specified column as a pandas.Series object.
Args:
table (Table): the table
col_def (ColumnDefinition): the column definition
conv_null (bool): whether to check for Deephaven nulls in the data and automatically replace them with
pd.NA.
Returns:
a pandas Series
Raises:
DHError
"""
try:
j_array = _JColumnVectors.of(table.j_table, col_def.name).copyToArray()
return _j_array_to_series(col_def.data_type, j_array, conv_null)
except DHError:
raise
except Exception as e:
raise DHError(e, message="failed to create a pandas Series for {col}") from e
_PANDAS_ARROW_TYPE_MAP = {
pa.int8(): pd.ArrowDtype(pa.int8()),
pa.int16(): pd.ArrowDtype(pa.int16()),
pa.int32(): pd.ArrowDtype(pa.int32()),
pa.int64(): pd.ArrowDtype(pa.int64()),
pa.uint8(): pd.ArrowDtype(pa.uint8()),
pa.uint16(): pd.ArrowDtype(pa.uint16()),
pa.uint32(): pd.ArrowDtype(pa.uint32()),
pa.uint64(): pd.ArrowDtype(pa.uint64()),
pa.bool_(): pd.ArrowDtype(pa.bool_()),
pa.float32(): pd.ArrowDtype(pa.float32()),
pa.float64(): pd.ArrowDtype(pa.float64()),
pa.string(): pd.ArrowDtype(pa.string()),
pa.timestamp('ns'): pd.ArrowDtype(pa.timestamp('ns')),
pa.timestamp('ns', tz='UTC'): pd.ArrowDtype(pa.timestamp('ns', tz='UTC')),
}
_PANDAS_NULLABLE_TYPE_MAP = {
pa.int8(): pd.Int8Dtype(),
pa.int16(): pd.Int16Dtype(),
pa.uint16(): pd.UInt16Dtype(),
pa.int32(): pd.Int32Dtype(),
pa.int64(): pd.Int64Dtype(),
pa.bool_(): pd.BooleanDtype(),
pa.float32(): pd.Float32Dtype(),
pa.float64(): pd.Float64Dtype(),
pa.string(): pd.StringDtype(),
# pa.Table.to_pandas() doesn't like explicit mapping to pd.DatetimeTZDtype, however it, on its own,
# can correctly map pyarrow timestamp to DatetimeTZDtype and convert null values to NaT
# pa.timestamp('ns'): pd.DatetimeTZDtype(unit='ns', tz='UTC'),
# pa.timestamp('ns', tz='UTC'): pd.DatetimeTZDtype(unit='ns', tz='UTC'),
}
_PYARROW_TO_PANDAS_TYPE_MAPPERS = {
"pyarrow": _PANDAS_ARROW_TYPE_MAP.get,
"numpy_nullable": _PANDAS_NULLABLE_TYPE_MAP.get,
}
[docs]def to_pandas(table: Table, cols: List[str] = None,
dtype_backend: Literal[None, "pyarrow", "numpy_nullable"] = "numpy_nullable",
conv_null: bool = True) -> pd.DataFrame:
"""Produces a pandas DataFrame from a table.
Note that the **entire table** is going to be cloned into memory, so the total number of entries in the table
should be considered before blindly doing this. For large tables, consider using the Deephaven query language to
select a subset of the table **before** using this method.
Args:
table (Table): the source table
cols (List[str]): the source column names, default is None which means include all columns
dtype_backend (str): which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
nullable dtypes are used for all dtypes that have a nullable implementation when “numpy_nullable” is set,
pyarrow is used for all dtypes if “pyarrow” is set. None means Numpy backed DataFrames with no nullable
dtypes. Both "numpy_nullable" and "pyarrow" automatically convert Deephaven nulls to Pandas NA and enable
Pandas extension types. Extension types are needed to support types beyond NumPy's type system. Extension
types support operations such as properly mapping Java Strings to Python strings. default is "numpy_nullable".
conv_null (bool): when dtype_backend is set to None, whether to check for Deephaven nulls in the data and
automatically replace them with pd.NA. default is True.
Returns:
a pandas DataFrame
Raise:
DHError
"""
try:
if dtype_backend == "pyarrow" and not _is_dtype_backend_supported:
raise DHError(message=f"the dtype_backend ({dtype_backend}) option is only available for pandas 2.0.0 and "
f"above. {pd.__version__} is being used.")
if dtype_backend is not None and not conv_null:
raise DHError(message="conv_null can't be turned off when dtype_backend is either numpy_nullable or "
"pyarrow")
# if nullable dtypes (pandas or pyarrow) is requested
if type_mapper := _PYARROW_TO_PANDAS_TYPE_MAPPERS.get(dtype_backend):
pa_table = arrow.to_arrow(table=table, cols=cols)
df = pa_table.to_pandas(types_mapper=type_mapper)
del pa_table
return df
# if regular numpy dtype is requested, direct access of the table column sources is required. In order to get a
# consistent view of a ticking table, we need to take a snapshot of it first.
if table.is_refreshing:
table = table.snapshot()
table_def = table.definition
if not cols:
cols = list(table_def.keys())
else:
diff_set = set(cols) - set(table_def.keys())
if diff_set:
raise DHError(message=f"columns - {list(diff_set)} not found")
data = {}
for col in cols:
series = _column_to_series(table, table_def[col], conv_null)
data[col] = series
return pd.DataFrame(data=data, columns=cols, copy=False)
except DHError:
raise
except Exception as e:
raise DHError(e, "failed to create a pandas DataFrame from table.") from e
_PANDAS_EXTYPE_DH_NULL_MAP = {
# This reflects the fact that in the server we use NULL_BOOLEAN_AS_BYTE - the byte encoding of null boolean to
# translate boxed Boolean to/from primitive bytes
pd.BooleanDtype: _NULL_BOOLEAN_AS_BYTE,
pd.Int8Dtype: NULL_BYTE,
pd.Int16Dtype: NULL_SHORT,
pd.UInt16Dtype: NULL_CHAR,
pd.Int32Dtype: NULL_INT,
pd.Int64Dtype: NULL_LONG,
pd.Float32Dtype: NULL_FLOAT,
pd.Float64Dtype: NULL_DOUBLE,
pd.StringDtype: None,
pd.ArrowDtype(pa.int8()): NULL_BYTE,
pd.ArrowDtype(pa.int16()): NULL_SHORT,
pd.ArrowDtype(pa.int32()): NULL_INT,
pd.ArrowDtype(pa.int64()): NULL_LONG,
pd.ArrowDtype(pa.bool_()): NULL_BYTE,
pd.ArrowDtype(pa.float32()): NULL_FLOAT,
pd.ArrowDtype(pa.float64()): NULL_DOUBLE,
pd.ArrowDtype(pa.string()): None,
}
def _map_na(array: [np.ndarray, pd.api.extensions.ExtensionArray]):
"""Replaces the pd.NA values in the array if it is of pandas ExtensionDtype(nullable)."""
pd_dtype = array.dtype
if not isinstance(pd_dtype, pd.api.extensions.ExtensionDtype):
return array
dh_null = _PANDAS_EXTYPE_DH_NULL_MAP.get(type(pd_dtype)) or _PANDAS_EXTYPE_DH_NULL_MAP.get(pd_dtype)
# To preserve NaNs in floating point arrays, Pandas doesn't distinguish NaN/Null as far as NA testing is
# concerned, thus its fillna() method will replace both NaN/Null in the data.
if isinstance(pd_dtype, (pd.Float32Dtype, pd.Float64Dtype)) and isinstance(getattr(array, "_data"), np.ndarray):
np_array = array._data
null_mask = np.logical_and(array._mask, np.logical_not(np.isnan(np_array)))
if any(null_mask):
np_array = np.copy(np_array)
np_array[null_mask] = dh_null
return np_array
if isinstance(pd_dtype, (pd.StringDtype, pd.BooleanDtype)) or pd_dtype == pd.ArrowDtype(pa.bool_()):
array = np.array(list(map(lambda v: dh_null if v is pd.NA else v, array)))
elif dh_null is not None:
array = array.fillna(dh_null)
return array
[docs]def to_table(df: pd.DataFrame, cols: List[str] = None, infer_objects: bool = True) -> Table:
"""Creates a new table from a pandas DataFrame.
Args:
df (DataFrame): the pandas DataFrame instance
cols (List[str]): the dataframe column names, default is None which means including all columns in the DataFrame
infer_objects (bool): whether to infer the best possible types for columns of the generic 'object' type in the
DataFrame before creating the table, default is True. When True, pandas convert_dtypes() method is called to
perform the conversion. Note that any conversion will make a copy of the data.
Returns:
a Deephaven table
Raise:
DHError
"""
if not cols:
cols = list(df)
else:
diff_set = set(cols) - set(list(df))
if diff_set:
raise DHError(message=f"columns - {list(diff_set)} not found")
# if infer_objects is True, convert object dtypes to the best possible types supporting pd.NA
converted_df = df
if infer_objects:
converted_df = df[cols]
for col in cols:
if df.dtypes[col] == object:
converted_df[col] = df[col].convert_dtypes()
# if any arrow backed column is present, create a pyarrow table first, then upload to DH, if error occurs, fall
# back to the numpy-array based approach
if _is_dtype_backend_supported and any(isinstance(converted_df[col].dtype, pd.ArrowDtype) for col in cols):
try:
pa_table = pa.Table.from_pandas(df=converted_df, columns=cols)
dh_table = arrow.to_table(pa_table)
return dh_table
except:
pass
try:
input_cols = []
for col in cols:
np_array = converted_df.get(col).values
if isinstance(converted_df.dtypes[col], pd.CategoricalDtype):
dtype = converted_df.dtypes[col].categories.dtype
else:
dtype = np_array.dtype
dh_dtype = dtypes.from_np_dtype(dtype)
np_array = _map_na(np_array)
input_cols.append(_make_input_column(col, np_array, dh_dtype))
return new_table(cols=input_cols)
except DHError:
raise
except Exception as e:
raise DHError(e, "failed to create a Deephaven Table from a pandas DataFrame.") from e