Share tables between workers (Groovy)
Deephaven Core+ workers offer two different ways to share tables between them: URIs and Remote Tables. Avoiding data duplication across workers can greatly benefit high-volume and high-throughput applications by reducing memory usage and improving performance.
This guide covers how to share tables across workers in Deephaven using both methods. It describes their differences and the options available to control behavior when ingesting tables from remote workers. Everything is presented from the standpoint of the worker that ingests one or more tables from another worker; the serving worker needs no configuration beyond creating the tables.
When connecting to a remote table, Deephaven gracefully handles the state of the upstream table so that it ticks in lockstep. Both the URI and remote table APIs provide reliable remote access to tables as well as graceful handling of disconnection and reconnection.
Note
For sharing tables between Legacy workers, see Preemptive Tables.
URIs
You can share data between Core+ workers through Uniform Resource Identifiers (URIs). A URI is a sequence of characters that identifies a particular resource on a network. In Deephaven, you can connect to remote tables served by a Persistent Query (PQ) using a URI. For instance, consider a PQ called SomePQ with serial number 1234567890. This PQ produces a table called someTable. The URI for this table is either of:
pq://SomePQ/scope/someTable
pq://1234567890/scope/someTable
There is no benefit to using one URI scheme over the other - it is simply a matter of preference whether to use the serial number or the name of the PQ.
Core+ workers can subscribe to both URI schemes using the ResolveTools class. The workers must be running to fetch the results. The following code block pulls someTable from SomePQ using both the name and the serial number of the PQ:
import io.deephaven.uri.ResolveTools
// "SomePQ" is the name of the Persistent Query, and "someTable" is the table name
remoteTableFromName = ResolveTools.resolve("pq://SomePQ/scope/someTable")
remoteTableFromSerial = ResolveTools.resolve("pq://1234567890/scope/someTable")
Optional URI parameters
URIs can include encoded parameters that control the behavior of the resultant table. These parameters come after the table name in the URI, separated by the ? character. Multiple parameters are separated by the & character:
pq://SomePQ/scope/someTable?parameter=value&otherParameter=value1,value2
All of the following examples connect to someTable from SomePQ.
To get a static snapshot of the upstream table, use the snapshot option:
import io.deephaven.uri.ResolveTools
snapshotFromURI = ResolveTools.resolve("pq://SomePQ/scope/someTable?snapshot=true")
To get only a subset of columns in the upstream table, use the columns option. The columns should be given as a comma-separated list. The following example gets only Col1, Col2, and Col3:
import io.deephaven.uri.ResolveTools
columnsFromURI = ResolveTools.resolve("pq://SomePQ/scope/someTable?columns=Col1,Col2,Col3")
You can control the behavior of the table when the upstream table disconnects for any reason with onDisconnect. There are two options:
clear: The default. Removes all rows on disconnect.
retain: Retains all rows as of the time of disconnection.
The following example retains all rows in the event of a disconnection:
import io.deephaven.uri.ResolveTools
retainOnDisconnect = ResolveTools.resolve("pq://SomePQ/scope/someTable?onDisconnect=retain")
You can also control how long Deephaven will try to reconnect to the upstream table after a disconnection with retryWindowMillis. If this parameter is not specified, the default window is 5 minutes. The following example sets the retry window to 30 seconds (30,000 milliseconds):
import io.deephaven.uri.ResolveTools
retry30Seconds = ResolveTools.resolve("pq://SomePQ/scope/someTable?retryWindowMillis=30000")
You can tell Deephaven how many times it may retry connecting to the upstream table within the retryWindowMillis window with maxRetries. If not specified, the default is unlimited. The following example retries up to 3 times within the default 5-minute window:
import io.deephaven.uri.ResolveTools
retry3Times = ResolveTools.resolve("pq://SomePQ/scope/someTable?maxRetries=3")
You can also tell Deephaven to connect to the upstream table as a blink table (a table that retains only the rows added in the current update cycle) with the blink parameter. The following example connects to the upstream table as a blink table:
import io.deephaven.uri.ResolveTools
someBlinkTable = ResolveTools.resolve("pq://SomePQ/scope/someTable?blink")
You can combine any number of these parameters as you'd like. The following example combines columns, retryWindowMillis, maxRetries, and blink:
import io.deephaven.uri.ResolveTools
someTableWithParams = ResolveTools.resolve("pq://SomePQ/scope/someTable?columns=Col1,Col2,Col3&retryWindowMillis=30000&maxRetries=3&blink")
If referencing a PQ by name, the name must be URL-encoded. This is important when the name has special characters such as spaces or slashes. Deephaven recommends using java.net.URLEncoder.encode to escape values.
For example, consider a PQ named Sla\sh PQ that produces someTable. The URI for this table is:
import java.net.URLEncoder
// URLEncoder performs form encoding, which escapes spaces as "+". URI paths
// expect "%20" for spaces, so replace it after encoding.
pqName = URLEncoder.encode("Sla\\sh PQ", "UTF-8").replace("+", "%20")
pqUri = "pq://$pqName/scope/someTable"
println pqUri
pq://Sla%5Csh%20PQ/scope/someTable
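The encoded URI can then be resolved like any other. The sketch below reuses the hypothetical Sla\sh PQ and someTable names from the example above:

```groovy
import io.deephaven.uri.ResolveTools
import java.net.URLEncoder

// Encode the PQ name ("+" from form encoding is swapped for the "%20" that
// URI paths expect), then resolve the resulting URI.
pqName = URLEncoder.encode("Sla\\sh PQ", "UTF-8").replace("+", "%20")
remoteTable = ResolveTools.resolve("pq://$pqName/scope/someTable")
```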
URIs from a legacy worker
Legacy workers have their own ResolveTools class, in a different package, for connecting to a table served by a Persistent Query. Otherwise, the flow is the same.
import io.deephaven.enterprise.uri.ResolveTools
// "SomePQ" is the name of the Persistent Query, and "SomeTable" is the table name
remoteTable = ResolveTools.resolve("pq://SomePQ/scope/SomeTable")
Note
Legacy workers do not support additional URI-encoded parameters or automatic reconnections when subscribing to Core+ workers.
Remote tables
Deephaven offers a remote table API that allows you to connect to tables in PQs on Core+ workers from a Legacy or Core+ worker. Remote tables, like URIs, connect to tables in a PQ and track updates from a different, remote worker. The remote table API allows you to connect to upstream tables in both your local Deephaven cluster as well as remote ones.
Local cluster
You can use the function forLocalCluster to create a RemoteTableBuilder that can connect to a table in a PQ on the same Deephaven cluster as the worker that is running the code.
import io.deephaven.enterprise.remote.RemoteTableBuilder
// Create a remote table builder for the same cluster as where this code is running
builder = RemoteTableBuilder.forLocalCluster()
Remote cluster
You can use the function forRemoteCluster to create a RemoteTableBuilder that can connect to a table in a PQ on a different Deephaven cluster. You must provide the URL of the remote cluster to connect to.
import io.deephaven.enterprise.remote.RemoteTableBuilder
// Create a remote table builder for a different Deephaven cluster by providing the cluster's URL.
// The default port is 8000 for Deephaven installations with Envoy, and 8123 for installations without Envoy.
builder = RemoteTableBuilder.forRemoteCluster("https://dh-cluster:8000")
Remote table builder
Whether you are connecting to a table on the local cluster or a remote one, these methods return a RemoteTableBuilder object. This object has methods that control how the remote table behaves.
Subscribe
The subscribe method in RemoteTableBuilder connects to the remote PQ and creates a subscription to the named table, keeping the local table updated as the remote one changes.
// Specify the persistent query name and the table name, and then invoke the subscribe method to get
// the remote table created and start receiving updates for it.
remoteTable = builder.queryName("SomePQ").tableName("SomeTable").subscribe()
The remote PQ must be running at the moment the subscribe method is invoked. If the optional table definition is not specified and the PQ is not running, the operation fails with an error.
You can pass a table definition to the builder to avoid this error. In this case, the subscribe operation succeeds even if the remote PQ is down when it is invoked; data will not show up (the table looks empty). The remote table then updates if and when the remote PQ becomes available and provides data for it.
import io.deephaven.engine.table.TableDefinition
import io.deephaven.engine.table.ColumnDefinition
remoteTableDef = TableDefinition.of(ColumnDefinition.ofTime("Timestamp"))
remoteTable = RemoteTableBuilder.forLocalCluster()
.queryName("SomePQ")
.tableName("SomeTable")
.tableDefinition(remoteTableDef)
.subscribe()
While a locally defined remote table is updating normally, the remote PQ providing the data for its subscription may go down. In this case, the table will stop updating but will not fail. Instead, the table will look empty as if it has no rows. If the remote PQ comes back up and resumes providing updates, the locally defined remote table will resume updating with content according to the table in the remote PQ.
Note
The locally defined remote table will reflect the data published by the restarted instance of the remote PQ. A restarted PQ may not resume a published table from the state it had before the crash; it may instead reset the table to its initial state. Understand the restart behavior of the remote PQ to set expectations for the contents of the local remote table.
You can optionally tell Deephaven to subscribe to a remote table as a blink table, in which only rows from the current update cycle are kept in memory:
import io.deephaven.enterprise.remote.RemoteTableBuilder
remoteTable = RemoteTableBuilder.forLocalCluster()
.queryName("SomePQ")
.tableName("SomeTable")
.isBlink(true)
.subscribe()
Snapshot
Calling snapshot on the builder creates a local, static snapshot of the table's content in the remote PQ at the time of the call.
import io.deephaven.enterprise.remote.RemoteTableBuilder
snapshot = RemoteTableBuilder.forLocalCluster()
.queryName("SomePQ")
.tableName("SomeTable")
.snapshot()
Caution
If the remote PQ is not running at the time of the snapshot call, the call fails with an error. In this case, it makes no difference whether a table definition was provided to the builder.
Once the snapshot data is obtained from the remote PQ, it will not change, and the local table will not be affected if the remote PQ goes down since the local table is static.
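Because the snapshot never ticks, tables derived from it are also static. As a brief sketch (the result column name "Count" is chosen here for illustration):

```groovy
// The snapshot is a static table; a derived table like this row count
// is computed once and never updates.
rowCount = snapshot.countBy("Count")
```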
Subset of columns
Both subscribe and snapshot accept a SubscriptionOptions argument, which limits the columns from the original table that will be present locally.
import io.deephaven.enterprise.remote.SubscriptionOptions
remoteTable = RemoteTableBuilder.forLocalCluster()
.queryName("SomePQ")
.tableName("SomeTable")
.subscribe(SubscriptionOptions.builder()
.addIncludedColumns("Timestamp")
.build())
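Since snapshot accepts the same argument, a column-limited static snapshot can be taken the same way. This sketch reuses the hypothetical PQ and table names from the examples above:

```groovy
import io.deephaven.enterprise.remote.RemoteTableBuilder
import io.deephaven.enterprise.remote.SubscriptionOptions

// One-time static snapshot containing only the Timestamp column.
snapshot = RemoteTableBuilder.forLocalCluster()
.queryName("SomePQ")
.tableName("SomeTable")
.snapshot(SubscriptionOptions.builder()
.addIncludedColumns("Timestamp")
.build())
```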
Consult the API documentation for SubscriptionOptions to see other options that can be set using this class.