Deephaven Community Core version 0.37.0 is now available. It brings a slew of new features, improvements, bug fixes, and breaking changes. Let's dive into the details.
## New features
### A built-in Python linter
Deephaven now has the ruff Python linter built into the IDE. The integration makes writing Python queries in Deephaven more convenient. You'll be able to write better and more readable queries faster with things like:
- Auto-formatting
- Instant recognition of unused imports
- Instant recognition of errors
- Customizable settings, including an option to format a file every time it's saved
Check out the video below, which shows a demo of the new Python linter at 13:55.
### Support for Arrow Flight SQL
Deephaven now supports Arrow Flight SQL, a protocol for interacting with SQL databases using the Arrow in-memory format and the Flight RPC framework. You can use it to query SQL databases and other data sources that support Arrow Flight SQL.
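Because Flight SQL is an open protocol, any Flight SQL client can issue queries against a server that speaks it. Here's a minimal, hedged sketch using the third-party ADBC Flight SQL driver; the connection URI, port, and query are placeholders for your own deployment, and authentication is omitted:

```python
# pip install adbc-driver-flightsql pyarrow
from adbc_driver_flightsql import dbapi

# placeholder URI; point this at your server's gRPC endpoint
with dbapi.connect("grpc://localhost:10000") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1 AS x")
        print(cur.fetch_arrow_table())
```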
### Python TableDataService API
The new Python `TableDataService` API simplifies how the engine consumes data served by a remote service or database. It enables users to provide external data to Deephaven in the form of PyArrow tables, which can back Deephaven tables in both static and refreshing contexts.
See the expandable code blocks below for an example implementation.
**A Table Data Service backend implementation**

```python
from deephaven.experimental.table_data_service import (
    TableDataServiceBackend,
    TableKey,
    TableLocationKey,
    TableDataService,
)
from typing import Callable, Optional, Dict
import pyarrow as pa


class TableKeyImpl(TableKey):
    """
    A simple implementation of a TableKey.
    """

    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableKeyImpl{{{self.key}}}"


class TableLocationKeyImpl(TableLocationKey):
    """
    A simple implementation of a TableLocationKey.
    """

    def __init__(self, key: str):
        self.key = key

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        if not isinstance(other, TableLocationKeyImpl):
            return NotImplemented
        return self.key == other.key

    def __str__(self):
        return f"TableLocationKeyImpl{{{self.key}}}"


class TestTable:
    """
    A custom table implementation for the backend.
    """

    class TestTableLocation:
        """
        A custom table location implementation for the backend.
        """

        def __init__(
            self, data_schema: pa.Schema, partitioning_values: Optional[pa.Table]
        ):
            self.partitioning_values = partitioning_values
            self.size_cb: Callable[[int], None] = lambda *x: x
            self.size_failure_cb: Callable[[], None] = lambda *x: x
            self.data: pa.Table = data_schema.empty_table()

        def append_data(self, new_data: pa.Table):
            """
            Append data to the table in batches.
            """
            rbs = self.data.to_batches()
            for batch in new_data.to_batches():
                rbs.append(batch)
            self.data = pa.Table.from_batches(rbs)
            self.size_cb(self.data.num_rows)

    def __init__(
        self, data_schema: pa.Schema, partitioning_column_schema: Optional[pa.Schema]
    ):
        self.data_schema = data_schema
        self.partitioning_column_schema = partitioning_column_schema
        self.locations: Dict[TableLocationKey, TestTable.TestTableLocation] = {}
        self.location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None] = (
            lambda *x: x
        )
        self.location_failure_cb: Callable[[str], None] = lambda *x: x

    def add_table_location(
        self,
        table_location_key: TableLocationKeyImpl,
        partitioning_column_values: Optional[pa.Table],
        data_values: pa.Table,
    ):
        """
        Add a new location to the table.
        """
        if table_location_key in self.locations:
            raise ValueError(
                f"Cannot add table location {table_location_key}: it already exists"
            )
        new_location = self.TestTableLocation(
            self.data_schema, partitioning_column_values
        )
        new_location.append_data(data_values)
        self.locations[table_location_key] = new_location

    def append_table_location(
        self, table_location_key: TableLocationKeyImpl, data_values: pa.Table
    ):
        """
        Append data to an existing location in the table.
        """
        if table_location_key not in self.locations:
            raise ValueError(
                f"Cannot append to non-existent table location {table_location_key}"
            )
        self.locations[table_location_key].append_data(data_values)


class TestBackend(TableDataServiceBackend):
    """
    A custom implementation of the TableDataServiceBackend for testing.
    """

    def __init__(self):
        self.tables: Dict[TableKey, TestTable] = {}

    def add_table(self, table_key: TableKeyImpl, table: TestTable):
        """
        Add a new table to the backend.
        """
        if table_key in self.tables:
            raise ValueError(f"{table_key} already exists")
        self.tables[table_key] = table

    def table_schema(
        self,
        table_key: TableKeyImpl,
        schema_cb: Callable[[pa.Schema, Optional[pa.Schema]], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the schema of a table with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return
        table = self.tables[table_key]
        schema_cb(table.data_schema, table.partitioning_column_schema)

    def table_locations(
        self,
        table_key: TableKeyImpl,
        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the locations of a table with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return
        for key, location in self.tables[table_key].locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

    def table_location_size(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        size_cb: Callable[[int], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch the size of a table location with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return
        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return
        size_cb(table.locations[table_location_key].data.num_rows)

    def column_values(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        col: str,
        offset: int,
        min_rows: int,
        max_rows: int,
        values_cb: Callable[[pa.Table], None],
        failure_cb: Callable[[str], None],
    ) -> None:
        """
        Fetch column values with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return
        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return
        location = table.locations[table_location_key]
        values_cb(location.data.select([col]).slice(offset, min_rows))

    def subscribe_to_table_locations(
        self,
        table_key: TableKeyImpl,
        location_cb: Callable[[TableLocationKeyImpl, Optional[pa.Table]], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> Callable[[], None]:
        """
        Subscribe to table locations with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *x: x
        table = self.tables[table_key]
        table.location_cb = location_cb
        table.location_failure_cb = failure_cb
        # send all existing locations straight away
        for key, location in table.locations.items():
            location_cb(key, location.partitioning_values)
        success_cb()

        def unsubscribe():
            table.location_cb = lambda *x: x
            table.location_failure_cb = lambda *x: x

        return unsubscribe

    def subscribe_to_table_location_size(
        self,
        table_key: TableKeyImpl,
        table_location_key: TableLocationKeyImpl,
        size_cb: Callable[[int], None],
        success_cb: Callable[[], None],
        failure_cb: Callable[[str], None],
    ) -> Callable[[], None]:
        """
        Subscribe to table location size with a callable.
        """
        if table_key not in self.tables:
            failure_cb(f"{table_key} does not exist")
            return lambda *x: x
        table = self.tables[table_key]
        if table_location_key not in table.locations:
            failure_cb(f"{table_location_key} does not exist in table_key {table_key}")
            return lambda *x: x
        location = table.locations[table_location_key]
        location.size_cb = size_cb
        location.size_failure_cb = failure_cb
        # send existing size
        size_cb(location.data.num_rows)
        success_cb()

        def unsubscribe():
            location.size_cb = lambda *x: x
            location.size_failure_cb = lambda *x: x

        return unsubscribe
```
**Usage of the example Table Data Service backend**

```python
from deephaven.time import to_j_instant
import deephaven.arrow as dharrow
from deephaven import new_table
from deephaven.column import *
import numpy as np

# generate the same data for each location; note that we do not need to
# include the partitioning columns
location_cols = [
    bool_col(name="Boolean", data=[True, False]),
    byte_col(name="Byte", data=(1, -1)),
    char_col(name="Char", data="-1"),
    short_col(name="Short", data=[1, -1]),
    int_col(name="Int", data=[1, -1]),
    long_col(name="Long", data=[1, -1]),
    long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)),
    float_col(name="Float", data=[1.01, -1.01]),
    double_col(name="Double", data=[1.01, -1.01]),
    string_col(name="String", data=["foo", "bar"]),
    datetime_col(
        name="Datetime",
        data=[
            to_j_instant("2024-10-01T12:30:00 ET"),
            to_j_instant("2024-10-01T12:45:00 ET"),
        ],
    ),
]
location_data = dharrow.to_arrow(new_table(cols=location_cols))


def generate_partitioning_values(ticker: str, exchange: str) -> pa.Table:
    partitioning_cols = [
        string_col(name="Ticker", data=[ticker]),
        string_col(name="Exchange", data=[exchange]),
    ]
    return dharrow.to_arrow(new_table(cols=partitioning_cols))


backend = TestBackend()
data_service = TableDataService(backend)

# generate a simple table
backend.add_table(
    TableKeyImpl("sample"),
    TestTable(
        location_data.schema,
        generate_partitioning_values("DUMMY_VAL", "DUMMY_VAL").schema,
    ),
)


def add_ticker_data(ticker: str, exchange: str):
    table_key = TableKeyImpl("sample")
    table_location_key = TableLocationKeyImpl(ticker + ":" + exchange)
    if table_key not in backend.tables:
        raise ValueError(f"{table_key} does not exist")
    if table_location_key not in backend.tables[table_key].locations:
        # first sighting of this location: create it
        backend.tables[table_key].add_table_location(
            table_location_key,
            generate_partitioning_values(ticker, exchange),
            location_data,
        )
    else:
        # location already exists: append to it
        backend.tables[table_key].append_table_location(
            table_location_key, location_data
        )


# add just a tiny amount of data
add_ticker_data("GOOG", "NYSE")
add_ticker_data("MSFT", "BZX")
add_ticker_data("MSFT", "BZY")

from deephaven.liveness_scope import LivenessScope

scope = LivenessScope()

with scope.open():
    t = data_service.make_table(TableKeyImpl("sample"), refreshing=True)
```
**Adding data to the table**

```python
# this adds a new table location to the already-opened table
add_ticker_data("GOOG", "BZX")

# these append to existing table locations of the already-opened table
add_ticker_data("MSFT", "BZX")
add_ticker_data("MSFT", "BZY")
```
### Read and write Parquet to and from GCS
You can now read and write Parquet files to and from Google Cloud Storage (GCS) with Deephaven. Try it for yourself:
```python
from deephaven import parquet
from deephaven.experimental import s3

us_states = parquet.read(
    "gs://cloud-samples-data/bigquery/us-states/us-states.parquet",
    special_instructions=s3.S3Instructions(
        anonymous_access=True,
    ),
)
```
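Writing works the same way in the other direction. Here's a hedged sketch of a Parquet write to GCS; the bucket name is a placeholder, and authentication details (region, credentials) depend on your own deployment:

```python
from deephaven import empty_table, parquet
from deephaven.experimental import s3

t = empty_table(10).update(["X = i", "Y = X * X"])

# "my-example-bucket" is a placeholder bucket you have write access to;
# configure region/credentials in S3Instructions for your environment
parquet.write(
    t,
    "gs://my-example-bucket/example.parquet",
    special_instructions=s3.S3Instructions(region_name="us-east-1"),
)
```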
### Additional new features
- A new function, `add_only_to_blink`, converts an add-only table to a blink table (see the sketch after this list).
- The `slice` table operation is now available in the Python client API.
- Deephaven now permits recomputing a formula whenever a row is modified, without considering which input columns changed.
- Multi-column support for both `rolling_formula` and `agg.formula`.
- The ability to specify AWS / S3 credentials in a way that allows Deephaven to own the client-building logic when building Iceberg catalog adapters.
- Support for all `java.lang.Math` operations in the query language.
- The ability to specify an AWS profile through S3 instructions.
- Multi-join support in the Python client.
- The ability to infer the correct column types when converting a Pandas DataFrame to a Deephaven table.
- A static getter method to see the resultant table definition of a read operation in the server-side Groovy API.
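Here's a minimal, hedged sketch of `add_only_to_blink` in action. It assumes the function lives in `deephaven.stream`, alongside the existing blink/append-only converters:

```python
from deephaven import time_table
from deephaven.stream import add_only_to_blink  # assumed module location

# time_table produces an add-only table; the blink view retains only the
# rows added during the most recent update cycle
t = time_table("PT1s")
t_blink = add_only_to_blink(t)
```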
## Improvements
### Major updates to our Iceberg integration
The 0.35 release introduced Deephaven's Iceberg integration. The 0.37 release greatly improves on that API with fixes, breaking changes, and new features, all covered in this section. In particular, it adds support for:
- Refreshing Iceberg tables
- More types of Iceberg catalogs
- Writing to Iceberg
For starters, let's cover some of the breaking changes in the new API:
- `IcebergInstructions` is now `IcebergReadInstructions`.
- The `snapshots` method is no longer part of the `IcebergCatalogAdapter` class. Instead, it's part of the new `IcebergTableAdapter` class.
The revamped API also includes some new features:
- Classes
  - `IcebergTableAdapter`: An interface for interacting with Iceberg tables. Enables fetching snapshots, table definitions, and Iceberg tables.
  - `IcebergUpdateMode`: Supports three different update modes: static, manually refreshing, and auto-refreshing Iceberg tables.
  - `IcebergTable`: An extension of the Deephaven table with an `update` method for manually refreshing Iceberg tables.
- Methods
  - `adapter`: A generic Iceberg catalog adapter that supports various Iceberg catalog types through a dict of properties (see the sketch after the catalog list below).
Here's a list of catalog types supported by Deephaven. Previously, only the first two were supported:
- REST
- AWS Glue
- JDBC
- Nessie
- Hive
- Hadoop
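As a rough sketch, the new generic `adapter` function might be used as follows to connect to a REST catalog. The catalog name, URI, and property values here are placeholder assumptions that follow standard Iceberg catalog configuration:

```python
from deephaven.experimental import iceberg

# placeholder name and URI; properties follow standard Iceberg
# catalog configuration keys
rest_adapter = iceberg.adapter(
    name="example-rest-catalog",
    properties={
        "type": "rest",
        "uri": "http://localhost:8181",
    },
)
```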
In addition, our S3 API has been revamped and improved. Read more about the new Iceberg API in our Iceberg user guide.
### JavaScript API
The JavaScript (JS) API received a major overhaul in this release. Some of the improvements include:
- The client can now connect over a WebSocket instead of deriving the connection info by inspecting the window location.
- Various improvements for third-party consumers, including:
  - Zero browser dependencies
  - Multiple module types, namely ES and CommonJS
### Client APIs
- The Python client API now supports PyArrow date64 and time64 data types.
- A new general way to bind objects to Python client sessions has been added.
### Additional improvements
- You can now provide a custom table definition when reading Parquet data with metadata files (see the sketch after this list).
- Asynchronous add and delete methods for input tables.
- Hovering over a column in the UI now shows the count of each unique value in the column.
- Deephaven now has API-level Protobuf documentation.
- Two new configuration options when starting a server have been added:
  - `http2.maxConcurrentStreams`: The maximum number of concurrent streams allowed on a single HTTP/2 connection.
  - `http2.maxHeaderRequestSize`: The maximum number of bytes allowed in HTTP request headers.
- The Barrage viewport protocol has been simplified, making table updates smoother.
- The Python and Groovy versions used by a Deephaven server are now exposed to client APIs.
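For the custom table definition mentioned above, here's a hedged sketch; the directory path is a placeholder for a Parquet dataset with metadata files:

```python
from deephaven import parquet
from deephaven import dtypes

# placeholder directory containing _metadata / _common_metadata files
t = parquet.read(
    "/data/parquet_with_metadata",
    table_definition={
        "Symbol": dtypes.string,
        "Price": dtypes.double,
    },
)
```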
## Breaking changes
### General
- Deephaven Community Core's protocol buffer definitions have moved to a new folder.
### Server-side APIs
- `MultiJoinTable` (the output of a `multi_join`) now exposes its underlying table through a `table` attribute rather than a method (see the sketch after this list).
- Business calendar time ranges are now exclusive at the close point. They were previously inclusive.
- `SeekableChannelsProviderLoader.fromServiceLoader` has been removed and replaced with `SeekableChannelsProviderLoader.load`.
- `DataInstructionsProviderLoader.fromServiceLoader` has been removed and replaced with `DataInstructionsProviderLoader.load`.
- `IcebergTableParquetLocationKey` has a new parameter: `SeekableChannelsProvider`.
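For example (a minimal sketch with made-up tables), code that previously called `table()` as a method now reads:

```python
from deephaven import empty_table
from deephaven.table import multi_join

t1 = empty_table(5).update(["Key = i", "A = i"])
t2 = empty_table(5).update(["Key = i", "B = i * 2"])

mj = multi_join(input=[t1, t2], on="Key")
result = mj.table  # previously: mj.table()
```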
### Client-side APIs
#### Python client
- NumPy is now a required dependency for the Python client API.
## Bug fixes
### Server-side APIs
#### General
- Fixed an issue where rolling up a blink table could cause some rows to improperly update their values.
- Expanding rows in hierarchical tables now works as expected when applied to rows past the first page of data.
- Static aggregations now remap names correctly when a data index uses a different name for the key column(s).
- Plotting a partitioned table transform done with `one_click` now works as expected.
- A bug that broke rollup table snapshots with viewports of more than 512 rows has been fixed.
- Deephaven's Parquet integration now properly handles both empty Parquet files and Parquet files with empty row groups.
- Number formatting in tables now works as expected.
- Fixed an issue that incorrectly reported null values in quick filters from the UI.
- A bug has been fixed that could cause a natural join operation to be slow when the right table is static.
- Fixed a time range off-by-one error in the time API.
#### Python
- `time_window` now works when running Deephaven from Python and Jupyter.
- Partial user-defined functions (UDFs) now work as expected when called in table operations.
- Type inference now works on UDFs in table operations that call C++ code wrapped with pybind11.
- deephaven.ui now supports columns of type `java.time.LocalTime`.
- The embedded Python server (pip-installed Deephaven) now shuts down JVM threads when it's able to, thus reducing the chances of errors and crashes.
#### Groovy
- Fixed an issue where `wouldMatch` could be memoized incorrectly, leading to incorrect results.
### Client APIs
#### JS client API
- The JS API was found to reference two undefined tickets, which could cause errors. This has been fixed.
- Rollup tables created through the JS API (or directly) no longer error when there is a null sentinel.
- Fixed a JS TypeError that occurred when the server shut down, which prevented shutdown events from being sent to listeners.
- The JS API now correctly handles redirected modifies.
- Fixed an issue that could cause ticking charts to not update when the data changes.
- Fixed an issue where an error could be incorrectly thrown when adding groups to a rollup table.
#### C++ client API
- Fixed an issue that caused the build to fail on Windows systems.
- Fixed an issue where some error messages didn't separate line numbers and error codes.
- The API now correctly processes NULL conventions for types with an underlying `int64` representation.
## Reach out
Our Slack community continues to grow! Join us there for updates and help with your queries.