Over the past year that I've been at Deephaven, we've developed many examples showing how to use Deephaven with web-based APIs through HTTP requests. Another very common way to receive data is through WebSockets. In this blog post, I will show how to connect to Coinbase's WebSocket feed and pull that data into Deephaven.
Python's websocket-client
The websocket-client package is a WebSocket client for Python. It's very easy to set up and use with Deephaven, as it just needs a simple pip install command.
import os
os.system("pip install websocket-client")
Now we can use the websocket package (the import name of websocket-client) in Deephaven.
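Running the install on every script start can be slow. Here is a minimal sketch that installs only when the module is missing. The helper names `is_installed` and `ensure_package` are hypothetical and not part of Deephaven or websocket-client; the install itself uses the same `os.system` call as above.

```python
import importlib.util
import os


def is_installed(module_name):
    """Return True if `module_name` can be imported in this interpreter."""
    return importlib.util.find_spec(module_name) is not None


def ensure_package(module_name, pip_name):
    """Install `pip_name` only when `module_name` is not importable.

    Returns True if an install was attempted, False if nothing was needed.
    """
    if is_installed(module_name):
        return False
    os.system(f"pip install {pip_name}")
    return True


# The package is published as "websocket-client" but imported as "websocket".
# ensure_package("websocket", "websocket-client")
```

The last line is commented out so that the sketch has no side effects until you choose to run it.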
Coinbase WebSockets
The Coinbase WebSocket follows a publish-subscribe model: when you connect, you send a message that specifies which data you want to receive back over the connection. The Coinbase WebSocket expects that message to name the products and channels to subscribe to. Here's an example:
from websocket import create_connection, WebSocketConnectionClosedException
import json
ws = create_connection("wss://ws-feed.exchange.coinbase.com")
ws.send(
    json.dumps(
        {
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channels": ["matches"],
        }
    )
)
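If you want to subscribe to different products or channels, it can help to build the message in one place. The sketch below assumes the same three fields used above; the function name `build_subscribe_message` is hypothetical, and "ETH-USD" is only an illustrative second product ID.

```python
import json


def build_subscribe_message(product_ids, channels):
    """Return the JSON text for a subscribe request."""
    return json.dumps(
        {
            "type": "subscribe",
            "product_ids": list(product_ids),
            "channels": list(channels),
        }
    )


message = build_subscribe_message(["BTC-USD", "ETH-USD"], ["matches"])
print(message)
# ws.send(message)  # send it over a connection opened with create_connection
```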
Now we can receive data from the WebSocket regarding Bitcoin USD prices and matches. Since we want to keep a persistent connection, we can use a never-ending loop to continually read the data.
The code below is illustrative. It shows the pattern used to pull data in the next section. It should not be run as-is, because the loop never exits.
while True:
    try:
        data = json.loads(ws.recv())
        print(data)
    except Exception:
        # Skip messages that fail to arrive or parse, and keep reading.
        pass
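For quick experiments, a bounded version of the same pattern is easier to work with. This is a sketch, not part of the original example: `take_messages` is a hypothetical helper, and the fake feed with its payloads is made up so the code runs without a network connection.

```python
import json


def take_messages(recv, limit):
    """Read up to `limit` JSON messages by calling `recv()` repeatedly."""
    messages = []
    for _ in range(limit):
        try:
            messages.append(json.loads(recv()))
        except json.JSONDecodeError:
            # Skip payloads that are not valid JSON; they still count toward the limit.
            continue
    return messages


# Stand-in for ws.recv, using made-up payloads.
fake_feed = iter(
    ['{"type": "subscriptions"}', "not json", '{"type": "match", "price": "1.5"}']
)
received = take_messages(lambda: next(fake_feed), limit=3)
print(received)
```

With a live connection, the call would look like `take_messages(ws.recv, 5)`, which returns after at most five reads instead of looping forever.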
DynamicTableWriter
Now that we have data coming in, we can use Deephaven's DynamicTableWriter to create a table containing the WebSocket data. In addition, we can convert some of the data to non-string types, so we'll do that as well:
from deephaven.time import to_datetime
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
from threading import Thread
def coinbase_time_to_datetime(strn):
    return to_datetime(strn[0:-1] + " UTC")

dtw_column_converter = {
    'size': float,
    'price': float,
    'time': coinbase_time_to_datetime
}
dtw_columns = {
    'product_id': dht.string,
    'time': dht.DateTime,
    'side': dht.string,
    'size': dht.float_,
    'price': dht.float_,
    'type': dht.string,
    'trade_id': dht.int_,
    'maker_order_id': dht.string,
    'taker_order_id': dht.string,
    'sequence': dht.int_,
}
dtw = DynamicTableWriter(dtw_columns)

def thread_function():
    while True:
        try:
            data = json.loads(ws.recv())
            row_to_write = []
            for key in dtw_columns:
                value = None
                if key in dtw_column_converter:
                    value = dtw_column_converter[key](data[key])
                else:
                    value = data[key]
                row_to_write.append(value)
            dtw.write_row(*row_to_write)
        except Exception as e:
            print(e)
thread = Thread(target=thread_function)
thread.start()
coinbase_websocket_table = dtw.table
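Any message that lacks one of the table's columns raises a KeyError in the loop above, which is caught and printed. As a sketch of an alternative, the row-building step can be pulled into a function that returns None for such messages. This version runs in plain Python: `build_row` and `parse_time` are hypothetical names, `datetime.fromisoformat` stands in for Deephaven's `to_datetime`, and the sample payloads are made up.

```python
from datetime import datetime, timezone


def parse_time(text):
    """Parse an ISO 8601 UTC timestamp ending in "Z" into an aware datetime."""
    return datetime.fromisoformat(text[:-1]).replace(tzinfo=timezone.utc)


def build_row(message, column_names, converters):
    """Return converted values in column order, or None if a column is missing."""
    if any(name not in message for name in column_names):
        return None
    row = []
    for name in column_names:
        convert = converters.get(name, lambda value: value)
        row.append(convert(message[name]))
    return row


columns = ["product_id", "time", "size", "price"]
converters = {"size": float, "price": float, "time": parse_time}

# Made-up sample payloads, for illustration only.
trade = {
    "product_id": "BTC-USD",
    "time": "2022-01-01T00:00:05.250000Z",
    "size": "0.5",
    "price": "100.0",
}
ack = {"type": "subscriptions"}

print(build_row(trade, columns, converters))
print(build_row(ack, columns, converters))
```

Inside `thread_function`, the same idea would mean calling `build_row(data, dtw_columns, dtw_column_converter)` and writing the row only when the result is not None.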
Deriving more tables
With data coming in, we can next derive additional tables from our real-time table. For this example, we'll create a table showing the average price and the number of trades in each 10-second window:
from deephaven import agg
agg_list = [
    agg.avg(cols=["avg_price = price"]),
    agg.count_("trade_count")
]
summary_10s = coinbase_websocket_table.update(
    ["time_10s = lowerBin(time, SECOND * 10)"]
).agg_by(agg_list, by=["time_10s"])
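To make the binning concrete, here is a plain-Python sketch of the same idea: round each timestamp down to the start of its 10-second window, then compute the average price and trade count per window. It illustrates the logic only and is not how Deephaven computes the result; the names `lower_bin` and `summarize` are hypothetical, and the trades are made-up (seconds, price) pairs.

```python
from collections import defaultdict


def lower_bin(timestamp_s, width_s):
    """Round a timestamp (in seconds) down to the start of its bin."""
    return timestamp_s - (timestamp_s % width_s)


def summarize(trades, width_s=10):
    """Map each bin start to (average price, trade count)."""
    prices_by_bin = defaultdict(list)
    for timestamp_s, price in trades:
        prices_by_bin[lower_bin(timestamp_s, width_s)].append(price)
    return {
        start: (sum(prices) / len(prices), len(prices))
        for start, prices in sorted(prices_by_bin.items())
    }


# Made-up (seconds, price) pairs, for illustration only.
trades = [(100, 10.0), (104, 20.0), (109, 30.0), (110, 40.0), (125, 50.0)]
print(summarize(trades))
# {100: (20.0, 3), 110: (40.0, 1), 120: (50.0, 1)}
```

The Deephaven table keeps this summary up to date as new rows arrive, whereas the sketch recomputes everything from a fixed list.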
Share your ideas
WebSockets are a great way to collect real-time data.
One major advantage that WebSockets have over HTTP requests is that the connection is persistent: data is pushed to you as it becomes available, so you don't have to poll for it.
The Deephaven Coinbase WebSocket example application shows a real-life example of collecting data from a WebSocket.
What WebSockets have you found that work well with real-time data? Reach out on Slack if you have any ideas you'd like to share.