Write data to an in-memory, real-time table
This guide covers publishing data to in-memory ticking tables with two methods: TablePublisher and DynamicTableWriter. A Table Publisher publishes data to a blink table, while a Dynamic Table Writer writes data to an append-only table. Both methods are great ways to ingest and publish data from external sources such as WebSockets and other live data feeds. However, we recommend table_publisher in most cases, as it provides a newer and more refined API, as well as native support for blink tables.
Table publisher
Table publisher uses the table_publisher factory function to create an instance of TablePublisher as well as its linked blink table. From there:
- Add data to the blink table with add.
- (Optionally) Store data history in a downstream table.
- (Optionally) Shut the publisher down when finished.
More sophisticated use cases will add steps but follow the same basic formula.
The factory function
The table_publisher factory function returns a TablePublisher and its linked blink table, in that order. The following code block creates a table publisher named My table publisher that publishes to a blink table with two columns, X and Y, which are int and double data types, respectively.
from deephaven.stream.table_publisher import table_publisher
from deephaven import dtypes as dht
my_table, my_publisher = table_publisher(
    name="My table publisher", col_defs={"X": dht.int32, "Y": dht.double}
)
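With the publisher and blink table in hand, rows can be published right away. Below is a minimal sketch, assuming an empty_table import; the column expressions are illustrative only. Each add call publishes new rows for the current update cycle, and they disappear from the blink table on the next cycle.
from deephaven import empty_table

# Publish three rows to the blink table; column names and types must match col_defs.
my_publisher.add(empty_table(3).update(["X = (int)ii", "Y = (double)ii / 2"]))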
Example: Getting started
The following example creates a table with three columns (X, Y, and Z). The columns initially contain no data because add_table has not yet been called.
from deephaven.stream.table_publisher import table_publisher
from deephaven import dtypes as dht
from deephaven import empty_table
coldefs = {"X": dht.int32, "Y": dht.double, "Z": dht.double}


def on_shutdown():
    print("Table publisher is shut down.")


my_table, publisher = table_publisher(
    name="My table", col_defs=coldefs, on_shutdown_callback=on_shutdown
)


def add_table(n):
    publisher.add(
        empty_table(n).update(
            [
                "X = randomInt(0, 10)",
                "Y = randomDouble(0.0, 1.0)",
                "Z = randomDouble(10.0, 100.0)",
            ]
        )
    )


def when_done():
    publisher.publish_failure(RuntimeError("Publisher shut down by user."))
Subsequent calls of add_table will add data to my_table.
add_table(10)
The TablePublisher can be shut down by calling publish_failure. In this case, the when_done function invokes it.
when_done()
Example: threading
The previous example required manual calls to add_table to populate my_table with data. In most real-world use cases, adding data to the table should be automated at some regular interval. This can be achieved using Python's threading module. The following example adds between 5 and 10 rows of data to my_table via empty_table every second for 5 seconds.
A ticking table in a thread must be updated from within an execution context.
from deephaven.stream.table_publisher import table_publisher
from deephaven.execution_context import get_exec_ctx
from deephaven import dtypes as dht
from deephaven import empty_table
import random, threading, time

coldefs = {"X": dht.int32, "Y": dht.double}


def shut_down():
    print("Shutting down table publisher.")


my_table, my_publisher = table_publisher(
    name="Publisher", col_defs=coldefs, on_shutdown_callback=shut_down
)


def when_done():
    my_publisher.publish_failure(RuntimeError("when_done invoked"))


def add_table(n):
    my_publisher.add(
        empty_table(n).update(["X = randomInt(0, 10)", "Y = randomDouble(-50.0, 50.0)"])
    )


ctx = get_exec_ctx()


def thread_func():
    with ctx:
        for i in range(5):
            add_table(random.randint(5, 10))
            time.sleep(1)


thread = threading.Thread(target=thread_func)
thread.start()
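Once the thread completes, the publisher can be shut down with the when_done helper defined above. A brief, optional cleanup sketch:
thread.join()  # wait for the writer thread to finish (blocks for about 5 seconds)
when_done()  # shut the publisher down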
Example: asyncio
The following code block uses asynchronous execution to pull crypto data from Coinbase's WebSocket feed. Asynchronous execution minimizes idle CPU time while ingesting the external data.
The websockets package is required to run the code below. If it is not already available, install it with pip install websockets.
from deephaven.stream.table_publisher import table_publisher, TablePublisher
from deephaven.column import string_col, double_col, datetime_col, long_col
from deephaven.dtypes import int64, string, double, Instant
from deephaven.time import to_j_instant
from deephaven.table import Table
from deephaven import new_table

import asyncio, json, websockets
from dataclasses import dataclass
from typing import Callable
from threading import Thread
from concurrent.futures import CancelledError

COINBASE_WSFEED_URL = "wss://ws-feed.exchange.coinbase.com"


@dataclass
class Match:
    type: str
    trade_id: int
    maker_order_id: str
    taker_order_id: str
    side: str
    size: str
    price: str
    product_id: str
    sequence: int
    time: str


async def handle_matches(
    product_ids: list[str], message_handler: Callable[[Match], None]
):
    async for websocket in websockets.connect(COINBASE_WSFEED_URL):
        await websocket.send(
            json.dumps(
                {
                    "type": "subscribe",
                    "product_ids": product_ids,
                    "channels": ["matches"],
                }
            )
        )
        # Skip the subscribe response
        await websocket.recv()
        # Skip the last_match messages
        for _ in product_ids:
            await websocket.recv()
        async for message in websocket:
            message_handler(Match(**json.loads(message)))


def to_table(matches: list[Match]):
    return new_table(
        [
            datetime_col("Time", [to_j_instant(x.time) for x in matches]),
            long_col("TradeId", [x.trade_id for x in matches]),
            string_col("MakerOrderId", [x.maker_order_id for x in matches]),
            string_col("TakerOrderId", [x.taker_order_id for x in matches]),
            string_col("Side", [x.side for x in matches]),
            double_col("Size", [float(x.size) for x in matches]),
            double_col("Price", [float(x.price) for x in matches]),
            string_col("ProductId", [x.product_id for x in matches]),
            long_col("Sequence", [x.sequence for x in matches]),
        ]
    )


def create_matches(
    product_ids: list[str], event_loop
) -> tuple[Table, Callable[[], None]]:
    on_shutdown_callbacks = []

    def on_shutdown():
        nonlocal on_shutdown_callbacks
        for c in on_shutdown_callbacks:
            c()

    my_matches: list[Match] = []

    def on_flush(tp: TablePublisher):
        nonlocal my_matches
        # We need to take a shallow copy to ensure we don't allow asyncio additions to
        # my_matches while we are in Java (where we drop the GIL)
        my_matches_copy = my_matches.copy()
        my_matches.clear()
        tp.add(to_table(my_matches_copy))

    table, publisher = table_publisher(
        f"Matches for {product_ids}",
        {
            "Time": Instant,
            "TradeId": int64,
            "MakerOrderId": string,
            "TakerOrderId": string,
            "Side": string,
            "Size": double,
            "Price": double,
            "ProductId": string,
            "Sequence": int64,
        },
        on_flush_callback=on_flush,
        on_shutdown_callback=on_shutdown,
    )

    future = asyncio.run_coroutine_threadsafe(
        handle_matches(product_ids, my_matches.append), event_loop
    )

    def on_future_done(f):
        nonlocal publisher
        try:
            e = f.exception(timeout=0) or RuntimeError("completed")
        except CancelledError:
            e = RuntimeError("cancelled")
        publisher.publish_failure(e)

    future.add_done_callback(on_future_done)
    on_shutdown_callbacks.append(future.cancel)
    return table, future.cancel


my_event_loop = asyncio.new_event_loop()
Thread(target=my_event_loop.run_forever).start()


def subscribe_stats(product_ids: list[str]):
    blink_table, on_done = create_matches(product_ids, my_event_loop)
    return blink_table, on_done


t1, t1_cancel = subscribe_stats(["BTC-USD"])
t2, t2_cancel = subscribe_stats(["ETH-USD", "BTC-USDT", "ETH-USDT"])

# Call these to explicitly cancel:
# t1_cancel()
# t2_cancel()
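Because aggregations on blink tables operate over the entire history of the stream, a downstream last_by provides a compact, live view of the most recent trade per product. A minimal sketch; the latest_trades name is illustrative:
# The most recent trade per product, derived from the blink table. last_by on a
# blink table retains the latest row for each key across update cycles.
latest_trades = t2.last_by(by=["ProductId"])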
Data history
Table publishers create blink tables. Blink tables do not store any data history: data is gone forever at the start of a new update cycle. In most use cases, you will want to store some or all of the rows written during previous update cycles. There are two ways to do this:
- Store some data history by creating a downstream ring table with ring_table.
- Store all data history by creating a downstream append-only table with blink_to_append_only.
See the table types user guide for more information on these table types, including which one is best suited for your application.
To demonstrate storing data history, the following extends the threading example by creating a downstream ring table and a downstream append-only table.
from deephaven.stream.table_publisher import table_publisher
from deephaven.execution_context import get_exec_ctx
from deephaven.stream import blink_to_append_only
from deephaven import dtypes as dht
from deephaven import empty_table
from deephaven import ring_table
import random, threading, time

coldefs = {"X": dht.int32, "Y": dht.double}


def shut_down():
    print("Shutting down table publisher.")


my_table, my_publisher = table_publisher(
    name="Publisher", col_defs=coldefs, on_shutdown_callback=shut_down
)


def when_done():
    my_publisher.publish_failure(RuntimeError("when_done invoked"))


def add_table(n):
    my_publisher.add(
        empty_table(n).update(["X = randomInt(0, 10)", "Y = randomDouble(-50.0, 50.0)"])
    )


ctx = get_exec_ctx()


def thread_func():
    with ctx:
        for i in range(5):
            add_table(random.randint(5, 10))
            time.sleep(1)


thread = threading.Thread(target=thread_func)
thread.start()

# Downstream ring table that stores the most recent 15 rows
my_ring_table = ring_table(my_table, 15, initialize=True)

# Downstream append-only table that stores the full history
my_append_only_table = blink_to_append_only(my_table)
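Once the writer thread finishes, the two downstream tables show the difference in retention. Run the check below as a separate console command so the final update cycle has completed; the exact sizes depend on the random row counts:
# The ring table holds at most 15 rows; the append-only table holds every row published.
print(my_ring_table.size, my_append_only_table.size)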
DynamicTableWriter
DynamicTableWriter writes data into live, in-memory tables by specifying the name and data type of each column. The use of DynamicTableWriter to write data to an in-memory ticking table generally follows a formula:
- Create the DynamicTableWriter.
- Get the table that the DynamicTableWriter will write data to.
- Write data to the table (done in a separate thread).
- Close the table writer.
In most cases, a table publisher is the preferred way to write data to a live table. However, it may be more convenient to use DynamicTableWriter if you are adding very few rows (e.g., one) at a time and prefer a simpler interface. It is almost always more flexible and performant to use table_publisher.
Example: Getting started
The following example creates a table with two columns (A and B). The columns contain randomly generated integers and strings, respectively. Every second, for ten seconds, a new row is added to the table.
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
import random, string, threading, time
# Create a DynamicTableWriter with two columns: A (long) and B (String)
table_writer = DynamicTableWriter({"A": dht.int64, "B": dht.string})

result = table_writer.table


# Function that writes data to the dynamic table
def thread_func():
    # The for loop determines how many rows are added to the table
    for i in range(10):
        # The data to put into the table
        a = random.randint(1, 100)
        b = random.choice(string.ascii_letters)
        # The write_row method adds a row to the table
        table_writer.write_row(a, b)
        # Seconds between new rows inserted into the table
        time.sleep(1)


# Thread that writes data to the dynamic table
thread = threading.Thread(target=thread_func)
thread.start()
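The formula above ends with closing the writer; once the thread has finished, calling close signals that no more rows will be written. A brief, optional cleanup sketch:
thread.join()  # wait for the writer thread to finish (blocks for about 10 seconds)
table_writer.close()  # signal that no more rows will be written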
Example: Trig functions
The following example writes rows containing X, sin(X), cos(X), and tan(X), and plots the functions as the table updates.
from deephaven import DynamicTableWriter
from deephaven.plot.figure import Figure
import deephaven.dtypes as dht
import numpy as np
import threading
import time
table_writer = DynamicTableWriter(
    {"X": dht.double, "SinX": dht.double, "CosX": dht.double, "TanX": dht.double}
)

trig_functions = table_writer.table


def write_data_live():
    for i in range(628):
        start = time.time()
        x = 0.01 * i
        y1 = np.sin(x)
        y2 = np.cos(x)
        y3 = np.tan(x)
        table_writer.write_row(x, y1, y2, y3)
        end = time.time()
        # Sleep for the remainder of the 0.2-second interval, accounting for
        # the time spent writing the row
        time.sleep(max(0.2 - (end - start), 0.0))


thread = threading.Thread(target=write_data_live)
thread.start()

figure = Figure()
trig_fig = (
    figure.plot_xy(series_name="Sin(X)", t=trig_functions, x="X", y="SinX")
    .plot_xy(series_name="Cos(X)", t=trig_functions, x="X", y="CosX")
    .plot_xy(series_name="Tan(X)", t=trig_functions, x="X", y="TanX")
)
trig_fig = trig_fig.chart_title(title="Trig Functions")
trig_plot = trig_fig.show()
DynamicTableWriter and the Update Graph
Both the Python interpreter and the DynamicTableWriter require the Update Graph (UG) lock to execute. As a result, new rows will not appear in output tables until the next UG cycle. As an example, what would you expect the print statement below to produce?
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
column_definitions = {"Numbers": dht.int32, "Words": dht.string}
table_writer = DynamicTableWriter(column_definitions)
result = table_writer.table
table_writer.write_row(1, "Testing")
table_writer.write_row(2, "Dynamic")
table_writer.write_row(3, "Table")
table_writer.write_row(4, "Writer")
print(result.j_table.isEmpty())
You may be surprised, but the table does not contain rows when the print statement is reached. The Python interpreter holds the UG lock while the code block executes, preventing result from being updated with the new rows until the next UG cycle. Because print is in the same code block, it sees the table before the rows are added. However, calling the same print statement as a second command produces the expected result.
print(result.j_table.isEmpty())
All table updates emanate from the Periodic Update Graph. An understanding of how the Update Graph works can greatly improve query writing.