
Testing machine learning models on simulated real-time data feeds

DALL·E prompt: robot in classroom writing a test, digital art, synthwave
Jeremiah Cheng
JJ Brosnan
Leveraging Deephaven's TableReplayer to ensure model efficacy in real time

To maintain a competitive edge in the crypto space, you should be using AI to predict prices and manage investments. Doing so isn't as difficult as you might think.

Testing your models on real-time data doesn't have to be difficult: you can simulate a real-time feed in just a few lines of code.

This is the fourth of a six-part blog series on real-time crypto price predictions with AI. In this blog, I'll test the AI models I created in the previous two installments on a simulated real-time feed. Keep up with the rest of the blog series:

  1. Acquire up-to-date crypto data with Apache Airflow
  2. Implement an LSTM model with TensorFlow
  3. Implement a linear regression model with Nvidia RAPIDS
  4. Test the models on simulated real-time data
  5. Implement the models on real-time crypto data from Coinbase
  6. Share AI predictions with URIs

Replay historical data in real time

Testing code on simulated real-time data has applications that reach far beyond machine learning. Whether you want to calculate financial indicators, find outliers, smooth data, or just watch data tick, replaying a historical data set as a live feed lets you verify that your code behaves correctly before you run it against a real source.

Deephaven's TableReplayer allows you to simulate true real-time data by using a historical data set with timestamps. By supplying start and end times in the Deephaven format, you can replay the historical data you're familiar with and test your algorithms on it.

In the previous two blogs, I built and trained models in both TensorFlow and Nvidia RAPIDS. Both models used a training set called train_dh. I also created a test set, test_dh, containing BTC price data from two different date ranges, but never used it. Now I finally get to use it in conjunction with TableReplayer to test my models on simulated real-time data. This workflow is powerful for real-time AI applications, since you can see how your models behave on a ticking feed before pointing them at live data.
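
If you don't have the earlier notebooks handy, any table with a date-time column and a price column will do for experimenting with the replayer. Here's a minimal, hypothetical stand-in for test_dh (the fake_test_dh name and the synthetic prices are made up; only the Date and Price column names need to match what follows):

from deephaven import empty_table

# Hypothetical stand-in for test_dh: one row per second with a synthetic BTC price
fake_test_dh = empty_table(3600).update([
    "Date = '2022-07-27T21:10:00 NY' + i * SECOND",
    "Price = 21000 + 50 * Math.sin(0.01 * i)"
])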

In the code below, I aptly call my table replayer replayer, and supply it a start and end time in Deephaven format. The add_table method tells it which table to replay, followed by the name of the column that contains date-times. From there, call start and the table ticks with the simulated real-time feed. The dates correspond to the testing set for the TensorFlow model.

from deephaven.replay import TableReplayer
import deephaven.time as dhtu

start_time = dhtu.to_datetime("2022-07-27T21:10:00 NY")
end_time = dhtu.to_datetime("2022-07-27T22:10:00 NY")

replayer = TableReplayer(start_time, end_time)
crypto_replayed = replayer.add_table(test_dh, "Date")
replayer.start()

Apply models in real time

TensorFlow LSTM model

The code for building the TensorFlow model can be found here. I'll be using some values found in the code in that blog, including n_input and n_features. I'll define how to apply the model to the data feed in a function.
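
That code defines several objects this section relies on: the trained model, the scaler used to normalize prices, the scaling bounds x_min and x_max, and the window parameters n_input and n_features. As a hedged sketch of what's assumed below (the real definitions live in the TensorFlow post; n_input = 3 is a placeholder that simply matches the last_three naming):

# Assumed to exist from the TensorFlow blog; the values below are placeholders
n_input = 3      # length of the look-back window fed to the LSTM
n_features = 1   # one feature per time step: the price
# model:  the trained Keras LSTM
# scaler: the scaler fit on the training prices, used for inverse_transform
# x_min, x_max: min and max training prices used to rescale new values into [-1, 1]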

from deephaven.replay import TableReplayer
from deephaven import numpy as dhnp
from deephaven.learn import gather
import deephaven.time as dhtu
from deephaven import learn

import numpy as np

start_time = dhtu.to_datetime("2022-07-27T21:10:00 NY")
end_time = dhtu.to_datetime("2022-07-27T22:10:00 NY")

replayer = TableReplayer(start_time, end_time)
crypto_replayed = replayer.add_table(test_dh, "Date")
replayer.start()

def table_to_numpy_double(rows, cols):
    return gather.table_to_numpy_2d(rows, cols, np_type=np.double)

def get_predicted_price(data, idx):
    return data

new_data = dhnp.to_numpy(train_dh.select(["Price"])).reshape(-1, 1)
first_eval_batch = new_data[-n_input:]
last_three = first_eval_batch.reshape((1, n_input, n_features))

def predict_with_model(data):
    global last_three
    # Get the prediction for the current window
    current_pred = model.predict(last_three)
    current_pred = scaler.inverse_transform(current_pred)
    current_pred = current_pred.reshape(1, -1)[0]
    # Scale the newly observed price into the [-1, 1] range used by the model
    add_data = data[0]
    value = ((add_data[0] - x_min) / (x_max - x_min)) * (1 - (-1)) + (-1)
    value = np.array([value])
    # Update the window: drop the oldest value and append the new one
    last_three = np.append(last_three[:, 1:, :], [[value]], axis=1)

    return current_pred

The final step is to use these functions and code to apply the model. I use deephaven.learn to apply the model to data in the replayed table.

lstm_in_realtime = learn.learn(
    table=crypto_replayed,
    model_func=predict_with_model,
    inputs=[learn.Input("Price", table_to_numpy_double)],
    outputs=[learn.Output("Predicted_price", get_predicted_price, "double")],
    batch_size=1
)


Nvidia RAPIDS linear regression model

The same principles apply here. We need some code to define how our model gets applied to the real-time feed, and it will look very similar to the code in the previous section, except this time we'll apply it to a different data set of crypto prices from August 9, 2022.

from deephaven.replay import TableReplayer
from deephaven import time as dhtu
from deephaven.learn import gather
from deephaven import learn

import numpy as np

start_time = dhtu.to_datetime("2022-08-09T19:18:00 NY")
end_time = dhtu.to_datetime("2022-08-09T20:18:00 NY")

replayer = TableReplayer(start_time, end_time)
crypto_replayed = replayer.add_table(test_dh, "Date")
replayer.start()

def use_fitted_model(features):
    return linear_regression_gpu.predict(features)

def table_to_numpy_double(rows, cols):
    return gather.table_to_numpy_2d(rows, cols, np_type=np.double)

def get_predicted_price(data, idx):
    return data

linreg_in_realtime = learn.learn(
    table=crypto_replayed,
    model_func=use_fitted_model,
    inputs=[learn.Input(["Price_1", "Price_2", "Price_3"], table_to_numpy_double)],
    outputs=[learn.Output("Predicted_Price", get_predicted_price, "double")],
    batch_size=1
)
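
For a quick numeric check to go along with the visual one, an error column can be added as the predictions tick in. This is a hedged sketch that assumes the replayed table still carries an actual Price column alongside the Price_1 through Price_3 features:

# Hypothetical: absolute error between the actual and predicted price on each row
linreg_errors = linreg_in_realtime.update(["Abs_Error = abs(Predicted_Price - Price)"])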


I'm happy with the outputs of the models. Now that I've tested them on simulated real-time feeds, I'm ready for the big leagues. In the next blog, I'll use these models on true real-time data.