Table listeners
With the Deephaven Query Language, it is very easy to create dynamic queries that update in real time. When a dynamic table updates, a message describing the changes is sent to all listeners of the table. This mechanism is what makes dynamic queries work, but it can also be used to create new, dynamic functionality.
For example, imagine using the Deephaven Query Language to create a dynamic table that monitors for situations needing human intervention. You can create a table listener that sends a Slack message every time the table ticks. Similarly, you could have a table of orders to buy or sell stocks. If rows are added to the order table, new orders are sent to the broker, and if rows are removed from the order table, orders are canceled with the broker.
Here you will learn how to create your own table listeners.
Table listeners in Python
Note
Python listeners can be significantly slower than Groovy or Java listeners. If your listener will be processing large amounts of data, consider using Groovy or Java.
Example listener
Note
The Python examples in this section use f-strings and require Python 3.6 or later.
There are two ways to listen to tables. The simplest is to create a listener function that takes a single argument: an object that describes the change, including rows added, rows modified, rows removed, and so on.
from deephaven import listen

def f(update):
    print(f"FUNCTION LISTENER: update={update}")

t = db.timeTable("00:00:01").update("X=i").tail(5)
handle = listen(t, f)
listen(...) registers the listener function with the dynamic table and returns a handle to the listener. This will immediately start listening to the table and calling f(...) every time t changes.
For cases where a table already contains data, and this data should be processed by the listener before processing new events, you can use:
handle = listen(t, f, replay_initial=True)
See the Deephaven Python API documentation for a list of all possible listener arguments.
The second way to listen to a table is to use a listener object instead of a listener function. The listener object must have an onUpdate(...) method, which takes a single argument. This is most frequently used in cases where the listener must keep track of state. In this example, the listener keeps track of how many times it has been called.
from deephaven import listen

class ExampleListener:
    def __init__(self):
        self.counter = 0

    def onUpdate(self, update):
        self.counter += 1
        print(f"CLASS LISTENER: counter={self.counter} update={update}")

listener = ExampleListener()

t = db.timeTable("00:00:01").update("X=i").tail(5)
handle = listen(t, listener)
Adding and removing listeners
Most applications will want to register a listener for the entire time the application runs, but some applications may only want listeners to be registered for a limited time. The listener handle can be used to register and deregister a listener.
This example shows how listeners can be added and removed. By default, calling listen(...) immediately begins listening to table changes. Then three timers are created to start and stop listening:
- After 10 seconds, the listener is deregistered and stops listening.
- After 20 seconds, the listener is registered and starts listening again.
- After 30 seconds, the listener is deregistered and stops listening.
from threading import Timer

from deephaven import listen

def f(update):
    print(f"FUNCTION LISTENER: update={update}")

t = db.timeTable("00:00:01").update("X=i").tail(5)
handle = listen(t, f)

def removeListeners():
    print('Removing listener')
    handle.deregister()

def addListeners():
    print('Adding listeners')
    handle.register()

Timer(10, removeListeners).start()
Timer(20, addListeners).start()
Timer(30, removeListeners).start()
Reading table data
The update object passed to listener methods contains information about which rows have changed. It does not contain the actual changed data. To obtain the changed data, the data must be retrieved from the table’s column source.
The update object contains:
- added: Indices of added rows (in post-shift key-space).
- removed: Indices of removed rows (in pre-shift key-space).
- modified: Indices of modified rows (in post-shift key-space).
- shifted: A structure that describes a list of shifts. A shift is the triplet start, end, shiftDelta, and describes that all rows that existed in the range [start, end] (inclusive) shifted by shiftDelta and can now be found in the range [start + shiftDelta, end + shiftDelta]. The table is not required to have a valid row for every value in the range [start, end]. Shifts are only allowed to increase or decrease the empty key-space between rows, and they are never allowed to reorder rows.
- modifiedColumnSet: Set of columns that are changed in modified rows.
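The shift arithmetic described above can be illustrated with a small, plain-Python sketch. It does not use the Deephaven API: the function name apply_shifts and the representation of shifts as (start, end, shift_delta) tuples are assumptions made for illustration, and the sketch assumes the shift ranges do not overlap.

```python
def apply_shifts(keys, shifts):
    """Map pre-shift row keys to post-shift row keys.

    keys   -- iterable of integer row keys before the shifts are applied
    shifts -- list of (start, end, shift_delta) triplets; ranges are inclusive
              and assumed not to overlap
    """
    shifted = []
    for key in keys:
        for start, end, shift_delta in shifts:
            if start <= key <= end:
                key += shift_delta
                break  # at most one shift applies to a given key
        shifted.append(key)
    return shifted


# Rows at keys 10..12 move by +5; the row at key 3 lies outside the range.
print(apply_shifts([3, 10, 11, 12], [(10, 12, 5)]))  # prints [3, 15, 16, 17]
```

Note that the relative order of the keys is unchanged, which matches the rule that shifts never reorder rows.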
In this example, the values from the added and removed rows are printed. Note that values for added rows are retrieved using get(idx), which retrieves values from the current table snapshot. Values for removed rows are retrieved using getPrev(idx), which retrieves values from the previous table snapshot. Because added rows are only present in the current snapshot, get(idx) must be used to retrieve the values. Similarly, removed rows are only present in the previous snapshot, so getPrev(idx) must be used to retrieve the values. Modified rows can be accessed using either get(idx) or getPrev(idx), depending on whether the current or previous values are desired.
from deephaven import listen

def f(update):
    print(f"FUNCTION LISTENER: update={update}")

    # Added rows exist only in the current snapshot, so use get(idx).
    added_iterator = update.added.iterator()

    while added_iterator.hasNext():
        idx = added_iterator.nextLong()
        ts = t.getColumnSource("Timestamp").get(idx)
        x = t.getColumnSource("X").get(idx)
        y = t.getColumnSource("Y").get(idx)
        print(f"\tADDED VALUES: ts={ts} x={x} y={y}")

    # Removed rows exist only in the previous snapshot, so use getPrev(idx).
    removed_iterator = update.removed.iterator()

    while removed_iterator.hasNext():
        idx = removed_iterator.nextLong()
        ts = t.getColumnSource("Timestamp").getPrev(idx)
        x = t.getColumnSource("X").getPrev(idx)
        y = t.getColumnSource("Y").getPrev(idx)
        print(f"\tREMOVED VALUES: ts={ts} x={x} y={y}")

t = db.timeTable("00:00:01").update("X=i", "Y=new int[]{0,1,2}").tail(5).ungroup()
handle = listen(t, f)
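The example above prints only added and removed rows. As a sketch, modified rows could be handled in the same style by iterating over update.modified and reading both getPrev(idx) and get(idx). The helper name describe_modified and its column_names parameter are hypothetical and not part of the Deephaven API. The sketch also assumes that update.modified supports the same iterator() interface as update.added, and that the update contains no shifts, so that a modified row has the same index in the previous and current snapshots.

```python
def describe_modified(update, table, column_names):
    """Return one line per modified row, showing previous and current values.

    Assumes the update carries no shifts, so idx is valid in both snapshots.
    """
    lines = []
    modified_iterator = update.modified.iterator()

    while modified_iterator.hasNext():
        idx = modified_iterator.nextLong()
        parts = []
        for name in column_names:
            source = table.getColumnSource(name)
            old = source.getPrev(idx)  # value in the previous snapshot
            new = source.get(idx)      # value in the current snapshot
            parts.append(f"{name}: {old} -> {new}")
        lines.append(f"\tMODIFIED VALUES: idx={idx} " + " ".join(parts))

    return lines


def f(update):
    # t is the table being listened to, as in the example above.
    for line in describe_modified(update, t, ["X", "Y"]):
        print(line)
```

The function f would be registered with listen(t, f) in the same way as in the example above.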
LiveTableMonitor Locks
Deephaven is continuously processing and updating dynamic queries. To ensure that your listener is processing a consistent data snapshot, a LiveTableMonitor (LTM) lock must be held.
The listen(...) method ensures that a listener is called while the LTM lock is held. This ensures data consistency. Be aware that if your listener spawns off other work, that work may need to acquire the LTM lock.
See the doLocked(...) method (Python) and Controlling table update frequency for more details.
Additionally, because the LTM lock is held while a listener is evaluated, slow listeners can cause the entire dynamic query to slow down. If your listener is slow, consider alternative designs, where the slow work is executed asynchronously on a separate compute thread.
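One possible shape for such a design is sketched below using only the Python standard library. The listener copies the row keys it needs while the LTM lock is held, places them on a queue, and returns immediately. A separate worker thread performs the slow work. The names work_queue, slow_work, worker, and results are hypothetical, and slow_work is only a placeholder. If the worker needs to read table data itself, it may need to acquire the LTM lock, as noted above.

```python
import queue
import threading

work_queue = queue.Queue()
results = []  # filled by the placeholder slow_work, for illustration only


def slow_work(row_keys):
    """Placeholder for expensive processing, such as sending a message."""
    results.append(sum(row_keys))


def worker():
    """Process queued work on a separate thread, outside the listener."""
    while True:
        row_keys = work_queue.get()
        try:
            slow_work(row_keys)
        finally:
            work_queue.task_done()


# A daemon thread does not block interpreter shutdown.
threading.Thread(target=worker, daemon=True).start()


def f(update):
    # Called while the LTM lock is held: copy what is needed and return quickly.
    row_keys = []
    added_iterator = update.added.iterator()
    while added_iterator.hasNext():
        row_keys.append(added_iterator.nextLong())
    work_queue.put(row_keys)
```

The function f would be registered with listen(t, f) as in the earlier examples. Values can also be copied inside f using get(idx) if the worker needs the data rather than the row keys.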
Table listeners in Groovy
To implement a listener, create a class that extends InstrumentedShiftAwareListenerAdapter and implements the onUpdate(...) method. onUpdate(...) takes a single argument that describes the table change. This description includes rows added, rows modified, rows removed, and so on.
Example listener
import com.illumon.iris.db.v2.DynamicTable
import com.illumon.iris.db.v2.ShiftAwareListener
import com.illumon.iris.db.v2.InstrumentedShiftAwareListenerAdapter

class ExampleListener extends InstrumentedShiftAwareListenerAdapter {
    ExampleListener(String description, DynamicTable source, boolean retain) {
        super(description, source, retain)
    }

    void onUpdate(ShiftAwareListener.Update update) {
        println("FUNCTION LISTENER: update=${update}")
    }
}

t = db.timeTable("00:00:01").update("X=i").tail(5)
listener = new ExampleListener("Test Listener", t, true)
t.listenForUpdates(listener)
Adding and removing listeners
Most applications will want to register a listener for the entire time the application runs, but some applications may only want listeners to be registered for a limited time. The listener can be activated and deactivated.
This example shows how listeners can be activated and deactivated. The listener is added, and then three timers are created to start and stop listening:
- After 10 seconds, the listener is deregistered and stops listening.
- After 20 seconds, the listener is registered and starts listening again.
- After 30 seconds, the listener is deregistered and stops listening.
import com.illumon.iris.db.v2.DynamicTable
import com.illumon.iris.db.v2.ShiftAwareListener
import com.illumon.iris.db.v2.InstrumentedShiftAwareListenerAdapter

class ExampleListener extends InstrumentedShiftAwareListenerAdapter {
    ExampleListener(String description, DynamicTable source, boolean retain) {
        super(description, source, retain)
    }

    void onUpdate(ShiftAwareListener.Update update) {
        println("FUNCTION LISTENER: update=${update}")
    }
}

t = db.timeTable("00:00:01").update("X=i").tail(5)
listener = new ExampleListener("Test Listener", t, true)
t.listenForUpdates(listener)

// Delays are in milliseconds.
new Timer().runAfter(10000) {
    println("Removing listener")
    t.removeUpdateListener(listener)
}

new Timer().runAfter(20000) {
    println("Adding listener")
    t.listenForUpdates(listener)
}

new Timer().runAfter(30000) {
    println("Removing listener")
    t.removeUpdateListener(listener)
}
Reading table data
The update object passed to listener methods contains information about which rows have changed. It does not contain the actual changed data. To obtain the changed data, the data must be retrieved from the table’s column source.
The update object contains:
- added: Indices of added rows (in post-shift key-space).
- removed: Indices of removed rows (in pre-shift key-space).
- modified: Indices of modified rows (in post-shift key-space).
- shifted: A structure that describes a list of shifts. A shift is the triplet start, end, shiftDelta, and describes that all rows that existed in the range [start, end] (inclusive) shifted by shiftDelta and can now be found in the range [start + shiftDelta, end + shiftDelta]. The table is not required to have a valid row for every value in the range [start, end]. Shifts are only allowed to increase or decrease the empty key-space between rows, and they are never allowed to reorder rows.
- modifiedColumnSet: Set of columns that are changed in modified rows.
In this example, the values from the added and removed rows are printed. Note that values for added rows are retrieved using get(idx), which retrieves values from the current table snapshot. Values for removed rows are retrieved using getPrev(idx), which retrieves values from the previous table snapshot. Because added rows are only present in the current snapshot, get(idx) must be used to retrieve the values. Similarly, removed rows are only present in the previous snapshot, so getPrev(idx) must be used to retrieve the values. Modified rows can be accessed using either get(idx) or getPrev(idx), depending on whether the current or previous values are desired.
import com.illumon.iris.db.v2.DynamicTable
import com.illumon.iris.db.v2.ShiftAwareListener
import com.illumon.iris.db.v2.InstrumentedShiftAwareListenerAdapter

class ExampleListener extends InstrumentedShiftAwareListenerAdapter {
    DynamicTable t

    ExampleListener(String description, DynamicTable source, boolean retain) {
        super(description, source, retain)
        t = source
    }

    void onUpdate(ShiftAwareListener.Update update) {
        println("FUNCTION LISTENER: update=${update}")

        // Added rows exist only in the current snapshot, so use get(idx).
        def added_iterator = update.added.iterator()

        while (added_iterator.hasNext()) {
            def idx = added_iterator.nextLong()
            def ts = t.getColumnSource("Timestamp").get(idx)
            def x = t.getColumnSource("X").get(idx)
            def y = t.getColumnSource("Y").get(idx)
            println("\tADDED VALUES: ts=${ts} x=${x} y=${y}")
        }

        // Removed rows exist only in the previous snapshot, so use getPrev(idx).
        def removed_iterator = update.removed.iterator()

        while (removed_iterator.hasNext()) {
            def idx = removed_iterator.nextLong()
            def ts = t.getColumnSource("Timestamp").getPrev(idx)
            def x = t.getColumnSource("X").getPrev(idx)
            def y = t.getColumnSource("Y").getPrev(idx)
            println("\tREMOVED VALUES: ts=${ts} x=${x} y=${y}")
        }
    }
}

t = db.timeTable("00:00:01").update("X=i", "Y=new int[]{0,1,2}").tail(5).ungroup()
listener = new ExampleListener("Test Listener", t, true)
t.listenForUpdates(listener)
LiveTableMonitor Locks
Deephaven is continuously processing and updating dynamic queries. To ensure that your listener is processing a consistent data snapshot, a LiveTableMonitor (LTM) lock must be held.
A listener registered with listenForUpdates(...) is called while the LTM lock is held. This ensures data consistency. Be aware that if your listener spawns off other work, that work may need to acquire the LTM lock.
See Controlling table update frequency for more details.
Additionally, because the LTM lock is held while a listener is evaluated, slow listeners can cause the entire dynamic query to slow down. If your listener is slow, consider alternative designs, where the slow work is executed asynchronously on a separate compute thread.
Replay initial snapshot
There are times when the initial contents of a table need to be processed by the listener before future events are processed by the listener. To accomplish this:
- Acquire the LTM lock.
- Process the contents of the table.
- Add the listener.
- Release the LTM lock.
Holding the LTM lock ensures that the listener will see the initial contents of the table and all new ticks. If the lock is not held, the contents of the table may change while processing the initial snapshot. Keep in mind that LTM locking can be done with either an exclusive or a shared lock. An exclusive lock can be held by either one reader or one writer, while a shared lock can be held by multiple readers or one writer.
import com.illumon.iris.db.v2.DynamicTable
import com.illumon.iris.db.v2.ShiftAwareListener
import com.illumon.iris.db.v2.InstrumentedShiftAwareListenerAdapter
import com.illumon.iris.db.tables.live.LiveTableMonitor
import com.illumon.iris.db.v2.utils.TreeIndex
import com.illumon.iris.db.v2.utils.IndexShiftData
import com.illumon.iris.db.v2.ModifiedColumnSet

class ExampleListener extends InstrumentedShiftAwareListenerAdapter {
    ExampleListener(String description, DynamicTable source, boolean retain) {
        super(description, source, retain)
    }

    void onUpdate(ShiftAwareListener.Update update) {
        println("FUNCTION LISTENER: update=${update}")
    }
}

t = db.timeTable("00:00:01").update("X=i", "Y=new int[]{0,1,2}").tail(5).ungroup()
listener = new ExampleListener("Test Listener", t, true)

LiveTableMonitor.DEFAULT.sharedLock().doLocked({
    // Build an update that reports every existing row as added.
    emptyIndex = new TreeIndex()
    emptyShift = new IndexShiftData.Builder().build()
    emptyColumnSet = ModifiedColumnSet.EMPTY
    update = new ShiftAwareListener.Update(t.getIndex(), emptyIndex, emptyIndex, emptyShift, emptyColumnSet)

    // Process the initial contents, then register while the lock is still held.
    listener.onUpdate(update)
    t.listenForUpdates(listener)
})