Subscribe to other queries' tables
Overview
There comes a time in every query developer’s career when it becomes inconvenient or impossible to compute every table of interest directly within a single Deephaven query. Luckily, we’ve prepared for this scenario by offering a rich feature set to enable multi-stage data pipelines. The most important building block for this capability is the ability to subscribe to tables generated by other queries. If you already know what you’re trying to do, you can probably skip straight to the point.
What’s my motivation here?
There are lots of reasons you might find it necessary to architect your data-driven application to use multiple processes (e.g., Deephaven query workers), but here are a few scenarios to get you thinking:
- Resource Sharing: You’ve decided each of your N (presumably in the range [1, 7.82x109]) internal customers should get to have their own instance of a particular query in order to avoid any kind of resource contention, or to allow for customizations, or just because it makes you feel powerful. Unfortunately, one of the components of this query is just really expensive to compute, and your boss is really stingy about giving you N times the number of resources they think you need, so you only want to compute it once (or maybe once for every handful of users, depending on just how much data is being published and how expensive each client is - compute, memory, and network-wise).
- Isolation for Privacy and Security: You and your programming partner, known as the dynamic duo Alice and Bob, are writing queries for that untrustworthy nogoodnik Eve. You’d like to show your adversary (or customer… you say potato, I say solanum tuberosum) some of your output, but not necessarily all of it, and certainly not your code.
- Resource Expansion: Your computations require multiple processes because they demand more resources than you can comfortably fit on a single host, or you’ve segregated access to some of your inputs to certain hosts. Congratulations, you’re probably trying to solve really interesting problems, or your administrators have created some for you!
So, what are my options?
Given the flexibility and modularity delivered by Deephaven’s query engine and API, there are many! That said, there are a few available “out of the box”:
- Using the Deephaven Java API, you can subscribe to tables published by another query with a specified update frequency and interact locally with consistent snapshots of those tables from within the Deephaven query engine. This functionality is used by Deephaven query workers and Java clients including the Swing GUI. This option is the primary focus for this document, because it makes it easy to architect multi-process solutions using the Deephaven query engine.
- Using the Deephaven Open API, you can subscribe to tables published by another query with a specific update frequency, and interact locally with the data you retrieve in whatever manner you choose.
- Using Deephaven’s SBE integration, you can read table data remotely over an industry standard protocol.
Just to preview our plans, we’re developing a more flexible data backplane protocol that should allow you to subscribe to and publish real-time data from any language without core Deephaven libraries, further opening up such multi-process pipelines to include other data tools in a first-class manner.
Using the Deephaven Java API
A (Sort-of) Brief Explanation
Deephaven query workers are Java processes hosting an instance of the Deephaven query engine, which exposes a server that allows authenticated clients to connect and run commands to programmatically interact with data. Any client with the necessary endpoint information (read: TCP/IP address and port) can connect, authenticate, and run commands if so permissioned.
The client that requested a worker’s creation from a query dispatcher is handed this information as part of the response to its request. This client is the “primary client” and expected to be the first to connect to the worker. It can share the worker’s connection details directly or indirectly with other clients, allowing for “secondary client” connections to its worker.
The most common example of this within the Deephaven system is the Persistent Query Controller. When launching a PQ, it acts as a primary client, requesting a worker and running the PQ’s initialization job inside that worker. It then acts as a discovery service, making worker details available to its own clients in turn.
Any process using the Deephaven Java API can act as a client to the Persistent Query Controller’s discovery service, and/or to zero or more workers. It’s important to remember that a Deephaven query worker is just such a process, meaning that workers can be clients of the controller and of other workers, and in turn have other workers as clients.
If you’re not working from within a Deephaven query worker, you may need to know how to connect to Deephaven from another Java program.
Some Code
These samples focus on the most common and generally useful approach. They show and use an example Persistent Query running with owner “PqOwner”, name “PqName”, and an exported Table named “pqTable”.
In all cases, we are using a Deephaven technology called “Preemptive Updates Tables”, which work by sending an initial snapshot of data and index information, followed by periodic deltas containing new or changed data and index information. Deltas coalesce all changes that occur since the last snapshot or delta was sent. The duration of the interval between updates is a property of the PreemptiveUpdatesTable; multiple PreemptiveUpdatesTables may be created from the same parent table in order to export data at different update frequencies.
Deephaven’s query engine will ensure that deltas are applied in an atomically-visible way for client-side Java code that follows the usual rules of engagement with the LiveTableMonitor
.
Publishing a Table
See below for the script body of our trivial example Persistent Query, PqOwner/PqName. Note that you might additionally want to apply permissions to these tables.
Note
See also: Persistent Query ACLs
// Create a Table and give it a name
pqTable = db.timeTable("00:00:03")
// Create and publish a preemptive version of pqTable which updates
// every 5 seconds
pqTablePreemptive = result.preemptiveUpdatesTable(5000)
from deephaven import *
# Create a Table and give it a name
pqTable = db.timeTable("00:00:03")
# Create and publish a preemptive version of pqTable which updates
# every 5 seconds
pqTablePreemptive = result.preemptiveUpdatesTable(5000)
Subscribing to a Table
Now that we have a Persistent Query, PqOwner/PqName exporting the table we’re interested in, we need to subscribe to it. See below for example code.
Note that for the purposes of this example, we’ve assumed that the PQ has serial number 1502904477776000000
. Serial numbers are immutable, unique identifiers assigned to each PQ, and can be found on your console’s Query Config panel or obtained programmatically using a PersistentQueryControllerClient
. Using a serial number instead of owner/name pair is resilient against query ownership changes and renames, but also more brittle in cases where PQs are copied across environments without preserving serial numbers. Which approach is best will depend on your administrative policies.
import com.illumon.iris.controller.utils.PersistentQueryTableHelper
// Connect to PqOwner's PqName query
client = PersistentQueryTableHelper.getClientForPersistentQuery(log, "PqOwner", "PqName", 3*60*1000)
// Get the "pqTable" table from PqOwner's PqName query
pqTableLocal = PersistentQueryTableHelper.getPreemptiveTableFromPersistentQuery(client, "pqTablePreemptive")
// use the table to perform calculations
derived = pqTableLocal.update(“LocalChangeTime=currentTime()”)
// Alternately, connect to the PqName query by using its serial number
client = PersistentQueryTableHelper.getClientForPersistentQuery(log, 1502904477776000000, 3*60*1000)
from deephaven import *
# Connect to PqOwner's PqName query with a 3 minute timeout
client = PersistentQueryTableHelper.getClientForPersistentQuery(3*60*1000, owner="PqOwner", name="PqName")
# Get the "pqTable" table from PqOwner's PqName query
pqTableLocal = PersistentQueryTableHelper.getPreemptiveTableFromPersistentQuery("pqTablePreemptive", helper=client)
# Use the table to perform local calculations
derived = pqTableLocal.update(“LocalChangeTime=currentTime()”)
# Alternately, connect to the PqName query by using its serial number
client = PersistentQueryTableHelper.getClientForPersistentQuery(3*60*1000, configSerial=1502904477776000000)
Add some resiliency to your pipeline with a ConnectionAwareRemoteTable
If you’d like your subscribing query to be able to continue functioning correctly during periods when the publishing query is unavailable or restarting, consider using a “Connection-Aware Remote Table”.
What is a Connection-Aware Remote Table?
The Connection-Aware Remote Table (CART provides an error-resilient caching proxy for preemptive updates tables being delivered from a persistent query. If the publishing PQ stops, the CART will continue operating, and will reconnect if the publisher restarts again later. Meanwhile, the CART will either immediately remove all of its own rows, or retain the last known consistent data set until the publisher restarts and supplies a fresh snapshot; the behavior is specified in the subscribing query when setting up the CART.
This allows subscribing queries to have optional dependencies on other PQs. The ConnectionAwareRemoteTable
can be used in most places that otherwise used the PersistentQueryTableHelper.getPreemptiveTableFromPersistentQuery
function.
What else do I need to know?
As in a regular preemptive updates table subscription, you must know the publishing PQ’s owner and name (serial number-based lookup is not currently supported), and the name of the table.
When deciding whether a CART should “clear on disconnect”, you should understand the implications of this setting:
- When
true
, the CART will immediately remove all of its own rows if the publishing PQ stops, and then add new data when the publishing PQ restarts. In other words, a stopped publisher results in an empty table at the subscriber. - When
false
, the CART will retain the last-known information from the publishing PQ until it restarts, then deliver an update replacing the old (possibly stale) data with the restarted PQ’s data. In other words, a stopped publisher results in a non-updating table at the subscriber, with consistent but possibly stale data.
The resilience offered by a CART comes at a complexity cost, in that you need to know the schema (or at least a relevant subset of it) for your target table; given that tables published by a PQ are fundamentally query-driven and transient, there isn’t currently a lookup service that can provide such schemas when the publishing query is not running. CART-using subscribers must supply a prototype Table or TableDefinition that provides the necessary schema subset, and the published table must include at least these columns with matching types for all columns.
The boolean property ConnectionAwareRemoteTable.printDependencyInformation
defaults to false
. When set to true
, the CART will output additional information about its internal dependency resolution. However, this should normally be left false unless performing explicit dependency resolution error investigations.
Log entries related to the CART will be prefaced with the words "Connection-aware remote table". Most log entries will also identify which CART instance is logging the message, by specifying which underlying persistent query table the logging CART is listening to.
Any time a CART disconnects from or connects to a query, a log message will record that fact. If the underlying persistent query table has an error, the CART will continue operating, and the underlying error will be recorded in the log as a warning.
Example CART
This example creates a CART that has the columns "Timestamp" and "AnInt". Our source table "pqTablePreemptive" must at least have the columns "Timestamp" (of type DBDateTime) and "AntInt" (of type int). Since clearOnDisconnect
is set to true
, if the publishing PQ stops, the CART will immediately remove all of its own rows.
import com.illumon.dataobjects.ColumnDefinition
import com.illumon.iris.db.tables.TableDefinition
import com.illumon.iris.controller.utils.\*
cols = new LinkedList<ColumnDefinition>()
cols.add(new ColumnDefinition("Timestamp", DBDateTime.class))
cols.add(new ColumnDefinition("a", int))
defn = new TableDefinition(cols)
cart = new ConnectionAwareRemoteTable(log, "owner", "queryName", "tableName", true, defn)
Troubleshooting
If the query specified in the CART's creation cannot be found, the CART will immediately fail. The query does not have to be running, but it does have to exist. Otherwise, it would be likely that a typo in the owner or query name would make it appear that the CART was failing to pick up data; by failing, a more direct indicator of the problem can be made available.
If the table prototype or definition passed in is not compatible with the table definition from the persistent query, the CART will fail. A compatible definition means that the persistent query's table must contain all of the columns defined in the CART, with exactly matching types. Any extra columns in the persistent query's table will be ignored.
If you connect an administrator console to the publishing PQ you want the CART to connect to, and then try creating the CART from within that console, the CART will get stuck in an infinite loop, and the persistent query will become unresponsive and have to be restarted. Don’t do this.
An initially-created CART is a valid empty table until it receives data from the publishing PQ. In some cases (such as writing data out to a file), it may be important to wait for the CART to populate. In these cases, the awaitUpdate
method is recommended.
Developer Documentation
If you like Javadocs for reference (and really who doesn’t?), here are a few to get you started:
- For the publication side of things: Table.preemptiveUpdatesTable(long updateInterval)
- For the client helper class that Deephaven engineers use and highly-recommend: PersistentQueryTableHelper
- For a really “roll-your-own” level of integration: PersistentQueryControllerClient
- For extra resiliency: ConnectionAwareRemoteTable