Source code for deephaven.arrow

#
# Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
#
"""This module supports conversions between pyarrow tables and Deephaven tables."""

from collections.abc import Sequence
from typing import Optional

import jpy
import pyarrow as pa

from deephaven import DHError
from deephaven.table import Table

# Java helper classes backing the conversions, looked up through jpy when this
# module is imported (in the order listed below).
_JArrowToTableConverter, _JTableToArrowConverter, _JArrowWrapperTools = (
    jpy.get_type(j_class_name)
    for j_class_name in (
        "io.deephaven.extensions.barrage.util.ArrowToTableConverter",
        "io.deephaven.extensions.barrage.util.TableToArrowConverter",
        "io.deephaven.extensions.arrow.ArrowWrapperTools",
    )
)

# Maps concrete pyarrow types to the Java type name that _map_arrow_type stores
# under the "deephaven:type" field-metadata key.
# An empty-string value marks a type that is recognized but NOT supported: it is
# excluded from SUPPORTED_ARROW_TYPES and _map_arrow_type raises DHError for it.
# Insertion order matters: SUPPORTED_ARROW_TYPES is built by iterating this dict.
_ARROW_DH_DATA_TYPE_MAPPING = {
    pa.null(): "java.lang.Object",
    pa.bool_(): "java.lang.Boolean",
    pa.int8(): "byte",
    pa.int16(): "short",
    pa.int32(): "int",
    pa.int64(): "long",
    # Unsigned integers map to a wider Java type (uint16 to char), presumably
    # because Java has no unsigned integer primitives other than char.
    pa.uint8(): "short",
    pa.uint16(): "char",
    pa.uint32(): "long",
    pa.uint64(): "java.math.BigInteger",
    # Half-precision floats are mapped to a Java float.
    pa.float16(): "float",
    pa.float32(): "float",
    pa.float64(): "double",
    pa.time32("s"): "java.time.LocalTime",
    pa.time32("ms"): "java.time.LocalTime",
    pa.time64("us"): "java.time.LocalTime",
    pa.time64("ns"): "java.time.LocalTime",
    # Only timestamps without a time zone are keyed here; a tz-aware timestamp is
    # presumably a distinct DataType and so falls through to the TimestampType
    # entry of _SUPPORTED_ARROW_PARAMETERIZABLE_TYPES (server-side mapping).
    pa.timestamp("s"): "java.time.Instant",
    pa.timestamp("ms"): "java.time.Instant",
    pa.timestamp("us"): "java.time.Instant",
    pa.timestamp("ns"): "java.time.Instant",
    pa.date32(): "java.time.LocalDate",
    pa.date64(): "java.time.LocalDate",
    pa.duration("s"): "java.time.Duration",
    pa.duration("ms"): "java.time.Duration",
    pa.duration("us"): "java.time.Duration",
    pa.duration("ns"): "java.time.Duration",
    pa.month_day_nano_interval(): "org.apache.arrow.vector.PeriodDuration",
    pa.binary(): "byte[]",
    # pyarrow documents utf8()/large_utf8() as aliases of string()/large_string(),
    # so each pair should collapse into a single dict key.
    pa.string(): "java.lang.String",
    pa.utf8(): "java.lang.String",
    # Large (64-bit offset) variants: recognized but unsupported (empty value).
    pa.large_binary(): "",
    pa.large_string(): "",
    pa.large_utf8(): "",
    # Parameterized types are deliberately not listed here, e.g.:
    # decimal128(int precision, int scale=0)
    # list_(value_type, int list_size=-1)
    # large_list(value_type)
    # map_(key_type, item_type[, keys_sorted])
    # struct(fields)
    # dictionary(index_type, value_type, …)
    # Those whose type class is in _SUPPORTED_ARROW_PARAMETERIZABLE_TYPES are left
    # for the server to map; any other type makes _map_arrow_type raise DHError.
}

# pyarrow type *classes* whose instances carry parameters (unit/time zone,
# byte width, precision/scale, value types, ...). When a column's exact type is
# not a key of _ARROW_DH_DATA_TYPE_MAPPING but its class is listed here,
# _map_arrow_type attaches no "deephaven:type" metadata and lets the server
# work out the Deephaven type.
_SUPPORTED_ARROW_PARAMETERIZABLE_TYPES = (
    # temporal
    {pa.TimestampType}
    # fixed-width binary
    | {pa.FixedSizeBinaryType}
    # decimals
    | {pa.Decimal128Type, pa.Decimal256Type}
    # nested containers
    | {pa.ListType, pa.ListViewType, pa.FixedSizeListType, pa.MapType}
    # unions
    | {pa.DenseUnionType, pa.SparseUnionType}
)

