Last year, our colleague Jake wrote a blog about using Deephaven and Python's websockets to subscribe to Coinbase's Websocket Feed. Here, we revisit Jake's example to bring it up to speed with Deephaven's latest API. Jake's query is an excellent use case for the recently released Table Publisher.
A Table Publisher allows users to programmatically add data to a blink table. That makes it a perfect fit for ingesting crypto data from Coinbase in real time with Python's asyncio and websockets libraries.
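Here's a minimal sketch of the pattern, independent of this blog's application (the table name and column are placeholders): table_publisher returns a blink table plus a publisher handle, and each call to the publisher's add method appends a batch of rows to the blink table.

from deephaven.stream.table_publisher import table_publisher
from deephaven.table_factory import new_table
from deephaven.column import int_col
from deephaven.dtypes import int32

# Placeholder example: a one-column blink table and its publisher
my_blink, my_publisher = table_publisher("Example", {"X": int32})

# Publish a batch of three rows; they live in my_blink for one update cycle
my_publisher.add(new_table([int_col("X", [1, 2, 3])]))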
Python and websockets
The websockets package is required to run the code in this blog. It's easy to set up and use with Deephaven. For this guide, we'll install it using os and pip.
import os
os.system("pip install websockets")
Table Publisher and blink tables
Deephaven's Table Publisher is a newer, preferred alternative to the DynamicTableWriter for writing data to tables in real time. Where a DynamicTableWriter writes one row at a time to an append-only table, a Table Publisher adds entire tables to a blink table, which only keeps the rows from the current update cycle. Rows from previous update cycles disappear from the UI and from memory as soon as new data enters the table.
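If you want to see blink semantics in isolation before wiring up any websockets, a quick way is a blink time table. This little demo (not part of the application) emits one row per second, and each row vanishes on the next update cycle:

from deephaven import time_table

# Demo only: a blink table that ticks once per second
blink_demo = time_table("PT1s", blink_table=True)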
By itself, a blink table is a powerful tool to ingest data from external sources in real time without having to worry about memory constraints. The tradeoff is that a blink table doesn't store any history. However, it's easy to convert a blink table to either a ring table or an append-only table. Both keep data history, which is critical for some real-world applications.
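For instance, assuming the blink_demo table from the sketch above (the ring capacity of 1024 is arbitrary):

from deephaven.table_factory import ring_table
from deephaven.stream import blink_to_append_only

# Append-only: keeps every row the blink table ever produced
demo_history = blink_to_append_only(blink_demo)

# Ring: keeps only the most recent 1024 rows
demo_recent = ring_table(blink_demo, 1024)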
Coinbase Websockets
As in Jake's blog, a subscribe-and-publish model is at the heart of the code used to ingest data from Coinbase's websocket feed. Below is what it looks like to make a subscription request to the feed with Python, using the synchronous client available in recent versions of the websockets package.
from websockets.sync.client import connect
from websockets.exceptions import ConnectionClosed
import json

ws = connect("wss://ws-feed.exchange.coinbase.com")

# Subscribe to the matches channel for BTC-USD
ws.send(
    json.dumps(
        {
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channels": ["matches"],
        }
    )
)
You could run this in a while True loop to ingest and print the data forever. We don't recommend doing so, but to illustrate:
while True:
    try:
        data = json.loads(ws.recv())
        print(data)
    except ConnectionClosed:
        break
Putting it all together
The entire application consists of three scripts, which should be run in the order given in this blog.
This first script defines the data class and coroutine that ingest data from the Coinbase websocket feed using asynchronous I/O.
import asyncio
import json
import websockets
from dataclasses import dataclass
from typing import Callable

COINBASE_WSFEED_URL = "wss://ws-feed.exchange.coinbase.com"


@dataclass
class Match:
    # Mirrors the fields of a Coinbase "match" message
    type: str
    trade_id: int
    maker_order_id: str
    taker_order_id: str
    side: str
    size: str
    price: str
    product_id: str
    sequence: int
    time: str


async def handle_matches(
    product_ids: list[str], message_handler: Callable[[Match], None]
):
    # Using websockets.connect as an async iterator reconnects
    # automatically if the connection drops
    async for websocket in websockets.connect(COINBASE_WSFEED_URL):
        await websocket.send(
            json.dumps(
                {
                    "type": "subscribe",
                    "product_ids": product_ids,
                    "channels": ["matches"],
                }
            )
        )
        # Skip the subscribe response
        await websocket.recv()
        # Skip the last_match message sent for each product
        for _ in product_ids:
            await websocket.recv()
        async for message in websocket:
            message_handler(Match(**json.loads(message)))
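The next script hands handle_matches off to a Table Publisher, but you can smoke-test it on its own outside of Deephaven. This hypothetical one-liner just prints each Match as it arrives:

# Hypothetical smoke test (not needed for the rest of this blog):
# print each BTC-USD match as it arrives; interrupt the script to stop.
asyncio.run(handle_matches(["BTC-USD"], print))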
This second script sets up the Table Publisher and its supporting functions, which convert ingested data to Deephaven tables, enable clean shutdown, and create the table to which the data will be published.
from typing import Callable
from concurrent.futures import CancelledError
from deephaven.table import Table
from deephaven.time import to_j_instant
from deephaven.column import string_col, double_col, datetime_col, long_col
from deephaven.dtypes import int64, string, double, Instant
from deephaven.table_factory import new_table
from deephaven.stream.table_publisher import table_publisher, TablePublisher


def to_table(matches: list[Match]):
    # Convert a batch of Match objects into a Deephaven table
    return new_table(
        [
            datetime_col("Time", [to_j_instant(x.time) for x in matches]),
            long_col("TradeId", [x.trade_id for x in matches]),
            string_col("MakerOrderId", [x.maker_order_id for x in matches]),
            string_col("TakerOrderId", [x.taker_order_id for x in matches]),
            string_col("Side", [x.side for x in matches]),
            double_col("Size", [float(x.size) for x in matches]),
            double_col("Price", [float(x.price) for x in matches]),
            string_col("ProductId", [x.product_id for x in matches]),
            long_col("Sequence", [x.sequence for x in matches]),
        ]
    )


def create_matches(
    product_ids: list[str], event_loop
) -> tuple[Table, Callable[[], None]]:
    on_shutdown_callbacks = []

    def on_shutdown():
        nonlocal on_shutdown_callbacks
        for c in on_shutdown_callbacks:
            c()

    my_matches: list[Match] = []

    def on_flush(tp: TablePublisher):
        nonlocal my_matches
        # We need to take a shallow copy to ensure we don't allow asyncio additions to
        # my_matches while we are in Java (where we drop the GIL)
        my_matches_copy = my_matches.copy()
        my_matches.clear()
        tp.add(to_table(my_matches_copy))

    table, publisher = table_publisher(
        f"Matches for {product_ids}",
        {
            "Time": Instant,
            "TradeId": int64,
            "MakerOrderId": string,
            "TakerOrderId": string,
            "Side": string,
            "Size": double,
            "Price": double,
            "ProductId": string,
            "Sequence": int64,
        },
        on_flush_callback=on_flush,
        on_shutdown_callback=on_shutdown,
    )

    # Run the websocket ingestion coroutine on the asyncio event loop's thread
    future = asyncio.run_coroutine_threadsafe(
        handle_matches(product_ids, my_matches.append), event_loop
    )

    def on_future_done(f):
        nonlocal publisher
        try:
            e = f.exception(timeout=0) or RuntimeError("completed")
        except CancelledError:
            e = RuntimeError("cancelled")
        publisher.publish_failure(e)

    future.add_done_callback(on_future_done)
    on_shutdown_callbacks.append(future.cancel)

    return table, future.cancel
The third script creates an asynchronous event loop and runs the code to create the blink table. Downstream operations can convert it to table types that keep data history.
import asyncio
from threading import Thread
from deephaven.agg import count_, first, last, max_, min_, sum_, weighted_avg
from deephaven.plot import Figure
from deephaven.plot.plotstyle import PlotStyle

# Run an asyncio event loop on its own thread so ingestion doesn't block the console
my_event_loop = asyncio.new_event_loop()
Thread(target=my_event_loop.run_forever).start()


def subscribe_stats(product_ids: list[str]):
    def do_agg(t: Table) -> Table:
        # 60000000000 nanoseconds = 1-minute time bins
        return (
            t.update_view(["TimeBin=lowerBin(Time, 60000000000)", "Volume=Price*Size"])
            .agg_by(
                [
                    count_("Count"),
                    sum_(["TotalSize=Size", "TotalVolume=Volume"]),
                    weighted_avg("Size", ["WAvgPrice=Price"]),
                    first(["FirstPrice=Price"]),
                    last(["LastPrice=Price"]),
                    min_(["MinPrice=Price"]),
                    max_(["MaxPrice=Price"]),
                ],
                by=["TimeBin", "ProductId", "Side"],
            )
            .sort(["TimeBin", "ProductId", "Side"])
        )

    def do_plot(t: Table):
        return (
            Figure()
            .axes(plot_style=PlotStyle.BAR)
            .plot_xy(
                series_name="Volume",
                t=t,
                x="TimeBin",
                y="TotalVolume",
                by=["ProductId", "Side"],
            )
            .show()
        )

    blink_table, on_done = create_matches(product_ids, my_event_loop)
    agg_table = do_agg(blink_table)
    volume_plot = do_plot(agg_table)

    return agg_table, volume_plot, on_done


t1, t1_volume_plot, t1_cancel = subscribe_stats(["BTC-USD"])
t2, t2_volume_plot, t2_cancel = subscribe_stats(["ETH-USD", "BTC-USDT", "ETH-USDT"])

# Call these to explicitly cancel the subscriptions:
# t1_cancel()
# t2_cancel()
Watch and see
Run the three scripts above and watch the aggregated tables and volume plots update in real time as trades stream in from Coinbase.
Reach out
Have questions or comments? Reach out on Slack!