
Keep it simple: WebSockets and real-time queries

Jake Mulford
Connecting to Coinbase with Deephaven

Over the past year that I've been at Deephaven, we've developed many examples showing how to use Deephaven with web-based APIs through HTTP requests. Another very common way to receive data is through WebSockets. In this blog, I will show how to connect to Coinbase's WebSocket feed and pull that data into Deephaven.

Python's websocket-client

The websocket-client package provides a WebSocket client for Python. It's easy to set up and use with Deephaven, since it only needs a single pip install command.

```python
import os

# Install the websocket-client package with pip
os.system("pip install websocket-client")
```

Now we can use the websocket package in Deephaven.

Coinbase WebSockets

The Coinbase WebSocket follows a publish-subscribe model. All this means is that after you connect, you send a subscribe message that specifies which data you want the connection to send back. Coinbase expects that message to name one or more products (such as BTC-USD) and one or more channels (such as matches). Here's an example:

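```python
import json

from websocket import create_connection, WebSocketConnectionClosedException

# Open a connection to the Coinbase Exchange WebSocket feed
ws = create_connection("wss://ws-feed.exchange.coinbase.com")

# Subscribe to the "matches" (trades) channel for the BTC-USD product
ws.send(
    json.dumps(
        {
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channels": ["matches"],
        }
    )
)
```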
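Because the subscribe message is plain JSON, it can also be built by a small helper. The sketch below uses a hypothetical `build_subscription` function (it is not part of websocket-client or any Coinbase SDK) that only formats the message for any list of products and channels. It also allows `msg_type="unsubscribe"`, on the assumption that the feed accepts unsubscribe messages of the same shape.

```python
import json


def build_subscription(product_ids, channels, msg_type="subscribe"):
    """Return the JSON text of a Coinbase feed (un)subscribe message.

    Hypothetical helper for illustration; it does not send anything.
    """
    if msg_type not in ("subscribe", "unsubscribe"):
        raise ValueError(f"unsupported message type: {msg_type}")
    return json.dumps(
        {
            "type": msg_type,
            "product_ids": list(product_ids),
            "channels": list(channels),
        }
    )


# Subscribe to trades for two products at once
message = build_subscription(["BTC-USD", "ETH-USD"], ["matches"])
print(message)
# → {"type": "subscribe", "product_ids": ["BTC-USD", "ETH-USD"], "channels": ["matches"]}
```

With a live connection, the result is passed straight to `ws.send(...)`.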

Now we can receive data from the WebSocket about BTC-USD trades (matches). Since the connection stays open, a never-ending loop can read messages continually as they arrive.

Note: The code below is illustrative. It shows the usage pattern for pulling data in the next section. Don't run it, as the loop runs indefinitely and blocks the session.

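```python
# Illustrative only: reads messages for as long as the connection stays open
while True:
    try:
        data = json.loads(ws.recv())
        print(data)
    except WebSocketConnectionClosedException:
        # Coinbase closed the connection; stop reading
        break
```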
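The same loop can be wrapped in a generator so the read logic is reusable and can be exercised without a live feed. This is a sketch under assumptions: `iter_messages` and `FakeConnection` are hypothetical names, and the generator only assumes an object with a `recv()` method, such as the connection returned by `create_connection`. Since websocket-client's `WebSocketConnectionClosedException` is not a `ConnectionError`, pass it as `closed_exc` when reading from a real connection.

```python
import json


def iter_messages(conn, closed_exc=ConnectionError):
    """Yield decoded JSON messages from conn.recv() until the connection closes.

    conn is any object with a recv() method returning a JSON string;
    closed_exc is the exception type that signals a closed connection.
    """
    while True:
        try:
            raw = conn.recv()
        except closed_exc:
            return
        try:
            message = json.loads(raw)
        except json.JSONDecodeError:
            # Skip frames that are not valid JSON instead of stopping the feed
            continue
        yield message


class FakeConnection:
    """Hypothetical stand-in that replays canned frames, then 'closes'."""

    def __init__(self, frames):
        self._frames = list(frames)

    def recv(self):
        if not self._frames:
            raise ConnectionError("closed")
        return self._frames.pop(0)


fake = FakeConnection(['{"type": "subscriptions"}', "not json", '{"type": "match"}'])
for message in iter_messages(fake):
    print(message["type"])
# prints "subscriptions", then "match"
```

Against the real feed, the loop becomes `for data in iter_messages(ws, WebSocketConnectionClosedException): ...`.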

DynamicTableWriter

Now that we have data coming in, we can use Deephaven's DynamicTableWriter to create a table containing the WebSocket data. Coinbase sends some numeric fields, such as size and price, as JSON strings, so we'll also convert size and price to floats and time to a Deephaven DateTime:

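```python
import json
from threading import Thread

from websocket import WebSocketConnectionClosedException

from deephaven import DynamicTableWriter
from deephaven.time import to_datetime
import deephaven.dtypes as dht


def coinbase_time_to_datetime(strn):
    # Coinbase times end in "Z" (UTC); replace it with " UTC" so Deephaven can parse it
    return to_datetime(strn[0:-1] + " UTC")


# Converters for fields that need a non-string type
dtw_column_converter = {
    'size': float,
    'price': float,
    'time': coinbase_time_to_datetime
}

# Output table schema; rows are written in this column order
dtw_columns = {
    'product_id': dht.string,
    'time': dht.DateTime,
    'side': dht.string,
    'size': dht.float_,
    'price': dht.float_,
    'type': dht.string,
    'trade_id': dht.int_,
    'maker_order_id': dht.string,
    'taker_order_id': dht.string,
    'sequence': dht.int_,
}

dtw = DynamicTableWriter(dtw_columns)


def thread_function():
    while True:
        try:
            data = json.loads(ws.recv())
            # Skip non-trade messages, such as the initial "subscriptions" confirmation
            if data.get("type") not in ("match", "last_match"):
                continue
            row_to_write = []
            for key in dtw_columns:
                converter = dtw_column_converter.get(key)
                row_to_write.append(converter(data[key]) if converter else data[key])

            dtw.write_row(*row_to_write)
        except WebSocketConnectionClosedException:
            # The feed closed the connection; stop the reader thread
            break
        except Exception as e:
            print(e)


# Read on a background daemon thread so the console stays responsive
thread = Thread(target=thread_function, daemon=True)
thread.start()

coinbase_websocket_table = dtw.table
```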
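Because the per-message conversion is plain Python, it can be checked outside Deephaven. The sketch below is a hypothetical `match_to_row` helper that mirrors the loop body above, with one swap: it parses the time into a standard-library `datetime` instead of calling Deephaven's `to_datetime`, so it runs anywhere. The sample message is made up for illustration, shaped like a Coinbase match message.

```python
from datetime import datetime, timezone

# Output columns in write order (same names as dtw_columns)
COLUMNS = ["product_id", "time", "side", "size", "price", "type",
           "trade_id", "maker_order_id", "taker_order_id", "sequence"]


def parse_coinbase_time(text):
    # Coinbase times look like 2014-11-07T08:19:27.028459Z and are in UTC
    parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


CONVERTERS = {"size": float, "price": float, "time": parse_coinbase_time}


def match_to_row(message):
    """Return column values for a match message, or None if it should be skipped."""
    if message.get("type") not in ("match", "last_match"):
        return None
    row = []
    for col in COLUMNS:
        converter = CONVERTERS.get(col)
        row.append(converter(message[col]) if converter else message[col])
    return row


# Made-up sample message for illustration only
sample = {
    "type": "match", "trade_id": 10, "sequence": 50,
    "maker_order_id": "maker-1", "taker_order_id": "taker-1",
    "time": "2014-11-07T08:19:27.028459Z", "product_id": "BTC-USD",
    "size": "5.23512", "side": "sell", "price": "400.23",
}
print(match_to_row(sample))
```

Keeping the conversion in a function like this makes it easy to unit test the parsing before wiring it to `dtw.write_row`.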

Deriving more tables

With data coming in, we can derive additional tables from our real-time table. For this example, we'll create a table showing the average price and the number of trades in each 10-second interval:

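```python
from deephaven import agg

# Average trade price and number of trades per interval
agg_list = [
    agg.avg(cols=["avg_price = price"]),
    agg.count_("trade_count"),
]

# Assign each trade to a 10-second bin, then aggregate per bin
summary_10s = coinbase_websocket_table.update(
    ["time_10s = lowerBin(time, SECOND * 10)"]
).agg_by(agg_list, by=["time_10s"])
```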
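`lowerBin(time, SECOND * 10)` rounds each timestamp down to the start of its 10-second interval, and `agg_by` then averages `price` and counts rows within each interval. The pure-Python sketch below reproduces that arithmetic on nanosecond timestamps for illustration only; it is not how Deephaven evaluates the query, and `lower_bin`, `summarize_10s`, and the sample trades are hypothetical.

```python
from collections import defaultdict

SECOND = 1_000_000_000  # nanoseconds per second


def lower_bin(t_nanos, interval_nanos):
    # Round down to the nearest multiple of the interval
    return t_nanos - t_nanos % interval_nanos


def summarize_10s(trades):
    """trades: iterable of (time_nanos, price). Returns {bin_start: (avg_price, count)}."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for t, price in trades:
        b = lower_bin(t, 10 * SECOND)
        sums[b] += price
        counts[b] += 1
    return {b: (sums[b] / counts[b], counts[b]) for b in sorted(sums)}


trades = [(3 * SECOND, 100.0), (9 * SECOND, 102.0), (12 * SECOND, 101.0)]
print(summarize_10s(trades))
# → {0: (101.0, 2), 10000000000: (101.0, 1)}
```

The trades at 3 s and 9 s share the bin starting at 0, while the trade at 12 s falls in the bin starting at 10 s.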

Share your ideas

WebSockets are a great way to collect real-time data.

One major advantage that WebSockets have over plain HTTP requests is that the connection is persistent: the server pushes data to you as it becomes available, rather than you polling for it with repeated requests.

The Deephaven Coinbase WebSocket example application shows a real-world application of collecting data from a WebSocket.

What WebSockets have you found that work well with real-time data? Reach out on Slack if you have any ideas you'd like to share.