# Table data service
This guide covers using the Table Data Service API to integrate custom data services into Deephaven workflows.
To use the Table Data Service API, you must create an implementation of its backend, which will be used to construct the Table Data Service itself. The API provides a series of classes with abstract methods that can be used as a template in a custom implementation. This guide covers all of the necessary components of the backend, along with an example implementation.
This feature is currently experimental. The API and its characteristics are subject to change.
## TableDataService API

The Python `TableDataService` API simplifies how the engine thinks about data served by a remote service or database. It lets users consume data from external sources into Deephaven as PyArrow tables, which can back Deephaven tables in both static and refreshing contexts.
Each class in the API has some requirements when creating an implementation. The following sections describe the requirements for each class and give an example.
## TableKey

A `TableKey` is a unique identifier for a table. An implementation must provide the following methods:

- `__hash__`: Create a hash of the key.
- `__eq__`: Compare two keys for equality. If `TableKey1 == TableKey2` is true, then `hash(TableKey1) == hash(TableKey2)` must also be true.
The following code block contains an example implementation of a `TableKey`, aptly named `TableKeyImpl`. It implements both required methods, along with a `__str__` method so that keys render readably in error messages.
```python
from deephaven.experimental.table_data_service import TableKey


class TableKeyImpl(TableKey):
    """
    A basic implementation of a TableKey.
    """

    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableKeyImpl{{{self.key}}}"
```
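The contract matters because the backend stores tables in a plain `dict` keyed by `TableKey`: two key instances built from the same string must compare equal and hash identically so that both locate the same entry. The following standalone sketch demonstrates this with `KeyLike`, an illustrative stand-in with no Deephaven dependency:

```python
class KeyLike:
    """A plain-Python analogue of TableKeyImpl, used only to illustrate
    the __hash__/__eq__ contract (not part of the Deephaven API)."""

    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, KeyLike):
            return NotImplemented
        return self.key == other.key


a, b = KeyLike("sample"), KeyLike("sample")
assert a == b and hash(a) == hash(b)  # the contract holds

tables = {a: "table-for-sample"}
assert tables[b] == "table-for-sample"  # a distinct instance finds the same entry
```

If `__hash__` and `__eq__` were left at their defaults, each `KeyLike("sample")` instance would be a distinct dictionary key, and lookups with a fresh key object would fail.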
## TableLocationKey

A `TableLocationKey` is a unique identifier for a location within a table. An implementation requires the same methods as a `TableKey` implementation:

- `__hash__`: Create a hash of the key.
- `__eq__`: Compare two keys for equality. If `TableLocationKey1 == TableLocationKey2` is true, then `hash(TableLocationKey1) == hash(TableLocationKey2)` must also be true.
The following code block contains an example implementation of a `TableLocationKey`, aptly named `TableLocationKeyImpl`. It implements both required methods, along with a `__str__` method so that keys render readably in error messages.
```python
from deephaven.experimental.table_data_service import TableLocationKey


class TableLocationKeyImpl(TableLocationKey):
    """
    A basic implementation of a TableLocationKey.
    """

    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableLocationKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableLocationKeyImpl{{{self.key}}}"
```
## TableDataServiceBackend

A `TableDataServiceBackend` is the interface that must be implemented to provide data to the `TableDataService`. It has six abstract methods that must be implemented:

- `table_schema`: Fetch the schema of a table.
- `table_locations`: Fetch the locations of a table.
- `table_location_size`: Fetch the size of a table location.
- `column_values`: Fetch column values.
- `subscribe_to_table_locations`: Subscribe to existing and future table locations.
- `subscribe_to_table_location_size`: Subscribe to existing and future table location sizes.
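All six methods share the same callback-driven style: results are delivered through callbacks rather than return values, and errors go to a failure callback. The sketch below illustrates that pattern in plain Python, independent of Deephaven; `fetch_size` and its arguments are illustrative names, not part of the API:

```python
from typing import Callable, Dict, List


def fetch_size(
    store: Dict[str, list],
    key: str,
    size_cb: Callable[[int], None],
    failure_cb: Callable[[str], None],
) -> None:
    # Deliver the result through size_cb, or report the problem through
    # failure_cb; nothing is returned directly, mirroring the
    # TableDataServiceBackend style.
    if key not in store:
        failure_cb(f"{key} does not exist")
        return
    size_cb(len(store[key]))


results: List = []
fetch_size({"sample": [1, 2, 3]}, "sample", results.append, results.append)
fetch_size({"sample": [1, 2, 3]}, "missing", results.append, results.append)
# results == [3, "missing does not exist"]
```

This inversion of control lets the engine decide how and when to consume each result, which is what makes the same backend usable in both static and refreshing contexts.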
The following code block contains an example implementation of a `TableDataServiceBackend`, named `TestBackend`. It implements all six required methods, plus helper methods for adding tables and locations to the backend. It relies on a custom table implementation, given in a separate expandable code block below.

**A `TableDataServiceBackend` implementation**
```python
from typing import Callable, Optional, Dict

import pyarrow as pa

from deephaven.experimental.table_data_service import (
    TableDataServiceBackend,
    TableKey,
    TableLocationKey,
)


class TestBackend(TableDataServiceBackend):
    """
    A custom implementation of the TableDataServiceBackend for testing.
    """

    def __init__(self):
        self.tables: Dict[TableKey, TestTable] = {}

    def add_table(self, table_key: TableKeyImpl, table: TestTable):
        """
        Add a new table to the backend.
        """
        if table_key in self.tables:
            raise ValueError(f"{table_key} already exists")
        self.tables[table_key] = table

    def table_schema(
        self,
        table_key: TableKeyImpl,
        schema_cb: Callable[[pa.Schema, Optional[pa.Schema]], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the schema of a table with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        schema_cb(table.data_schema, table.partitioning_column_schema)

    def table_locations(
        self,
        table_key: TableKeyImpl,
        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the locations of a table with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        for key, location in self.tables[table_key].locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

    def table_location_size(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        size_cb: Callable[[int], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the size of a table location with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return

        size_cb(table.locations[table_location_key].data.num_rows)

    def column_values(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        col: str,
        offset: int,
        min_rows: int,
        max_rows: int,
        values_cb: Callable[[pa.Table], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch column values with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return

        location = table.locations[table_location_key]
        values_cb(location.data.select([col]).slice(offset, min_rows))

    def subscribe_to_table_locations(
        self,
        table_key: TableKeyImpl,
        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> Callable[[], None]:
        """
        Subscribe to table locations with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *x: x

        table = self.tables[table_key]
        table.location_cb = location_cb
        table.location_failure_cb = failure_cb

        # send all existing locations straight away
        for key, location in table.locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

        def unsubscribe():
            table.location_cb = lambda *x: x
            table.location_failure_cb = lambda *x: x

        return unsubscribe

    def subscribe_to_table_location_size(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        size_cb: Callable[[int], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> Callable[[], None]:
        """
        Subscribe to table location size with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *x: x

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return lambda *x: x

        location = table.locations[table_location_key]
        location.size_cb = size_cb
        location.size_failure_cb = failure_cb

        # send existing size
        size_cb(location.data.num_rows)
        success_cb()

        def unsubscribe():
            location.size_cb = lambda *x: x
            location.size_failure_cb = lambda *x: x

        return unsubscribe
```
**A custom table implementation for the backend**
```python
from typing import Optional, Dict, Callable

import pyarrow as pa


class TestTable:
    """
    A custom table implementation for the backend.
    """

    class TestTableLocation:
        """
        A custom table location implementation for the backend.
        """

        def __init__(
            self, data_schema: pa.Schema, partitioning_values: Optional[pa.Table]
        ):
            self.partitioning_values = partitioning_values
            self.size_cb: Callable[[int], None] = lambda *x: x
            self.size_failure_cb: Callable[[str], None] = lambda *x: x
            self.data: pa.Table = data_schema.empty_table()

        def append_data(self, new_data: pa.Table):
            """
            Append data to the table in batches.
            """
            rbs = self.data.to_batches()
            for batch in new_data.to_batches():
                rbs.append(batch)
            self.data = pa.Table.from_batches(rbs)
            self.size_cb(self.data.num_rows)

    def __init__(
        self, data_schema: pa.Schema, partitioning_column_schema: Optional[pa.Schema]
    ):
        self.data_schema = data_schema
        self.partitioning_column_schema = partitioning_column_schema
        self.locations: Dict[TableLocationKey, TestTable.TestTableLocation] = {}
        self.location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None] = (
            lambda *x: x
        )
        self.location_failure_cb: Callable[[str], None] = lambda *x: x

    def add_table_location(
        self,
        table_location_key: TableLocationKeyImpl,
        partitioning_column_values: Optional[pa.Table],
        data_values: pa.Table,
    ):
        """
        Add a new location to the table.
        """
        if table_location_key in self.locations:
            raise ValueError(
                f"Cannot add table location {table_location_key}: it already exists"
            )
        new_location = self.TestTableLocation(
            self.data_schema, partitioning_column_values
        )
        new_location.append_data(data_values)
        self.locations[table_location_key] = new_location

    def append_table_location(
        self, table_location_key: TableLocationKeyImpl, data_values: pa.Table
    ):
        """
        Append data to an existing location in the table.
        """
        if table_location_key not in self.locations:
            raise ValueError(
                f"Cannot append to non-existent table location {table_location_key}"
            )
        self.locations[table_location_key].append_data(data_values)
```
## TableDataService

The `TableDataService` is the main class that users interact with. It is instantiated with the `TableDataServiceBackend` implementation, along with other optional parameters.

The `TableDataService` class has a method, `make_table`, which creates a Deephaven table backed by the data service. It takes a unique `TableKey` and a flag indicating whether the table is static or refreshing. The usage section of the example below demonstrates `TableDataService` with the backend given above.
## Example

The example in this guide uses the code from the sections above to demonstrate how to use the `TableDataService` API to fetch a table in both static and refreshing contexts. The example is split into two parts, each in its own subsection.

### Backend data service implementation

The following code block provides a backend implementation for a data service. It shows how to:

- Implement a sample `TableDataServiceBackend`.
- Manually manipulate the `TableDataServiceBackend` to demonstrate the behavior of static and refreshing scenarios.
- Create a custom table implementation and add it to the backend.
- Fetch a table from that backend in both static and refreshing contexts.
**A backend data service implementation**
```python
from typing import Callable, Optional, Dict

import pyarrow as pa

from deephaven.experimental.table_data_service import (
    TableDataServiceBackend,
    TableKey,
    TableLocationKey,
    TableDataService,
)


class TableKeyImpl(TableKey):
    """
    A simple implementation of a TableKey.
    """

    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableKeyImpl{{{self.key}}}"


class TableLocationKeyImpl(TableLocationKey):
    """
    A simple implementation of a TableLocationKey.
    """

    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableLocationKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableLocationKeyImpl{{{self.key}}}"


class TestTable:
    """
    A custom table implementation for the backend.
    """

    class TestTableLocation:
        """
        A custom table location implementation for the backend.
        """

        def __init__(
            self, data_schema: pa.Schema, partitioning_values: Optional[pa.Table]
        ):
            self.partitioning_values = partitioning_values
            self.size_cb: Callable[[int], None] = lambda *x: x
            self.size_failure_cb: Callable[[str], None] = lambda *x: x
            self.data: pa.Table = data_schema.empty_table()

        def append_data(self, new_data: pa.Table):
            """
            Append data to the table in batches.
            """
            rbs = self.data.to_batches()
            for batch in new_data.to_batches():
                rbs.append(batch)
            self.data = pa.Table.from_batches(rbs)
            self.size_cb(self.data.num_rows)

    def __init__(
        self, data_schema: pa.Schema, partitioning_column_schema: Optional[pa.Schema]
    ):
        self.data_schema = data_schema
        self.partitioning_column_schema = partitioning_column_schema
        self.locations: Dict[TableLocationKey, TestTable.TestTableLocation] = {}
        self.location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None] = (
            lambda *x: x
        )
        self.location_failure_cb: Callable[[str], None] = lambda *x: x

    def add_table_location(
        self,
        table_location_key: TableLocationKeyImpl,
        partitioning_column_values: Optional[pa.Table],
        data_values: pa.Table,
    ):
        """
        Add a new location to the table.
        """
        if table_location_key in self.locations:
            raise ValueError(
                f"Cannot add table location {table_location_key}: it already exists"
            )
        new_location = self.TestTableLocation(
            self.data_schema, partitioning_column_values
        )
        new_location.append_data(data_values)
        self.locations[table_location_key] = new_location

    def append_table_location(
        self, table_location_key: TableLocationKeyImpl, data_values: pa.Table
    ):
        """
        Append data to an existing location in the table.
        """
        if table_location_key not in self.locations:
            raise ValueError(
                f"Cannot append to non-existent table location {table_location_key}"
            )
        self.locations[table_location_key].append_data(data_values)


class TestBackend(TableDataServiceBackend):
    """
    A custom implementation of the TableDataServiceBackend for testing.
    """

    def __init__(self):
        self.tables: Dict[TableKey, TestTable] = {}

    def add_table(self, table_key: TableKeyImpl, table: TestTable):
        """
        Add a new table to the backend.
        """
        if table_key in self.tables:
            raise ValueError(f"{table_key} already exists")
        self.tables[table_key] = table

    def table_schema(
        self,
        table_key: TableKeyImpl,
        schema_cb: Callable[[pa.Schema, Optional[pa.Schema]], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the schema of a table with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        schema_cb(table.data_schema, table.partitioning_column_schema)

    def table_locations(
        self,
        table_key: TableKeyImpl,
        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the locations of a table with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        for key, location in self.tables[table_key].locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

    def table_location_size(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        size_cb: Callable[[int], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the size of a table location with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return

        size_cb(table.locations[table_location_key].data.num_rows)

    def column_values(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        col: str,
        offset: int,
        min_rows: int,
        max_rows: int,
        values_cb: Callable[[pa.Table], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch column values with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return

        location = table.locations[table_location_key]
        values_cb(location.data.select([col]).slice(offset, min_rows))

    def subscribe_to_table_locations(
        self,
        table_key: TableKeyImpl,
        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> Callable[[], None]:
        """
        Subscribe to table locations with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *x: x

        table = self.tables[table_key]
        table.location_cb = location_cb
        table.location_failure_cb = failure_cb

        # send all existing locations straight away
        for key, location in table.locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

        def unsubscribe():
            table.location_cb = lambda *x: x
            table.location_failure_cb = lambda *x: x

        return unsubscribe

    def subscribe_to_table_location_size(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        size_cb: Callable[[int], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> Callable[[], None]:
        """
        Subscribe to table location size with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *x: x

        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return lambda *x: x

        location = table.locations[table_location_key]
        location.size_cb = size_cb
        location.size_failure_cb = failure_cb

        # send existing size
        size_cb(location.data.num_rows)
        success_cb()

        def unsubscribe():
            location.size_cb = lambda *x: x
            location.size_failure_cb = lambda *x: x

        return unsubscribe
```
### Usage

The following code block demonstrates how to use the backend implementation to fetch a table in both static and refreshing contexts.

**Usage of the example table data service backend**
```python
from deephaven.time import to_j_instant
from deephaven.liveness_scope import LivenessScope
import deephaven.arrow as dharrow
from deephaven import new_table
from deephaven.column import *
import numpy as np
import pyarrow as pa

# generate the same data for each location; note that we do not need to
# include partitioning columns
location_cols = [
    bool_col(name="Boolean", data=[True, False]),
    byte_col(name="Byte", data=(1, -1)),
    char_col(name="Char", data="-1"),
    short_col(name="Short", data=[1, -1]),
    int_col(name="Int", data=[1, -1]),
    long_col(name="Long", data=[1, -1]),
    long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)),
    float_col(name="Float", data=[1.01, -1.01]),
    double_col(name="Double", data=[1.01, -1.01]),
    string_col(name="String", data=["foo", "bar"]),
    datetime_col(
        name="Datetime",
        data=[
            to_j_instant("2024-10-01T12:30:00 ET"),
            to_j_instant("2024-10-01T12:45:00 ET"),
        ],
    ),
]
location_data = dharrow.to_arrow(new_table(cols=location_cols))


def generate_partitioning_values(ticker: str, exchange: str) -> pa.Table:
    partitioning_cols = [
        string_col(name="Ticker", data=[ticker]),
        string_col(name="Exchange", data=[exchange]),
    ]
    return dharrow.to_arrow(new_table(cols=partitioning_cols))


backend = TestBackend()
data_service = TableDataService(backend)

# generate a simple table
backend.add_table(
    TableKeyImpl("sample"),
    TestTable(
        location_data.schema,
        generate_partitioning_values("DUMMY_VAL", "DUMMY_VAL").schema,
    ),
)


def add_ticker_data(ticker: str, exchange: str):
    table_key = TableKeyImpl("sample")
    table_location_key = TableLocationKeyImpl(ticker + ":" + exchange)
    if table_key not in backend.tables:
        raise ValueError(f"{table_key} does not exist")

    if table_location_key not in backend.tables[table_key].locations:
        backend.tables[table_key].add_table_location(
            table_location_key,
            generate_partitioning_values(ticker, exchange),
            location_data,
        )
    else:
        backend.tables[table_key].append_table_location(
            table_location_key, location_data
        )


# add just a tiny amount of data
add_ticker_data("GOOG", "NYSE")
add_ticker_data("MSFT", "BZX")
add_ticker_data("MSFT", "BZY")

scope = LivenessScope()

with scope.open():
    t = data_service.make_table(TableKeyImpl("sample"), refreshing=True)
```
With the table open and visible in the IDE, more data can be appended with subsequent calls to `add_ticker_data`:

```python
# this adds a new table location to the already-opened table
add_ticker_data("GOOG", "BZX")

# these append to existing table locations of the already-opened table
add_ticker_data("MSFT", "BZX")
add_ticker_data("MSFT", "BZY")
```