Share tables between workers (Python)
Deephaven Core+ workers offer two different ways to share tables between them: URIs and Remote Tables. Avoiding data duplication across workers can greatly benefit high-volume and high-throughput applications by reducing memory usage and improving performance.
This guide covers how to share tables across workers in Deephaven using both methods. It describes their differences and the options available to control behavior when ingesting tables from remote workers. Everything is presented from the standpoint of the worker that ingests one or more tables from a different worker; the worker serving the tables needs no configuration beyond creating them.
When connecting to a remote table, Deephaven gracefully handles the state of the upstream table so that it ticks in lockstep. Both the URI and remote table APIs provide reliable remote access to tables as well as graceful handling of disconnection and reconnection.
Note
For sharing tables between Legacy workers, see Preemptive Tables.
URIs
You can share data between Core+ workers through Uniform Resource Identifiers (URIs). A URI is a sequence of characters that identifies a particular resource on a network. In Deephaven, you can connect to remote tables served by a Persistent Query (PQ) using a URI. For instance, consider a PQ called SomePQ with serial number 1234567890 that produces a table called some_table. The URI for this table is either of:
pq://SomePQ/scope/some_table
pq://1234567890/scope/some_table
There is no benefit to using one URI scheme over the other; whether to use the serial number or name of the PQ is simply a matter of preference.
Core+ workers can subscribe to both URI schemes using the uri package. The PQ serving the table must be running for the fetch to succeed. The following code block pulls some_table from SomePQ using both the name and serial number URI schemes:
from deephaven import uri
# "SomePQ" is the name of the Persistent Query, and "SomeTable" is the table name
remote_table_from_name = uri.resolve("pq://SomePQ/scope/some_table")
remote_table_from_serial = uri.resolve("pq://1234567890/scope/some_table")
Optional URI parameters
URIs can include encoded parameters that control the behavior of the resultant table. These parameters come after the table name in the URI, separated by the ? character. Multiple parameters are separated by the & character:
pq://SomePQ/scope/some_table?parameter=value&otherParameter=value1,value2
All of the following examples connect to some_table from SomePQ.
To get a static snapshot of the upstream table, use the snapshot option:
from deephaven import uri
snapshot_from_uri = uri.resolve("pq://SomePQ/scope/some_table?snapshot=true")
To get only a subset of the upstream table's columns, use the columns option with a comma-separated list of column names. The following example gets only Col1, Col2, and Col3:
from deephaven import uri
some_table_cols_1_2_3 = uri.resolve(
    "pq://SomePQ/scope/some_table?columns=Col1,Col2,Col3"
)
You can control the behavior of the local table when the upstream table disconnects for any reason with onDisconnect. There are two options:
- clear: The default option. All rows are removed on disconnect.
- retain: All rows present at the time of disconnection are retained.
The following example retains all rows in the event of a disconnection:
from deephaven import uri
retain_on_disconnect = uri.resolve("pq://SomePQ/scope/some_table?onDisconnect=retain")
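The default clear behavior can also be requested explicitly. A minimal sketch, assuming clear can be passed as a value just like retain (the variable name is illustrative):
from deephaven import uri
clear_on_disconnect = uri.resolve("pq://SomePQ/scope/some_table?onDisconnect=clear")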
You can also control how long Deephaven tries to reconnect to the upstream table after a disconnection with retryWindowMillis. If this parameter is not specified, the default window is 5 minutes. The following example sets the retry window to 30 seconds (30,000 milliseconds):
from deephaven import uri
retry_30_seconds = uri.resolve("pq://SomePQ/scope/some_table?retryWindowMillis=30000")
You can tell Deephaven how many times it may retry connecting to the upstream table within the retryWindowMillis window with maxRetries. If not specified, the number of retries is unlimited. The following example retries up to 3 times within the default 5-minute window:
from deephaven import uri
retry_3_times = uri.resolve("pq://SomePQ/scope/some_table?maxRetries=3")
You can also tell Deephaven to connect to the upstream table as a blink table with blink, in which case only rows from the current update cycle are kept in memory. The following example connects to the upstream table as a blink table:
from deephaven import uri
some_blink_table = uri.resolve("pq://SomePQ/scope/some_table?blink")
You can combine any number of these parameters. The following example combines columns, retryWindowMillis, maxRetries, and blink:
from deephaven import uri
some_table_with_params = uri.resolve(
    "pq://SomePQ/scope/some_table?columns=Col1,Col2,Col3&retryWindowMillis=30000&maxRetries=3&blink"
)
If referencing a PQ by name, the name must be URL-encoded. This is important when the name contains special characters such as spaces or slashes. Deephaven recommends using urllib.parse.quote to escape values.
For example, consider a PQ named Sla\sh PQ that produces some_table. The URI for this table is:
from urllib.parse import quote
# Use a raw string so the backslash survives, and safe="" so that slashes are percent-encoded as well
pq_name = quote(r"Sla\sh PQ", safe="")
pq_uri = f"pq://{pq_name}/scope/some_table"
print(pq_uri)
pq://Sla%5Csh%20PQ/scope/some_table
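The encoded URI can then be resolved like any other. A minimal sketch, continuing from the snippet above (the variable name is illustrative):
from deephaven import uri
remote_table_from_encoded = uri.resolve(pq_uri)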
Note
Legacy workers do not support additional URI-encoded parameters or automatic reconnections when subscribing to Core+ workers.
Remote Tables
Deephaven offers a remote table API that allows you to connect to tables in PQs on Core+ workers from a Legacy or Core+ worker. Remote tables, like URIs, connect to tables in a PQ and track updates from a different, remote worker. The remote table API allows you to connect to upstream tables both in your local Deephaven cluster and in remote ones.
Local cluster
You can use the in_local_cluster function to create a builder object of type RemoteTableBuilder that can connect to a table in a PQ on the same Deephaven cluster as the worker running the code.
from deephaven_enterprise import remote_table
# Create a remote table builder for the same cluster as where this code is running
builder = remote_table.in_local_cluster(query_name="SomePQ", table_name="SomeTable")
Remote cluster
If the PQ is running in a different cluster, use the for_remote_cluster function to construct the RemoteTableBuilder.
Connecting to a table in a remote cluster uses a builder pattern instead: the for_remote_cluster function takes only the URL of the remote cluster as a parameter, and parameters like the query and table names are set via methods, each of which returns the same RemoteTableBuilder with the additional parameter applied.
# Create a remote table builder for a different Deephaven cluster by providing the cluster's URL
# The default port is 8000 for Deephaven installations with Envoy, and 8123 for installations without Envoy
builder = (
    remote_table.for_remote_cluster("https://dh-cluster:8000")
    .query_name("SomePQ")
    .table_name("SomeTable")
)
Remote table builder
Whether you are connecting to a table in the local cluster or a remote one, these functions return a RemoteTableBuilder object. This object's methods control how the remote table behaves.
Subscribe
The subscribe method in RemoteTableBuilder connects to the remote PQ and creates a subscription to the indicated table, keeping the local table updated as the remote one changes.
rtable = builder.subscribe()
The remote PQ must be running at the moment the subscribe method is invoked; if it is not, and the optional table definition was not specified, the operation fails with an error.
You can pass a table definition to the builder to avoid this error. In this case, the subscribe operation succeeds even if the remote PQ is down when it is invoked. Data will not show up (the table will look empty) until the remote PQ becomes available and provides data for it.
from deephaven_enterprise import remote_table
from deephaven.table import TableDefinition
from deephaven import dtypes
table_def = TableDefinition({"Timestamp": dtypes.Instant})
rtable = (
    remote_table.in_local_cluster(query_name="SomePQ", table_name="SomeTable")
    .table_definition(table_def.j_object)
    .subscribe()
)
While a locally defined remote table is updating normally, the remote PQ providing the data for its subscription may go down. In this case, the table stops updating but does not fail; it looks empty, as if it has no rows. If the remote PQ comes back up and resumes providing updates, the locally defined remote table resumes updating with the content of the table in the remote PQ.
Note
The locally defined remote table reflects the data published by the restarted instance of the remote PQ. A restarted PQ may not resume a published table from its state before the crash; it may instead reset that table to its initial state. Understand the expected restart behavior of the remote PQ to set your expectations for the contents of the local remote table.
You can optionally tell Deephaven to subscribe to a remote table as a blink table, in which only rows from the current update cycle are kept in memory:
rtable = (
    remote_table.in_local_cluster(query_name="SomePQ", table_name="SomeTable")
    .blink(True)
    .subscribe()
)
Snapshot
Calling snapshot on the builder creates a local, static snapshot of the table's contents in the remote PQ at the time of the call.
snapshot = remote_table.in_local_cluster(
    query_name="SomePQ", table_name="SomeTable"
).snapshot()
Caution
If the remote PQ is not running at the time of the snapshot call, the call fails with an error. In this case, it makes no difference whether a table definition was provided to the builder.
Once the snapshot data is obtained from the remote PQ, it does not change; because the local table is static, it is unaffected if the remote PQ later goes down.
Subset of columns
Both subscribe and snapshot accept a named argument, included_columns, which limits the columns from the original table that are present locally:
rtable = remote_table.in_local_cluster(
    query_name="SomePQ", table_name="SomeTable"
).subscribe(included_columns=["Timestamp"])
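The same argument works with snapshot. A minimal sketch, using the same PQ and table as above (the variable name is illustrative):
snapshot_subset = remote_table.in_local_cluster(
    query_name="SomePQ", table_name="SomeTable"
).snapshot(included_columns=["Timestamp"])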
Consult the API documentation for subscribe and snapshot to see other options that can be set with other named arguments for those calls.