Source code for pydeephaven._table_ops

#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Any, Union

import pyarrow as pa

from pydeephaven._arrow import map_arrow_type
from pydeephaven.agg import Aggregation
from pydeephaven.proto import table_pb2, table_pb2_grpc
from pydeephaven.updateby import UpdateByOperation


[docs]class SortDirection(Enum): """An enum defining the sort ordering.""" DESCENDING = table_pb2.SortDescriptor.SortDirection.DESCENDING """Descending sort direction""" ASCENDING = table_pb2.SortDescriptor.SortDirection.ASCENDING """Ascending sort direction"""
class TableOp(ABC): @classmethod @abstractmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: ... @abstractmethod def make_grpc_request(self, result_id, source_id) -> Any: ... @abstractmethod def make_grpc_request_for_batch(self, result_id, source_id) -> Any: ... class NoneOp(TableOp): @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: raise AssertionError("should never be called.") def make_grpc_request(self, result_id, source_id) -> Any: raise AssertionError("should never be called.") def make_grpc_request_for_batch(self, result_id, source_id) -> Any: raise AssertionError("should never be called.") def __init__(self, table): self.table = table class TimeTableOp(TableOp): def __init__(self, start_time: Union[int, str], period: Union[int, str], blink_table: bool = False): if start_time is None: # Force this to zero to trigger `now()` behavior. self.start_time = 0 else: self.start_time = start_time self.period = period self.blink_table = blink_table @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.TimeTable def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.TimeTableRequest(result_id=result_id, start_time_nanos=self.start_time if not isinstance(self.start_time, str) else None, start_time_string=self.start_time if isinstance(self.start_time, str) else None, period_nanos=self.period if not isinstance(self.period, str) else None, period_string=self.period if isinstance(self.period, str) else None, blink_table=self.blink_table) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( time_table=self.make_grpc_request(result_id=result_id, source_id=source_id)) class EmptyTableOp(TableOp): def __init__(self, size: int): self.size = size @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.EmptyTable def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.EmptyTableRequest(result_id=result_id, size=self.size) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( empty_table=self.make_grpc_request(result_id=result_id, source_id=source_id)) class DropColumnsOp(TableOp): def __init__(self, column_names: List[str]): self.column_names = column_names @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.DropColumns def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.DropColumnsRequest(result_id=result_id, source_id=source_id, column_names=self.column_names) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( drop_columns=self.make_grpc_request(result_id=result_id, source_id=source_id)) class USVOp(TableOp): def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.SelectOrUpdateRequest(result_id=result_id, source_id=source_id, column_specs=self.column_specs) class UpdateOp(USVOp): def __init__(self, column_specs: List[str]): self.column_specs = column_specs @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Update def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( update=self.make_grpc_request(result_id=result_id, source_id=source_id)) class LazyUpdateOp(USVOp): def __init__(self, column_specs: List[str]): self.column_specs = column_specs @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.LazyUpdate def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( lazy_update=self.make_grpc_request(result_id=result_id, source_id=source_id)) class ViewOp(USVOp): def __init__(self, column_specs: List[str]): self.column_specs = column_specs @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.View def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( view=self.make_grpc_request(result_id=result_id, source_id=source_id)) class UpdateViewOp(USVOp): def __init__(self, column_specs: List[str]): self.column_specs = column_specs @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.UpdateView def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( update_view=self.make_grpc_request(result_id=result_id, source_id=source_id)) class SelectOp(USVOp): def __init__(self, column_specs: List[str]): self.column_specs = column_specs @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Select def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( select=self.make_grpc_request(result_id=result_id, source_id=source_id)) class SelectDistinctOp(TableOp): def __init__(self, column_names: List[str]): self.column_names = column_names @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.SelectDistinct def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.SelectDistinctRequest(result_id=result_id, source_id=source_id, column_names=self.column_names) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( select_distinct=self.make_grpc_request(result_id=result_id, source_id=source_id)) class UnstructuredFilterOp(TableOp): def __init__(self, filters: List[str]): self.filters = filters @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.UnstructuredFilter def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.UnstructuredFilterTableRequest(result_id=result_id, source_id=source_id, filters=self.filters) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( unstructured_filter=self.make_grpc_request(result_id=result_id, source_id=source_id)) class SortOp(TableOp): def __init__(self, column_names: List[str], directions: List[SortDirection]): self.column_names = column_names self.directions = directions @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Sort def make_grpc_request(self, result_id, source_id) -> Any: from itertools import zip_longest sort_specs = zip_longest(self.column_names, self.directions) sort_descriptors = [] for sp in sort_specs: if not sp[0]: break direction = sp[1] if sp[1] else SortDirection.ASCENDING sort_descriptor = table_pb2.SortDescriptor(column_name=sp[0], direction=direction.value) sort_descriptors.append(sort_descriptor) return table_pb2.SortTableRequest(result_id=result_id, source_id=source_id, sorts=sort_descriptors) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( sort=self.make_grpc_request(result_id=result_id, source_id=source_id)) class HeadOrTailOp(TableOp): def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.HeadOrTailRequest(result_id=result_id, source_id=source_id, num_rows=self.num_rows) class HeadOp(HeadOrTailOp): def __init__(self, num_rows: int): self.num_rows = num_rows @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Head def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( head=self.make_grpc_request(result_id=result_id, source_id=source_id)) class TailOp(HeadOrTailOp): def __init__(self, num_rows: int): self.num_rows = num_rows @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Tail def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( tail=self.make_grpc_request(result_id=result_id, source_id=source_id)) class HeadOrTailByOp(TableOp): def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.HeadOrTailByRequest(result_id=result_id, source_id=source_id, num_rows=self.num_rows, group_by_column_specs=self.column_names) class HeadByOp(HeadOrTailByOp): def __init__(self, num_rows: int, column_names: List[str]): self.num_rows = num_rows self.column_names = column_names @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.HeadBy def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( head_by=self.make_grpc_request(result_id=result_id, source_id=source_id)) class TailByOp(HeadOrTailByOp): def __init__(self, num_rows: int, column_names: List[str]): self.num_rows = num_rows self.column_names = column_names @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.TailBy def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( tail_by=self.make_grpc_request(result_id=result_id, source_id=source_id)) class UngroupOp(TableOp): def __init__(self, column_names: List[str], null_fill: bool = True): self.column_names = column_names self.null_fill = null_fill @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Ungroup def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.UngroupRequest(result_id=result_id, source_id=source_id, null_fill=self.null_fill, columns_to_ungroup=self.column_names) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( ungroup=self.make_grpc_request(result_id=result_id, source_id=source_id)) class MergeTablesOp(TableOp): def __init__(self, tables: List[Any], key_column: str = ""): self.tables = tables self.key_column = key_column @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.MergeTables def make_grpc_request(self, result_id, source_id) -> Any: table_references = [] for tbl in self.tables: table_references.append(table_pb2.TableReference(ticket=tbl.ticket)) return table_pb2.MergeTablesRequest(result_id=result_id, source_ids=table_references, key_column=self.key_column) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( merge=self.make_grpc_request(result_id=result_id, source_id=source_id)) class NaturalJoinOp(TableOp): def __init__(self, table: Any, keys: List[str], columns_to_add: List[str]): self.table = table self.keys = keys self.columns_to_add = columns_to_add @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.NaturalJoinTables def make_grpc_request(self, result_id, source_id) -> Any: left_id = source_id right_id = table_pb2.TableReference(ticket=self.table.ticket) return table_pb2.NaturalJoinTablesRequest(result_id=result_id, left_id=left_id, right_id=right_id, columns_to_match=self.keys, columns_to_add=self.columns_to_add) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( natural_join=self.make_grpc_request(result_id=result_id, source_id=source_id)) class ExactJoinOp(TableOp): def __init__(self, table: Any, keys: List[str], columns_to_add: List[str]): self.table = table self.keys = keys self.columns_to_add = columns_to_add @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.ExactJoinTables def make_grpc_request(self, result_id, source_id) -> Any: left_id = source_id right_id = table_pb2.TableReference(ticket=self.table.ticket) return table_pb2.ExactJoinTablesRequest(result_id=result_id, left_id=left_id, right_id=right_id, columns_to_match=self.keys, columns_to_add=self.columns_to_add) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( exact_join=self.make_grpc_request(result_id=result_id, source_id=source_id)) class CrossJoinOp(TableOp): def __init__(self, table: Any, keys: List[str] = [], columns_to_add: List[str] = [], reserve_bits: int = 10): self.table = table self.keys = keys self.columns_to_add = columns_to_add self.reserve_bits = reserve_bits @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.CrossJoinTables def make_grpc_request(self, result_id, source_id) -> Any: left_id = source_id right_id = table_pb2.TableReference(ticket=self.table.ticket) return table_pb2.CrossJoinTablesRequest(result_id=result_id, left_id=left_id, right_id=right_id, columns_to_match=self.keys, columns_to_add=self.columns_to_add, reserve_bits=self.reserve_bits) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( cross_join=self.make_grpc_request(result_id=result_id, source_id=source_id)) class AjOp(TableOp): def __init__(self, table: Any, keys: List[str] = [], columns_to_add: List[str] = []): self.table = table self.keys = keys self.columns_to_add = columns_to_add @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.AjTables def make_grpc_request(self, result_id, source_id) -> Any: left_id = source_id right_id = table_pb2.TableReference(ticket=self.table.ticket) return table_pb2.AjRajTablesRequest(result_id=result_id, left_id=left_id, right_id=right_id, exact_match_columns=self.keys[:-1], as_of_column=self.keys[-1], columns_to_add=self.columns_to_add) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( aj=self.make_grpc_request(result_id=result_id, source_id=source_id)) class RajOp(TableOp): def __init__(self, table: Any, keys: List[str] = [], columns_to_add: List[str] = []): self.table = table self.keys = keys self.columns_to_add = columns_to_add @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.RajTables def make_grpc_request(self, result_id, source_id) -> Any: left_id = source_id right_id = table_pb2.TableReference(ticket=self.table.ticket) return table_pb2.AjRajTablesRequest(result_id=result_id, left_id=left_id, right_id=right_id, exact_match_columns=self.keys[:-1], as_of_column=self.keys[-1], columns_to_add=self.columns_to_add) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( raj=self.make_grpc_request(result_id=result_id, source_id=source_id)) class FlattenOp(TableOp): @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Flatten def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.FlattenRequest(result_id=result_id, source_id=source_id) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( flatten=self.make_grpc_request(result_id=result_id, source_id=source_id)) class FetchTableOp(TableOp): @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.FetchTable def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.FetchTableRequest(result_id=result_id, source_id=source_id) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( fetch_table=self.make_grpc_request(result_id=result_id, source_id=source_id)) class UpdateByOp(TableOp): def __init__(self, operations: List[UpdateByOperation], by: List[str]): self.operations = operations self.by = by @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.UpdateBy def make_grpc_request(self, result_id, source_id) -> Any: operations = [op.make_grpc_message() for op in self.operations] return table_pb2.UpdateByRequest(result_id=result_id, source_id=source_id, operations=operations, group_by_columns=self.by) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( update_by=self.make_grpc_request(result_id=result_id, source_id=source_id)) class SnapshotTableOp(TableOp): @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Snapshot def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.SnapshotTableRequest(result_id=result_id, source_id=source_id) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( snapshot=self.make_grpc_request(result_id=result_id, source_id=source_id)) class SnapshotWhenTableOp(TableOp): def __init__(self, trigger_table: Any, stamp_cols: List[str] = None, initial: bool = False, incremental: bool = False, history: bool = False): self.trigger_table = trigger_table self.stamp_cols = stamp_cols self.initial = initial self.incremental = incremental self.history = history @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.SnapshotWhen def make_grpc_request(self, result_id, source_id) -> Any: base_id = source_id trigger_id = table_pb2.TableReference(ticket=self.trigger_table.ticket) return table_pb2.SnapshotWhenTableRequest(result_id=result_id, base_id=base_id, trigger_id=trigger_id, initial=self.initial, incremental=self.incremental, history=self.history, stamp_columns=self.stamp_cols) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( snapshot_when=self.make_grpc_request(result_id=result_id, source_id=source_id)) class AggregateOp(TableOp): def __init__(self, aggs: List[Aggregation], by: List[str]): self.aggs = aggs self.by = by @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.Aggregate def make_grpc_request(self, result_id, source_id) -> Any: aggregations = [agg.make_grpc_message() for agg in self.aggs] return table_pb2.AggregateRequest(result_id=result_id, source_id=source_id, aggregations=aggregations, group_by_columns=self.by) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( aggregate=self.make_grpc_request(result_id=result_id, source_id=source_id)) class AggregateAllOp(TableOp): def __init__(self, agg: Aggregation, by: List[str]): self.agg_spec = agg.agg_spec self.by = by @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.AggregateAll def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.AggregateAllRequest(result_id=result_id, source_id=source_id, spec=self.agg_spec, group_by_columns=self.by) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( aggregate_all=self.make_grpc_request(result_id=result_id, source_id=source_id)) class CreateInputTableOp(TableOp): def __init__(self, schema: pa.schema, init_table: Any, key_cols: List[str] = None, blink: bool = False): if blink and key_cols: raise ValueError("key columns are not supported for blink input tables.") self.schema = schema self.init_table = init_table self.key_cols = key_cols self.blink = blink @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.CreateInputTable def make_grpc_request(self, result_id, source_id) -> Any: if self.blink: blink_ = table_pb2.CreateInputTableRequest.InputTableKind.Blink() input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(blink=blink_) elif self.key_cols: key_backed = table_pb2.CreateInputTableRequest.InputTableKind.InMemoryKeyBacked( key_columns=self.key_cols) input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(in_memory_key_backed=key_backed) else: append_only = table_pb2.CreateInputTableRequest.InputTableKind.InMemoryAppendOnly() input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(in_memory_append_only=append_only) if self.schema: dh_fields = [] for f in self.schema: dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type))) dh_schema = pa.schema(dh_fields) schema = dh_schema.serialize().to_pybytes() return table_pb2.CreateInputTableRequest(result_id=result_id, schema=schema, kind=input_table_kind) else: source_table_id = table_pb2.TableReference(ticket=self.init_table.ticket) return table_pb2.CreateInputTableRequest(result_id=result_id, source_table_id=source_table_id, kind=input_table_kind) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( create_input_table=self.make_grpc_request(result_id=result_id, source_id=source_id)) class WhereInTableOp(TableOp): def __init__(self, filter_table: Any, cols: List[str], inverted: bool): self.filter_table = filter_table self.cols = cols self.inverted = inverted @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.WhereIn def make_grpc_request(self, result_id, source_id) -> Any: right_id = table_pb2.TableReference(ticket=self.filter_table.ticket) return table_pb2.WhereInRequest(result_id=result_id, left_id=source_id, right_id=right_id, inverted=self.inverted, columns_to_match=self.cols) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( where_in=self.make_grpc_request(result_id=result_id, source_id=source_id)) class MetaTableOp(TableOp): @classmethod def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any: return table_service_stub.MetaTable def make_grpc_request(self, result_id, source_id) -> Any: return table_pb2.MetaTableRequest(result_id=result_id, source_id=source_id) def make_grpc_request_for_batch(self, result_id, source_id) -> Any: return table_pb2.BatchTableRequest.Operation( meta_table=self.make_grpc_request(result_id=result_id, source_id=source_id))