# Source code for deephaven.arrow

#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module supports conversions between pyarrow tables and Deephaven tables."""

from typing import List, Dict

import jpy
import pyarrow as pa

from deephaven import DHError, dtypes
from deephaven.table import Table

# Java classes used by this module, resolved once at import time through jpy:
# the Barrage converters used by to_table()/to_arrow(), and the wrapper tools
# used by read_feather().
_JArrowToTableConverter = jpy.get_type(
    "io.deephaven.extensions.barrage.util.ArrowToTableConverter"
)
_JTableToArrowConverter = jpy.get_type(
    "io.deephaven.extensions.barrage.util.TableToArrowConverter"
)
_JArrowWrapperTools = jpy.get_type(
    "io.deephaven.extensions.arrow.ArrowWrapperTools"
)

# Maps a pyarrow data type to the Java type name that _map_arrow_type() stores in a
# field's "deephaven:type" metadata. An empty string marks a type that is listed for
# reference but has no Deephaven mapping; such entries are left out of
# SUPPORTED_ARROW_TYPES. Note: _map_arrow_type() additionally maps *any*
# pa.TimestampType without a non-empty entry here (other units, or a timezone set) to
# java.time.Instant.
_ARROW_DH_DATA_TYPE_MAPPING = {
    pa.null(): '',
    pa.bool_(): 'java.lang.Boolean',
    pa.int8(): 'byte',
    pa.int16(): 'short',
    pa.int32(): 'int',
    pa.int64(): 'long',
    pa.uint8(): '',
    pa.uint16(): 'char',
    pa.uint32(): '',
    pa.uint64(): '',
    pa.float16(): '',
    pa.float32(): 'float',
    pa.float64(): 'double',
    pa.time32('s'): '',
    pa.time32('ms'): '',
    pa.time64('us'): '',
    pa.time64('ns'): 'java.time.LocalTime',
    pa.timestamp('s'): '',
    pa.timestamp('ms'): '',
    pa.timestamp('us'): '',
    pa.timestamp('ns'): 'java.time.Instant',
    pa.date32(): '',
    pa.date64(): 'java.time.LocalDate',
    pa.duration('s'): '',
    pa.duration('ms'): '',
    pa.duration('us'): '',
    pa.duration('ns'): '',
    pa.month_day_nano_interval(): '',
    pa.binary(): '',
    pa.string(): 'java.lang.String',
    # pyarrow documents utf8() as an alias of string(), so this repeats the key above
    # (same value; the dict keeps a single entry).
    pa.utf8(): 'java.lang.String',
    pa.large_binary(): '',
    pa.large_string(): '',
    # large_utf8() is likewise an alias of large_string(); this repeats the key above.
    pa.large_utf8(): '',
    # Parameterized pyarrow types that have no entry in this table:
    # decimal128(int precision, int scale=0)
    # list_(value_type, int list_size=-1)
    # large_list(value_type)
    # map_(key_type, item_type[, keys_sorted])
    # struct(fields)
    # dictionary(index_type, value_type, …)
}

# Public list of the pyarrow types that have a non-empty Deephaven mapping above.
# Timestamps with a unit other than 'ns' or with a timezone are also accepted by
# _map_arrow_type() (they become java.time.Instant) but do not appear in this list.
SUPPORTED_ARROW_TYPES = [k for k, v in _ARROW_DH_DATA_TYPE_MAPPING.items() if v]


def _map_arrow_type(arrow_type) -> Dict[str, str]:
    """Builds the field metadata that tells Deephaven which Java type to use for a pyarrow type.

    Args:
        arrow_type: the pyarrow data type of a column

    Returns:
        a single-entry dict ``{"deephaven:type": <java type name>}``

    Raises:
        DHError: when the pyarrow type has no Deephaven mapping
    """
    mapped = _ARROW_DH_DATA_TYPE_MAPPING.get(arrow_type)
    if mapped:
        return {"deephaven:type": mapped}

    # Any timestamp type without a non-empty table entry (a unit other than 'ns',
    # or a timezone specified) is still treated as an Instant.
    if isinstance(arrow_type, pa.TimestampType):
        return {"deephaven:type": "java.time.Instant"}

    raise DHError(message=f'unsupported arrow data type : {arrow_type}, refer to '
                          f'deephaven.arrow.SUPPORTED_ARROW_TYPES for the list of supported pyarrow types.')


def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table:
    """Creates a Deephaven table from a pyarrow table.

    Args:
        pa_table (pa.Table): the pyarrow table
        cols (List[str]): the pyarrow table column names, default is None which means including all columns

    Returns:
        a new table

    Raises:
        DHError
    """
    try:
        if cols:
            # Projection happens on the pyarrow side. Doing it inside the try means a bad column
            # name is reported as a DHError, as the docstring promises (and as to_arrow does).
            pa_table = pa_table.select(cols)

        # Tag every field with its Deephaven column type via "deephaven:type" metadata;
        # _map_arrow_type raises DHError for pyarrow types it cannot map.
        dh_schema = pa.schema([pa.field(name=f.name, type=f.type, metadata=_map_arrow_type(f.type))
                               for f in pa_table.schema])

        j_barrage_table_builder = _JArrowToTableConverter()
        j_barrage_table_builder.setSchema(jpy.byte_buffer(dh_schema.serialize()))

        record_batches = pa_table.to_batches()
        j_barrage_table_builder.addRecordBatches([jpy.byte_buffer(rb.serialize()) for rb in record_batches])
        j_barrage_table_builder.onCompleted()

        return Table(j_table=j_barrage_table_builder.getResultTable())
    except DHError:
        # Already a DHError (e.g. an unsupported column type): keep its specific message.
        raise
    except Exception as e:
        raise DHError(e, message="failed to create a Deephaven table from a pyarrow table.") from e
def to_arrow(table: Table, cols: List[str] = None) -> pa.Table:
    """Converts a Deephaven table into a pyarrow table.

    Args:
        table (Table): the source table
        cols (List[str]): the source column names, default is None which means including all columns

    Returns:
        a pyarrow table

    Raises:
        DHError
    """
    try:
        source = table.view(formulas=cols) if cols else table
        j_converter = _JTableToArrowConverter(source.j_table)

        # The schema comes back as an IPC stream; only its schema header is read here.
        with pa.ipc.open_stream(pa.py_buffer(j_converter.getSchema())) as schema_reader:
            arrow_schema = schema_reader.schema

        def _drain_batches():
            # Each next() returns one serialized IPC message holding a record batch.
            while j_converter.hasNext():
                ipc_message = pa.ipc.read_message(j_converter.next())
                yield pa.ipc.read_record_batch(ipc_message, schema=arrow_schema)

        batches = list(_drain_batches())
        return pa.Table.from_batches(batches, schema=arrow_schema)
    except Exception as e:
        raise DHError(e, message="failed to create a pyarrow table from a Deephaven table.") from e
def read_feather(path: str) -> Table:
    """Loads the Arrow feather file at ``path`` into a new Deephaven table.

    Args:
        path (str): the file path

    Returns:
        a new table

    Raises:
        DHError
    """
    try:
        j_table = _JArrowWrapperTools.readFeather(path)
        return Table(j_table=j_table)
    except Exception as e:
        raise DHError(e, message=f"failed to read a feather file {path}") from e