Share tables between workers (Python)

Deephaven Core+ workers offer two ways to share tables with one another: URIs and Remote Tables. Serving a table from one worker to others, instead of duplicating its data in each worker, reduces memory usage and improves performance. This can greatly benefit high-volume and high-throughput applications.

This guide covers how to share tables across workers in Deephaven using both methods. It explains how the methods differ and which options control behavior when ingesting tables from remote workers. Everything is presented from the standpoint of the worker that ingests one or more tables from a different worker. The worker serving the tables needs no configuration beyond creating them.

When you connect to a remote table, Deephaven tracks the state of the upstream table so that the local copy ticks in lockstep with it. Both the URI and remote table APIs provide reliable remote access to tables and handle disconnection and reconnection gracefully.

Note

For sharing tables between Legacy workers, see Preemptive Tables.

URIs

You can share data between Core+ workers through Uniform Resource Identifiers (URIs). A URI is a sequence of characters that identifies a particular resource on a network. In Deephaven, you can use a URI to connect to a table served by a Persistent Query (PQ). For instance, consider a PQ called SomePQ with serial number 1234567890 that produces a table called some_table. Either of the following URIs identifies this table:

  • pq://SomePQ/scope/some_table
  • pq://1234567890/scope/some_table

There is no benefit to using one form over the other. Whether you identify the PQ by name or by serial number is a matter of preference.
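The two forms differ only in the first path segment, so a small helper can build either one. This is a minimal sketch. The pq_table_uri function is a hypothetical helper and is not part of the deephaven package. It also assumes the PQ name needs no escaping, although names with special characters must be URL-encoded first.

```python
from typing import Union


def pq_table_uri(pq: Union[str, int], table_name: str) -> str:
    """Build a pq:// URI for a table in a Persistent Query's scope.

    Hypothetical helper: `pq` may be the PQ's name or its serial number.
    Names containing special characters must be URL-encoded before calling this.
    """
    return f"pq://{pq}/scope/{table_name}"


# Both URIs refer to some_table on the example PQ (name SomePQ, serial 1234567890)
by_name = pq_table_uri("SomePQ", "some_table")
by_serial = pq_table_uri(1234567890, "some_table")
print(by_name)    # pq://SomePQ/scope/some_table
print(by_serial)  # pq://1234567890/scope/some_table
```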

Core+ workers can resolve both forms with the uri package. The PQ serving the table must be running for the table to be fetched. The following code block pulls some_table from SomePQ by name and by serial number:

from deephaven import uri

# "SomePQ" is the name of the Persistent Query, 1234567890 is its serial number, and "some_table" is the table name
remote_table_from_name = uri.resolve("pq://SomePQ/scope/some_table")
remote_table_from_serial = uri.resolve("pq://1234567890/scope/some_table")

Optional URI parameters

URIs can include encoded parameters that control the behavior of the resultant table. These parameters come after the table name in the URI, separated by the ? character. Multiple parameters are separated by the & character:

pq://SomePQ/scope/some_table?parameter=value&otherParameter=value1,value2

All of the following examples connect to some_table from SomePQ.

To get a static snapshot of the upstream table rather than a ticking subscription, set the snapshot option to true:

from deephaven import uri

snapshot_from_uri = uri.resolve("pq://SomePQ/scope/some_table?snapshot=true")

To get only a subset of columns in the upstream table, use the columns option. The columns should be in a comma-separated list. The following example gets only Col1, Col2, and Col3:

from deephaven import uri

some_table_cols_1_2_3 = uri.resolve(
    "pq://SomePQ/scope/some_table?columns=Col1,Col2,Col3"
)

Use onDisconnect to control what happens to the local table when the upstream table disconnects for any reason. It accepts two values:

  • clear: This is the default option. It removes all rows on disconnect.
  • retain: This retains all rows at the time of disconnection.

The following example retains all rows in the event of a disconnection:

from deephaven import uri

retain_on_disconnect = uri.resolve("pq://SomePQ/scope/some_table?onDisconnect=retain")

You can also use retryWindowMillis to control how long Deephaven tries to reconnect to the upstream table after a disconnection. If this parameter is not specified, the window defaults to 5 minutes. The following example sets the retry window to 30 seconds (30,000 milliseconds):

from deephaven import uri

retry_30_seconds = uri.resolve("pq://SomePQ/scope/some_table?retryWindowMillis=30000")
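If you think in seconds or minutes rather than milliseconds, you can convert a datetime.timedelta to avoid arithmetic slips. The to_retry_window_millis function below is a hypothetical helper for illustration. You would pass the resulting URI string to uri.resolve as in the previous example.

```python
from datetime import timedelta


def to_retry_window_millis(window: timedelta) -> int:
    """Convert a timedelta to whole milliseconds for the retryWindowMillis parameter."""
    # timedelta // timedelta yields an int, avoiding floating-point rounding
    return window // timedelta(milliseconds=1)


window_ms = to_retry_window_millis(timedelta(seconds=30))
uri_str = f"pq://SomePQ/scope/some_table?retryWindowMillis={window_ms}"
print(uri_str)  # pq://SomePQ/scope/some_table?retryWindowMillis=30000

# The documented default window of 5 minutes, expressed in milliseconds
print(to_retry_window_millis(timedelta(minutes=5)))  # 300000
```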

Use maxRetries to limit how many times Deephaven retries connecting to the upstream table within the retryWindowMillis window. If it is not specified, the number of retries is unlimited. The following example retries up to 3 times within the default 5-minute window:

from deephaven import uri

retry_3_times = uri.resolve("pq://SomePQ/scope/some_table?maxRetries=3")
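The two limits work together. One reading is that another reconnection attempt is made only while both the attempt count and the elapsed time are within bounds. The sketch below models that reading. It is an assumption for illustration, not Deephaven's reconnection code, and the real implementation may differ in details such as the delay between attempts. The may_retry function is hypothetical.

```python
from typing import Optional

DEFAULT_RETRY_WINDOW_MILLIS = 5 * 60 * 1000  # documented default: 5 minutes


def may_retry(
    attempts_made: int,
    elapsed_millis: int,
    max_retries: Optional[int] = None,  # None models the unlimited default
    retry_window_millis: int = DEFAULT_RETRY_WINDOW_MILLIS,
) -> bool:
    """Return True if another attempt fits within both limits (assumed semantics)."""
    if elapsed_millis >= retry_window_millis:
        return False  # the retry window has closed
    if max_retries is not None and attempts_made >= max_retries:
        return False  # the retry budget is used up
    return True


print(may_retry(2, 60_000, max_retries=3))  # True: third attempt, 1 minute in
print(may_retry(3, 60_000, max_retries=3))  # False: retries exhausted
print(may_retry(10, 400_000))               # False: past the 5-minute window
```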

With the blink parameter, you can connect to the upstream table as a blink table, which keeps only the rows from the current update cycle. The parameter takes no value:

from deephaven import uri

some_blink_table = uri.resolve("pq://SomePQ/scope/some_table?blink")

You can combine any number of these parameters. The following example combines columns, retryWindowMillis, maxRetries, and blink:

from deephaven import uri

some_table_with_params = uri.resolve(
    "pq://SomePQ/scope/some_table?columns=Col1,Col2,Col3&retryWindowMillis=30000&maxRetries=3&blink"
)
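Long parameter strings are easy to mistype. The following sketch assembles them from Python values instead. The with_params function is a hypothetical helper, not a Deephaven API. It renders lists as comma-separated values and Python booleans as lowercase true/false, as in snapshot=true. It appends bare flags such as blink without a value. The helper does not URL-encode values, so it assumes column names are plain identifiers.

```python
def with_params(base_uri: str, *flags: str, **params: object) -> str:
    """Append URI parameters to a pq:// URI (hypothetical helper).

    Keyword params become name=value pairs in the order given; positional
    flags (e.g. "blink") are appended afterward as bare names.
    """
    parts = []
    for name, value in params.items():
        if isinstance(value, bool):
            value = str(value).lower()  # snapshot=True -> snapshot=true
        elif isinstance(value, (list, tuple)):
            value = ",".join(str(v) for v in value)  # columns=[...] -> Col1,Col2
        parts.append(f"{name}={value}")
    parts.extend(flags)  # bare flags carry no value
    return f"{base_uri}?{'&'.join(parts)}" if parts else base_uri


base = "pq://SomePQ/scope/some_table"
combined = with_params(
    base, "blink", columns=["Col1", "Col2", "Col3"], retryWindowMillis=30000, maxRetries=3
)
print(combined)
# pq://SomePQ/scope/some_table?columns=Col1,Col2,Col3&retryWindowMillis=30000&maxRetries=3&blink
```

Keeping bare flags separate from keyword parameters makes the difference between blink and snapshot=true explicit.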

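If you reference a PQ by name, the name must be URL-encoded. This matters when the name contains special characters such as spaces or slashes. Deephaven recommends using urllib.parse.quote to escape values. Pass safe="" so that forward slashes are escaped too, because quote leaves them unescaped by default.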

For example, consider a PQ named Sla\sh PQ that produces some_table. The following code builds the URI for this table:

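from urllib.parse import quote

# A raw string keeps the backslash literal; safe="" makes quote escape "/" as well
pq_name = quote(r"Sla\sh PQ", safe="")
pq_uri = f"pq://{pq_name}/scope/some_table"

print(pq_uri)
# prints: pq://Sla%5Csh%20PQ/scope/some_table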
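By default, urllib.parse.quote does not escape forward slashes, because its safe parameter defaults to "/". A name containing "/" would therefore be split into extra path segments in the URI. Passing safe="" escapes slashes too. The PQ name below is hypothetical.

```python
from urllib.parse import quote

name = "Risk/PnL PQ"  # hypothetical PQ name containing a forward slash

# Default: "/" is treated as safe and survives, breaking the pq:// path
default_quoted = quote(name)
# safe="": the slash is escaped as %2F
fully_quoted = quote(name, safe="")

print(default_quoted)  # Risk/PnL%20PQ
print(fully_quoted)    # Risk%2FPnL%20PQ

pq_uri = f"pq://{fully_quoted}/scope/some_table"
print(pq_uri)  # pq://Risk%2FPnL%20PQ/scope/some_table
```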

Note

Legacy workers do not support additional URI-encoded parameters or automatic reconnections when subscribing to Core+ workers.

Remote Tables

Deephaven offers a remote table API that allows you to connect to tables in PQs on Core+ workers from either a Legacy or a Core+ worker. Like URIs, remote tables connect to a table in a PQ and track its updates from a different, remote worker. The remote table API can connect to upstream tables both in your local Deephaven cluster and in remote ones.

Local cluster

Use the in_local_cluster function to create a RemoteTableBuilder that connects to a table in a PQ on the same Deephaven cluster as the worker running the code.

from deephaven_enterprise import remote_table

# Create a remote table builder for the same cluster as where this code is running
builder = remote_table.in_local_cluster(query_name="SomePQ", table_name="SomeTable")

Remote cluster

If the PQ is running in a different cluster, use the for_remote_cluster function to construct the RemoteTableBuilder.

Connecting to a table in a remote cluster uses a builder pattern. The for_remote_cluster function takes only the URL of the remote cluster as a parameter. You set other parameters, such as the query and table name, by chaining methods. Each method returns the same RemoteTableBuilder with the additional parameter set.

from deephaven_enterprise import remote_table

# Create a remote table builder for a different Deephaven cluster by providing the cluster's URL
# The default port is 8000 for Deephaven installations with Envoy, and 8123 for installations without Envoy
builder = (
    remote_table.for_remote_cluster("https://dh-cluster:8000")
    .query_name("SomePQ")
    .table_name("SomeTable")
)
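The chaining works because each setter records its argument and returns the builder itself. The toy class below illustrates only that pattern. ToyRemoteTableBuilder is hypothetical. It is not Deephaven's RemoteTableBuilder, and it connects to nothing.

```python
class ToyRemoteTableBuilder:
    """Hypothetical stand-in that mimics the fluent style; it connects to nothing."""

    def __init__(self, url: str):
        self.url = url
        self.settings = {}

    def query_name(self, name: str) -> "ToyRemoteTableBuilder":
        self.settings["query_name"] = name
        return self  # returning self is what lets the calls be chained

    def table_name(self, name: str) -> "ToyRemoteTableBuilder":
        self.settings["table_name"] = name
        return self


builder = (
    ToyRemoteTableBuilder("https://dh-cluster:8000")
    .query_name("SomePQ")
    .table_name("SomeTable")
)
print(builder.url, builder.settings)
# https://dh-cluster:8000 {'query_name': 'SomePQ', 'table_name': 'SomeTable'}
```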

Remote table builder

Whether the table is in the local cluster or a remote one, both functions return a RemoteTableBuilder object. The builder's methods control how the remote table behaves.

Subscribe

The subscribe method of RemoteTableBuilder connects to the remote PQ and subscribes to the named table. The local table is then updated as the remote table changes.

rtable = builder.subscribe()

Unless a table definition is provided, the remote PQ must be running when subscribe is invoked. Otherwise, the operation fails with an error.

You can pass a table definition to the builder to avoid this error. In that case, subscribe succeeds even if the remote PQ is down when it is invoked. The local table stays empty until the remote PQ becomes available and provides data, and then it starts updating.

from deephaven.table import TableDefinition
from deephaven import dtypes
from deephaven_enterprise import remote_table

# Declaring the expected schema lets subscribe succeed even if SomePQ is down
table_def = TableDefinition({"Timestamp": dtypes.Instant})
rtable = (
    remote_table.in_local_cluster(query_name="SomePQ", table_name="SomeTable")
    .table_definition(table_def.j_object)
    .subscribe()
)

While a locally defined remote table is updating normally, the remote PQ providing the data for its subscription may go down. In this case, the table will stop updating but will not fail. Instead, the table will look empty as if it has no rows. If the remote PQ comes back up and resumes providing updates, the locally defined remote table will resume updating with content according to the table in the remote PQ.

Note

The locally defined remote table reflects the data published by the restarted instance of the remote PQ. A restarted PQ does not necessarily resume from the state its tables were in before it went down. It may instead reset a table to its initial state. Understand how the remote PQ behaves on restart so that you know what to expect in the local remote table.

You can optionally tell Deephaven to subscribe to a remote table as a blink table, in which only rows from the current update cycle are kept in memory:

from deephaven_enterprise import remote_table

rtable = (
    remote_table.in_local_cluster(query_name="SomePQ", table_name="SomeTable")
    .blink(True)
    .subscribe()
)
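The following pure-Python model shows what "only rows from the current update cycle" means. It feeds the same update batches to a table that keeps every row it has received, as an append-only upstream table would, and to a blink-style table. This is a conceptual illustration with made-up rows, not how Deephaven stores tables. The apply_cycles function is hypothetical.

```python
def apply_cycles(cycles, blink):
    """Return the rows visible after each update cycle (conceptual model)."""
    visible = []
    history = []
    for added in cycles:
        if blink:
            visible = list(added)  # blink: rows from earlier cycles are dropped
        else:
            visible = visible + list(added)  # keep-all: rows accumulate
        history.append(visible)
    return history


cycles = [["a", "b"], ["c"], ["d", "e"]]  # hypothetical rows added per cycle
print(apply_cycles(cycles, blink=False)[-1])  # ['a', 'b', 'c', 'd', 'e']
print(apply_cycles(cycles, blink=True)[-1])   # ['d', 'e']
```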

Snapshot

Calling snapshot on the builder will create a local, static snapshot of the table's content in the remote PQ at the time of the call.

from deephaven_enterprise import remote_table

snapshot = remote_table.in_local_cluster(
    query_name="SomePQ", table_name="SomeTable"
).snapshot()

Caution

If the remote PQ is not running when snapshot is called, the call fails with an error. This happens whether or not a table definition was provided to the builder.

Once the snapshot data is obtained from the remote PQ, it will not change, and the local table will not be affected if the remote PQ goes down since the local table is static.

Subset of columns

Both subscribe and snapshot accept a named argument, included_columns, which limits the columns from the original table that will be present locally:

from deephaven_enterprise import remote_table

rtable = remote_table.in_local_cluster(
    query_name="SomePQ", table_name="SomeTable"
).subscribe(included_columns=["Timestamp"])

See the API documentation for subscribe and snapshot for the other named arguments those calls accept.