# Public list of the pyarrow types that have a direct Deephaven mapping, in the
# declaration order of _ARROW_DH_DATA_TYPE_MAPPING. Entries whose Deephaven type
# is the empty string (recognized but unsupported) are skipped.
SUPPORTED_ARROW_TYPES = [
    arrow_type
    for arrow_type, dh_type_name in _ARROW_DH_DATA_TYPE_MAPPING.items()
    if dh_type_name
]


def _map_arrow_type(arrow_type: pa.DataType) -> dict[str, str]:
    """Builds the field metadata that tells Deephaven how to type a pyarrow column.

    Args:
        arrow_type (pa.DataType): the pyarrow type of the column

    Returns:
        {"deephaven:type": <java type name>} when the type has a direct entry in
        _ARROW_DH_DATA_TYPE_MAPPING, or an empty dict when the type's class is a
        supported parameterizable type whose mapping is left to the server.

    Raises:
        DHError: if the type is neither directly mapped (with a non-empty name)
            nor a supported parameterizable type.
    """
    java_type_name = _ARROW_DH_DATA_TYPE_MAPPING.get(arrow_type)
    if java_type_name:
        return {"deephaven:type": java_type_name}

    if type(arrow_type) in _SUPPORTED_ARROW_PARAMETERIZABLE_TYPES:
        # Parameterized types: emit no hint and let the server handle the mapping.
        return {}

    raise DHError(
        message=f"unsupported arrow data type : {arrow_type}, refer to "
        f"deephaven.arrow.SUPPORTED_ARROW_TYPES for the list of supported pyarrow types."
    )


def to_table(pa_table: pa.Table, cols: Optional[Sequence[str]] = None) -> Table:
    """Creates a Deephaven table from a pyarrow table.

    Args:
        pa_table (pa.Table): the pyarrow table
        cols (Optional[Sequence[str]]): the pyarrow table column names, default is None which means including all
            columns

    Returns:
        a new table

    Raises:
        DHError: if a column has an unsupported pyarrow type, or if the conversion fails
    """
    if cols:
        pa_table = pa_table.select(cols)

    j_converter = _JArrowToTableConverter()

    # Re-declare every field with Deephaven type-hint metadata attached. An
    # unsupported type raises DHError here, before the try block below.
    dh_schema = pa.schema(
        [
            pa.field(name=pa_field.name, type=pa_field.type, metadata=_map_arrow_type(pa_field.type))
            for pa_field in pa_table.schema
        ]
    )

    try:
        # Feed the serialized schema first, then all serialized record batches.
        j_converter.setSchema(jpy.byte_buffer(dh_schema.serialize()))
        j_converter.addRecordBatches(
            [jpy.byte_buffer(batch.serialize()) for batch in pa_table.to_batches()]
        )
        j_converter.onCompleted()
        return Table(j_table=j_converter.getResultTable())
    except Exception as e:
        raise DHError(
            e, message="failed to create a Deephaven table from a pyarrow table."
        ) from e
def to_arrow(table: Table, cols: Optional[Sequence[str]] = None) -> pa.Table:
    """Creates a pyarrow table from a Deephaven table.

    Args:
        table (Table): the Deephaven table
        cols (Optional[Sequence[str]]): the table column names, default is None which means including all columns

    Returns:
        a pyarrow table

    Raises:
        DHError
    """
    try:
        source = table.view(formulas=cols) if cols else table
        j_converter = _JTableToArrowConverter(source.j_table)

        # The converter first hands out the IPC-encoded schema ...
        with pa.ipc.open_stream(pa.py_buffer(j_converter.getSchema())) as reader:
            schema = reader.schema

        # ... then one IPC message per record batch, decoded against that schema.
        batches = []
        while j_converter.hasNext():
            message = pa.ipc.read_message(j_converter.next())
            batches.append(pa.ipc.read_record_batch(message, schema=schema))

        # The explicit schema also covers the case where no batches were produced.
        return pa.Table.from_batches(batches, schema=schema)
    except Exception as e:
        raise DHError(
            e, message="failed to create a pyarrow table from a Deephaven table."
        ) from e
def read_feather(path: str) -> Table:
    """Reads an Arrow feather file into a Deephaven table.

    Args:
        path (str): the file path

    Returns:
        a new table

    Raises:
        DHError: if the file cannot be read or converted
    """
    try:
        j_table = _JArrowWrapperTools.readFeather(path)
        return Table(j_table=j_table)
    except Exception as e:
        raise DHError(e, message=f"failed to read a feather file {path}") from e