Listen to ticking tables
Deephaven makes it easy to create dynamic queries that update in real time. When a table updates, a message describing the changes is sent to all listeners of the table. This mechanism is what makes ticking queries work. It can also be used to create new, dynamic functionality.
As an example, consider using a Deephaven query to create a dynamic table that monitors for situations needing human intervention. You can create a table listener that sends a Slack message every time one or more tables tick. Similarly, you could have a table of orders to buy or sell stocks. If rows are added to the order table, new orders are sent to the broker, and if rows are removed from the order table, orders are canceled with the broker.
This guide will show you how to create your own table listeners in Python.
What is a table listener?
A table listener is an object that listens to one or more tables for updates. When connected to a ticking table, a listener receives one or more TableUpdate
objects that can be used to access the added, modified, or removed data.
When listening to changes in a single table, use the Deephaven listen
function, and when listening to changes in multiple tables use the Deephaven merged_listen
function. Both functions accept either a listener function or a listener class.
Listen to one ticking table
Regardless of whether you use a listener function or a listener class, you will need to use the listen
function to register the listener with a table. Once a listener is registered, it will begin receiving updates.
With a listener function
A listener function takes two inputs:
update
: aTableUpdate
object, which contains added, modified, and removed data.is_replay
: A boolean value that isTrue
when replaying the initial snapshot andFalse
otherwise. This will only ever beTrue
when the listener receives its first update anddo_replay
is set to true when callinglisten
.
The following example listens to a time table.
from deephaven.table_listener import listen
from deephaven import time_table
def listener_function(update, is_replay):
print(f"FUNCTION LISTENER: update={update}")
print(f"is_replay: {is_replay}")
table = time_table("PT1S").update(formulas=["X=i"]).tail(5)
handle = listen(table, listener_function)
With a listener class
The table listener class gives more control when listening to table changes. It requires defining an on_update
function that takes the same arguments as a listener function, update
and is_replay
. The on_update
function is called every time the associated table is updated.
Listener classes are useful in cases where the listener must keep track of state. In this example, the listener will keep track of how many times it has been called.
from deephaven.table_listener import listen, TableListener
from deephaven import time_table
class ExampleListener(TableListener):
def __init__(self):
self.counter = 0
def on_update(self, update, is_replay):
self.counter += 1
print(f"CLASS LISTENER: counter={self.counter} update={update}")
print(f"is_replay: {is_replay}")
listener_class = ExampleListener()
table = time_table("PT1S").update(formulas=["X=i"]).tail(5)
handle = listen(table, listener_class)
Listen to multiple ticking tables
Regardless of whether you use a listener function or a listener class, you will need to use the merged_listen
function to register the listener with a table. Once a listener is registered, it will begin receiving updates.
With a listener function
A merged listener function takes two inputs:
update
: a dictionary ofTableUpdate
objects, with the input tables as keys.is_replay
: A boolean value that isTrue
when replaying the initial snapshot andFalse
otherwise. This will only ever beTrue
when the listener receives its first update anddo_replay
is set totrue
when callingmerged_listen
.
The following example listens to two time tables, one ticking every two seconds and the other ticking every three seconds. updates[t1]
and updates[t2]
return a TableUpdate
object for each table if it has been updated and None
otherwise.
from deephaven.table_listener import merged_listen
from deephaven import time_table
t1 = time_table("PT2s").update(["X = i"])
t2 = time_table("PT3s").update(["Y = ii"])
def listener_function(updates, is_replay):
if tu1 := updates[t1]:
print(f"t1: {tu1.added()}")
if tu2 := updates[t2]:
print(f"t2: {tu2.added()}")
handle = merged_listen([t1, t2], listener_function)
updates
contains None
values for any table that has not changed during the update cycle. These None
values must be handled to avoid raising errors.
With a listener class
The merged listener class gives more control when listening to multiple table changes. It requires defining an on_update
function that takes the same arguments as a listener function, update
and is_replay
. The on_update
function is called every time the associated tables are updated.
Listener classes are useful in cases where the listener must keep track of state. In this example, the listener will keep track of how many times it has been called.
from deephaven.table_listener import merged_listen, MergedListener
from deephaven import time_table
class ExampleListener(MergedListener):
def __init__(self):
self.counter = 0
def on_update(self, updates, is_replay):
self.counter += 1
print(f"CLASS LISTENER: counter={self.counter}")
if tu1 := updates[source1]:
print(f"Source1: {tu1.added()}")
if tu2 := updates[source2]:
print(f"Source2: {tu2.added()}")
print(f"is_replay: {is_replay}")
listener_class = ExampleListener()
source1 = time_table("PT1S").update(formulas=["X=i"]).tail(5)
source2 = time_table("PT2S").update(formulas=["Y=ii"]).tail(5)
handle = merged_listen([source1, source2], listener_class)
Access table data
Regardless of whether you're listening to one or multiple tables, a TableUpdate
object contains the added, modified, and removed rows from a table. There are several ways to access this data.
The following methods return a dict with column names as keys and NumPy arrays as values:
added
- rows added during the current update cycle.modified
- rows modified during the current update cycle.removed
- rows removed during the current update cycle.modified_prev
- rows modified during the previous update cycle.
The following example listens to added rows during each update cycle. It prints the data as the listener receives it.
from deephaven.table_listener import listen
from deephaven import time_table
def listener_function(update, is_replay):
data_added = update.added()
print(data_added)
source = time_table("PT1s").update("X = i").tail(5)
handle = listen(source, listener_function)
The following methods are chunked accessors and return a generator. Each call to the generator returns data on a subset (chunk) of the changed rows. To see all rows, iterate over all chunks in the generator. The chunk of changed rows is a dictionary with column names as keys and NumPy arrays as values.
added_chunks
- rows added during the current update cycle.modified_chunks
- rows modified during the current update cycle.removed_chunks
- rows removed during the current update cycle.modified_prev_chunks
- rows modified during the previous update cycle.
Chunked accessors are typically used when updates are large. While normal accessors load an entire update into memory all at once, chunked accessors load the update in smaller pieces, thus limiting memory usage. The following example splits added data into chunks of 100 rows at a time and prints the size of each chunk.
from deephaven.table_listener import listen
from deephaven import time_table
def listener_function(update, is_replay):
data_added_chunks = update.added_chunks(chunk_size=100)
for idx, chunk in enumerate(data_added_chunks):
curr_x_chunk = chunk["X"]
print(f"Chunk #{idx + 1}: {curr_x_chunk} rows.")
source = time_table("PT0.001s").update("X = i")
handle = listen(source, listener_function)
To see the previous values for modified rows, use:
The following example listens to modified rows during each update cycle. It prints the current and previous values of the modified rows for the X
column.
from deephaven.table_listener import listen
from deephaven import time_table
def listener_function(update, is_replay):
prev_modified = update.modified_prev()
curr_modified = update.modified()
if not prev_modified:
print("No previous values")
return
for prev, curr in zip(prev_modified["X"], curr_modified["X"]):
print(f"Change previous={prev} current={curr}")
table = time_table("PT0.1s").update("X = i").last_by()
handle = listen(table, listener_function)
Error handling
A table listener can also be supplied with a method to handle errors. This method gets called as soon as the listener function or class raises an exception. The method takes an exception as input, and can be made to take any action necessary to handle the error or notify the user, client APIs, or other services. Error handling is done identically for both single and merged listeners.
Consider an example where a listener raises an error if it receives a value greater than 10. An error handling method is used to print a message about the error to the console.
from deephaven.table_listener import listen
from deephaven import time_table
def listener_function(update, is_replay):
added = update.added()
if not added:
return
if any([item > 10 for item in added["X"]]):
raise Exception("Value over 10 detected!")
def error_function(e: Exception):
print("A value over 10 was detected. Please investigate.")
tt = time_table("PT1s").update("X = i + 6")
handle = listen(t=tt, listener=listener_function, on_error=error_function)
Add and remove listeners
Most applications that require the use of a table listener do so for the entirety of the application's lifetime. If a listener should only be registered for a specified period of time, a listener handle can be registered and deregistered using the start
and stop
methods.
The following example uses threading.Timer
to deregister a listener after 3 seconds and then re-register it after 6 seconds.
from deephaven.table_listener import listen
from deephaven import time_table
import time
from threading import Timer
def listener_function(update, is_replay):
print(f"FUNCTION LISTENER: update={update}")
print(f"is_replay: {is_replay}")
def stop_listener(handle):
handle.stop()
def start_listener(handle):
handle.start()
table = time_table("PT1S").update(formulas=["X=i"]).tail(5)
handle = listen(table, listener_function)
Timer(3, stop_listener, args=[handle]).start()
Timer(6, start_listener, args=[handle]).start()
Reduce data volumes
Tables often tick at high frequencies and with large quantities of incoming data. It's best practice to only listen to what's required for an operation. In such cases, applying filters and/or reducing tick frequencies will reduce both the quantity and frequency of incoming data to a listener.
The following example listens to a table that has been filtered and had its tick frequency reduced to reduce the rate at which the listener receives data.
from deephaven.table_listener import listen
from deephaven import time_table
def listener_function(update, is_replay):
print(f"FUNCTION LISTENER for even values: update={update}")
source = time_table("PT0.5s").update(formulas=["X=i"]).tail(5)
trigger = time_table("PT2s").rename_columns("DateTime = Timestamp")
result = source.where(filters=["X % 2 = 0"]).snapshot_when(trigger_table=trigger)
handle = listen(result, listener_function)
Replay data
A table listener can listen to data that existed before the listener was registered. For example, a listener that isn't registered until 10 seconds after a table starts ticking can be made to listen to the data that was created during those 10 seconds.
To make a listener listen to previously existing data, set the do_replay
parameter to True
when calling listen
.
The following example registers two listeners with a time table two seconds after it's created. Only the one that sets do_replay
to True
receives data when it's first registered.
from deephaven.table_listener import listen
from deephaven import time_table
import time
def listener_function(update, is_replay):
print(f"FUNCTION LISTENER for even values: update={update}")
print(f"is_replay={is_replay}")
source = time_table("PT0.3s").update("X = i")
# The following code is run two seconds after the code above
handle_no_replay = listen(source, listener_function, do_replay=False)
handle_replay = listen(source, listener_function, do_replay=True)
handle_no_replay.stop()
handle_replay.stop()
Dependent tables
Listeners can use data from tables other than the one they are listening to if the additional tables are configured as dependencies. When one or more tables are listed as a dependency to a listener, the query engine will wait to call the listener until all dependent tables have been processed. When a table is not listed as a dependency, it may be in an inconsistent state when accessed.
Don't do table operations inside the listener. While performing operations on the dependent tables in the listener is safe, it is not recommended because reading or operating on the result tables of those operations may not be safe. It is best to perform the operations on the dependent tables beforehand and then add the result tables as dependencies to the listener so that they can be safely read in it.
For example, consider two tables, source_a
and source_b
, that tick simultaneously but cannot be joined. When listening to source_a
, it is not guaranteed that source_b
will have its updates processed in full before the listener receives the update from source_a
. To guarantee that all data is processed before the listener triggers, source_b
must be registered as a dependency for the listener.
from deephaven.table_listener import listen
from deephaven.numpy import to_numpy
from deephaven import time_table
from random import choice
def rand_letter() -> str:
return choice(["A", "B", "C", "D"])
def listen_func(update, is_replay):
added = update.added()
print(f"From Source A: {added['X'].item()}")
dependent_data = to_numpy(source_b.view("Y")).squeeze()
if dependent_data.ndim > 0:
print(
f"From Source B: {', '.join([str(item) for item in dependent_data.squeeze()])}"
)
source_a = time_table("PT2s").update_view(["X = ii"])
source_b = (
time_table("PT2s")
.update(["Letter = rand_letter()", "Y = randomDouble(0, 10)"])
.drop_columns("Timestamp")
.last_by("Letter")
)
handle = listen(t=source_a, listener=listen_func, dependencies=source_b)
Example
Table listeners are often used to trigger actions based on table updates. For example, a listener could notify Slack or send an email when data meets some criteria. The following example prints values that meet certain criteria. In a real-world use case, rather than print an outlier value, a notification could be sent to relevant parties via email, Slack, Discord, or other service.
from deephaven.table_listener import listen
from deephaven import time_table
def listener_function(update, is_replay):
added = update.added()
if not added:
return
if any([item > 9 for item in added["X"]]):
print("Value over 9 detected!")
source = (
time_table("PT0.33s", blink_table=True)
.update("X = randomDouble(0, 10)")
.drop_columns("Timestamp")
)
handle = listen(source, listener_function, do_replay=False)