
Deephaven and asyncio for live crypto monitoring

Devin Smith
JJ Brosnan
Create streaming tables to capture Coinbase data

Last year, our colleague Jake wrote a blog about using Deephaven and Python's websockets to subscribe to Coinbase's Websocket Feed. Here, we revisit Jake's example to bring it up to speed with Deephaven's latest API. Jake's query is an excellent use case for the recently released Table Publisher.

A Table Publisher allows users to programmatically add data to a blink table. It pairs naturally with Python's asyncio and websockets libraries to ingest crypto data from Coinbase in real time.

With websockets, asyncio, and Deephaven's Table Publisher, you can create a reproducible end-to-end solution to ingest and analyze real-time crypto data from Coinbase's websocket feed.

Python and websockets

The websockets package is required to run the code in this blog. It's easy to set up and use with Deephaven. For this guide, we'll install it using os and pip.

import os

os.system("pip install websockets")

Deephaven's Table Publisher is a new and preferred alternative to the DynamicTableWriter for writing data to tables in real time. Where a DynamicTableWriter writes one row at a time to an append-only table, a Table Publisher adds entire tables to a blink table, which only keeps the rows from the current update cycle. Rows from previous update cycles disappear from the UI and from memory as soon as new data enters the table.

By itself, a blink table is a powerful tool to ingest data from external sources in real time without having to worry about memory constraints. The tradeoff is that a blink table doesn't store any history. However, it's easy to convert a blink table to either a ring table or an append-only table. Both keep data history, which is critical for some real-world applications.
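For example, here is a minimal sketch of both conversions; some_blink_table is a stand-in for any blink table, such as the one created later in this post:

from deephaven import ring_table
from deephaven.stream import blink_to_append_only

# Append-only: keeps every row ever published (memory grows without bound)
history = blink_to_append_only(some_blink_table)

# Ring: keeps only the most recent 1,000 rows
recent = ring_table(some_blink_table, 1000)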

Coinbase Websockets

As in Jake's blog, a subscribe-and-publish model is at the heart of the code used to ingest data from Coinbase's websocket feed. Below is what a subscription request to the feed looks like in Python. Note that this synchronous example uses the separate websocket-client package (imported as websocket) rather than websockets; install it the same way if you'd like to follow along.

from websocket import create_connection, WebSocketConnectionClosedException

import json

ws = create_connection("wss://ws-feed.exchange.coinbase.com")
ws.send(
    json.dumps(
        {
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channels": ["matches"],
        }
    )
)

You could run this in a while True loop to ingest and print the data forever. We don't recommend doing so, but to illustrate:

while True:
    try:
        data = json.loads(ws.recv())
        print(data)
    except WebSocketConnectionClosedException:
        break
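When you're finished, you can unsubscribe and close the connection. Coinbase's feed accepts an unsubscribe message with the same shape as the subscribe request:

ws.send(
    json.dumps(
        {
            "type": "unsubscribe",
            "product_ids": ["BTC-USD"],
            "channels": ["matches"],
        }
    )
)
ws.close()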

Putting it all together

The entire application consists of three scripts, which should be run in the order given in this blog.

This first script sets up the methods to ingest data from the Coinbase websocket feed using asynchronous I/O.

import asyncio
import json
import websockets

from dataclasses import dataclass
from typing import Callable

COINBASE_WSFEED_URL = "wss://ws-feed.exchange.coinbase.com"


@dataclass
class Match:
    type: str
    trade_id: int
    maker_order_id: str
    taker_order_id: str
    side: str
    size: str
    price: str
    product_id: str
    sequence: int
    time: str


async def handle_matches(
    product_ids: list[str], message_handler: Callable[[Match], None]
):
    # Iterating over websockets.connect reconnects automatically if the
    # connection drops
    async for websocket in websockets.connect(COINBASE_WSFEED_URL):
        await websocket.send(
            json.dumps(
                {
                    "type": "subscribe",
                    "product_ids": product_ids,
                    "channels": ["matches"],
                }
            )
        )
        # Skip subscribe response
        await websocket.recv()
        # Skip the last_match message sent for each product
        for _ in product_ids:
            await websocket.recv()
        async for message in websocket:
            message_handler(Match(**json.loads(message)))
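To sanity-check the coroutine outside of Deephaven, you could run it directly with print as the message handler. This blocks until interrupted, so don't run it in the Deephaven console:

# Illustrative only: print each Match until interrupted
asyncio.run(handle_matches(["BTC-USD"], print))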

This second script sets up the Table Publisher and its supporting functions, which convert the ingested matches into Deephaven tables, enable clean shutdown, and create the blink table to which the data is published.

from typing import Callable
from concurrent.futures import CancelledError

from deephaven.table import Table
from deephaven.time import to_j_instant
from deephaven.column import string_col, double_col, datetime_col, long_col
from deephaven.dtypes import int64, string, double, Instant
from deephaven.table_factory import new_table
from deephaven.stream.table_publisher import table_publisher, TablePublisher


def to_table(matches: list[Match]):
    return new_table(
        [
            datetime_col("Time", [to_j_instant(x.time) for x in matches]),
            long_col("TradeId", [x.trade_id for x in matches]),
            string_col("MakerOrderId", [x.maker_order_id for x in matches]),
            string_col("TakerOrderId", [x.taker_order_id for x in matches]),
            string_col("Side", [x.side for x in matches]),
            double_col("Size", [float(x.size) for x in matches]),
            double_col("Price", [float(x.price) for x in matches]),
            string_col("ProductId", [x.product_id for x in matches]),
            long_col("Sequence", [x.sequence for x in matches]),
        ]
    )


def create_matches(
    product_ids: list[str], event_loop
) -> tuple[Table, Callable[[], None]]:

    on_shutdown_callbacks = []

    def on_shutdown():
        nonlocal on_shutdown_callbacks
        for c in on_shutdown_callbacks:
            c()

    my_matches: list[Match] = []

    def on_flush(tp: TablePublisher):
        nonlocal my_matches
        # We need to take a shallow copy to ensure we don't allow asyncio
        # additions to my_matches while we are in java (where we drop the GIL)
        my_matches_copy = my_matches.copy()
        my_matches.clear()
        tp.add(to_table(my_matches_copy))

    table, publisher = table_publisher(
        f"Matches for {product_ids}",
        {
            "Time": Instant,
            "TradeId": int64,
            "MakerOrderId": string,
            "TakerOrderId": string,
            "Side": string,
            "Size": double,
            "Price": double,
            "ProductId": string,
            "Sequence": int64,
        },
        on_flush_callback=on_flush,
        on_shutdown_callback=on_shutdown,
    )

    future = asyncio.run_coroutine_threadsafe(
        handle_matches(product_ids, my_matches.append), event_loop
    )

    def on_future_done(f):
        nonlocal publisher
        try:
            e = f.exception(timeout=0) or RuntimeError("completed")
        except CancelledError:
            e = RuntimeError("cancelled")
        publisher.publish_failure(e)

    future.add_done_callback(on_future_done)

    on_shutdown_callbacks.append(future.cancel)

    return table, future.cancel
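To see what to_table produces, you can feed it a single hand-built Match; every field value below is made up for illustration:

# Illustrative only: a hypothetical match with fabricated values
sample = Match(
    type="match",
    trade_id=1,
    maker_order_id="m-1",
    taker_order_id="t-1",
    side="buy",
    size="0.01",
    price="30000.00",
    product_id="BTC-USD",
    sequence=1,
    time="2023-06-01T12:00:00.000000Z",
)
one_row_table = to_table([sample])  # a one-row table with typed columns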

The third script creates an asynchronous event loop on a background thread and runs the code that creates the blink table. Note that aggregations over a blink table operate on all of the data the table has ever seen, so the per-minute statistics below accumulate even though the blink table itself only retains the current update cycle. Downstream operations can also convert the blink table to table types that keep data history.

import asyncio
from threading import Thread

from deephaven.agg import count_, first, last, max_, min_, sum_, weighted_avg
from deephaven.plot import Figure
from deephaven.plot.plotstyle import PlotStyle

my_event_loop = asyncio.new_event_loop()
Thread(target=my_event_loop.run_forever).start()


def subscribe_stats(product_ids: list[str]):
    def do_agg(t: Table) -> Table:
        return (
            # 60000000000 nanoseconds = one-minute bins
            t.update_view(["TimeBin=lowerBin(Time, 60000000000)", "Volume=Price*Size"])
            .agg_by(
                [
                    count_("Count"),
                    sum_(["TotalSize=Size", "TotalVolume=Volume"]),
                    weighted_avg("Size", ["WAvgPrice=Price"]),
                    first(["FirstPrice=Price"]),
                    last(["LastPrice=Price"]),
                    min_(["MinPrice=Price"]),
                    max_(["MaxPrice=Price"]),
                ],
                by=["TimeBin", "ProductId", "Side"],
            )
            .sort(["TimeBin", "ProductId", "Side"])
        )

    def do_plot(t: Table):
        return (
            Figure()
            .axes(plot_style=PlotStyle.BAR)
            .plot_xy(
                series_name="Volume",
                t=t,
                x="TimeBin",
                y="TotalVolume",
                by=["ProductId", "Side"],
            )
            .show()
        )

    blink_table, on_done = create_matches(product_ids, my_event_loop)
    agg_table = do_agg(blink_table)
    volume_plot = do_plot(agg_table)
    return agg_table, volume_plot, on_done


t1, t1_volume_plot, t1_cancel = subscribe_stats(["BTC-USD"])
t2, t2_volume_plot, t2_cancel = subscribe_stats(["ETH-USD", "BTC-USDT", "ETH-USDT"])

# call these to explicitly cancel
# t1_cancel()
# t2_cancel()
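Blink tables also have specialized aggregation semantics: stateful operations like last_by remember rows across update cycles. As a sketch using the helpers above, you could track the most recent match per product from a fresh subscription:

btc_blink, btc_cancel = create_matches(["BTC-USD"], my_event_loop)
btc_latest = btc_blink.last_by(["ProductId"])  # latest match per product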



Reach out

Have questions or comments? Reach out on Slack!