
Use TensorFlow in Deephaven

This guide will show you how to use TensorFlow in Deephaven queries.

TensorFlow is an open-source machine learning library for Python. It is one of the most well-known and widely used machine learning libraries available. Through its high-level Keras interface, it supports the building, training, validation, and deployment of a wide variety of machine learning models.

TensorFlow does not come stock with Deephaven's base Docker image. To use it within Deephaven, either install it yourself or choose a Deephaven Docker deployment with TensorFlow support built in.
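
Whichever route you take, you can confirm that TensorFlow is available by importing it from the Deephaven Python console and printing its version:

# Confirm that TensorFlow is importable and print its version
import tensorflow as tf

print(tf.__version__)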

The two examples below both solve the same problem of classifying the Iris dataset, which can be found in Deephaven's examples repository. The first example uses TensorFlow alone, whereas the second integrates Deephaven tables.

note

In this guide, we read data from Deephaven's examples repository. You can also load files that are in a mounted directory at the base of the Docker container. See Docker data volumes to learn more about the relation between locations in the container and the local file system.
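
For example, if the Iris CSV lives in a mounted data volume, it can be read from a local path instead of a URL. The path below is hypothetical; yours depends on how the volume is mounted:

# Read the Iris dataset from a hypothetical mounted data volume
import pandas as pd

iris = pd.read_csv("/data/examples/Iris/csv/iris.csv")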

The Iris flower dataset is a popular dataset commonly used in introductory machine learning applications. Introduced by R.A. Fisher in his 1936 paper, The use of multiple measurements in taxonomic problems, it contains 150 samples from three Iris subspecies, 50 from each. The following values are measured in centimeters for Iris-setosa, Iris-virginica, and Iris-versicolor flowers:

  • Petal length
  • Petal width
  • Sepal length
  • Sepal width

This is a classification problem suitable for a supervised machine learning algorithm. We'll define and create a feed-forward neural network, train it, and test it. We'll determine its accuracy as a percentage based on the number of correct predictions compared to the known values.

Classify the Iris dataset

This first example shows how to use TensorFlow to classify Iris flowers from measurements.

Let's first import all the packages we'll need.

import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

Next, we'll import our data, quantize the targets we want to predict, and split the data into training and testing sets.

# Read and quantize the dataset
iris = pd.read_csv(
    "https://media.githubusercontent.com/media/deephaven/examples/main/Iris/csv/iris.csv"
)
iris_mappings = {"Iris-setosa": 0, "Iris-virginica": 1, "Iris-versicolor": 2}
iris["Class"] = iris["Class"].apply(lambda x: iris_mappings[x])

# Split the DataFrame into training and testing sets
iris_shuffled = iris.sample(frac=1)
train_size = int(0.75 * len(iris_shuffled))
train_set = iris_shuffled[:train_size]
test_set = iris_shuffled[train_size:]

# Separate our data into features and targets (X and Y)
X_train = train_set.drop("Class", axis=1).values
Y_train = train_set["Class"].values
X_test = test_set.drop("Class", axis=1).values
Y_test = test_set["Class"].values
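
As a quick sanity check, we can confirm the sizes of the two sets. With 150 rows in total, the 75/25 split yields 112 training rows and 38 testing rows:

# Confirm the sizes of the training and testing sets
print(X_train.shape, Y_train.shape)  # (112, 4) (112,)
print(X_test.shape, Y_test.shape)  # (38, 4) (38,)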

With that done, we can define and create our neural network. We'll employ a simple feed-forward neural network with two hidden layers, both of which use the rectified linear (ReLU) activation function.

# Create the ANN
model = Sequential()
model.add(Dense(16, input_shape=(4,), activation=tf.nn.relu))
model.add(Dense(12, activation=tf.nn.relu))
model.add(Dense(3, activation=tf.nn.softmax))
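
To inspect the network's layer shapes and parameter counts, print a summary:

# Print the model's architecture
model.summary()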

Now comes the fun bit: training our network. We'll use sparse categorical cross entropy to calculate loss, and the Adam algorithm to optimize the network.

# Compile, fit, and evaluate the predictions of the ANN
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
    # The model's final layer applies softmax, so it outputs probabilities, not logits
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=["accuracy"],
)
model.fit(x=X_train, y=Y_train, epochs=100)
model.evaluate(X_test, Y_test)

The reported accuracy is typically well above 90%, which is pretty good for such a simple model.
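
We can also compute the accuracy ourselves from the model's predictions. Here's a small sketch using NumPy; it isn't part of the original workflow, but it makes the accuracy calculation explicit:

import numpy as np

# Count correct predictions on the test set and express them as a percentage
predicted_classes = np.argmax(model.predict(X_test), axis=1)
accuracy = 100 * np.mean(predicted_classes == Y_test)
print(f"Accuracy: {accuracy:.1f}%")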

Let's break down the code above into reusable steps that can be applied to a variety of supervised machine learning problems:

  • Import, quantize, update, and store data from an external source.
  • Define the machine learning model.
  • Set the loss calculation and optimization routines.
  • Train the model over a set number of training epochs.
  • Calculate the model's accuracy.

In this case, a simple neural network is suitable for a simple dataset.

Classify the Iris dataset with Deephaven tables

So we just classified the Iris dataset using a simple feed-forward neural network. That's kind of cool, but it would be cooler to classify a live feed of incoming data. Let's do that with Deephaven!

We extend our previous example to train our model on data in a Deephaven table. This requires some additional code.

First, we import everything we'll need.

# Deephaven imports
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
from deephaven.learn import gather
from deephaven import read_csv
from deephaven import learn

# Machine learning imports
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# Additional required imports
import numpy as np
import random, threading, time

Just as we did before, we next import our data. This time, we'll import it into a Deephaven table.

# Read and quantize the Iris dataset
iris_raw = read_csv(
    "https://media.githubusercontent.com/media/deephaven/examples/main/Iris/csv/iris.csv"
)

classes = {}
num_classes = 0


def get_class_number(c) -> np.intc:
    global classes, num_classes
    if c not in classes:
        classes[c] = num_classes
        num_classes += 1
    return classes[c]


iris = iris_raw.update(formulas=["Class = get_class_number(Class)"])
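
The numeric label each subspecies receives depends on the order in which rows are processed. After the update runs, you can print the mapping to see the assignments:

# Inspect the subspecies-to-number mapping built during the update
print(classes)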

Next, we define and create the neural network, using the same architecture as before.

# Our neural network
model = Sequential()
model.add(Dense(16, input_shape=(4,), activation=tf.nn.relu))
model.add(Dense(12, activation=tf.nn.relu))
model.add(Dense(3, activation=tf.nn.softmax))

This time, we'll create a function to train the neural network.

# A function that trains the model
def train_model(X_train, Y_train):
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
        # As before, the model outputs softmax probabilities, not logits
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=["accuracy"],
    )
    model.fit(x=X_train, y=Y_train, epochs=100)


# A function that gets the model's predictions on input data
def predict_with_model(features):
    if features.ndim == 1:
        features = np.expand_dims(features, 0)
    predictions = model.predict(features)
    return [np.argmax(item) for item in predictions]
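
Once the model has been trained (which happens in the next step), this function can also be called directly on a NumPy array. The measurement values below are made up for illustration:

# Predict the class of a single, hypothetical set of measurements
print(predict_with_model(np.array([5.1, 3.5, 1.4, 0.2])))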

Now we need a few extra functions. The first two gather data from a Deephaven table into NumPy arrays of doubles and integers, respectively, while the last extracts a value from the predictions. The extracted values will be used to create a new column in the output table.

# A function to gather data from table columns into a NumPy array of doubles
def table_to_array_double(rows, cols):
    return gather.table_to_numpy_2d(rows, cols, np_type=np.double)


# A function to gather data from table columns into a NumPy array of integers
def table_to_array_int(rows, cols):
    return gather.table_to_numpy_2d(rows, cols, np_type=np.intc)


# A function to extract a list element at a given index
def get_predicted_class(data, idx):
    return data[idx]

Now, we can predict the Iris subspecies from these measurements. This time, we'll do it in a Deephaven table using the learn function. The first time we call learn, we train the model. Then, we use the trained model to predict the values of the Iris subspecies in the Class column.

# Use the learn function to train our neural network
learn.learn(
    table=iris,
    model_func=train_model,
    inputs=[
        learn.Input(
            ["SepalLengthCM", "SepalWidthCM", "PetalLengthCM", "PetalWidthCM"],
            table_to_array_double,
        ),
        learn.Input(["Class"], table_to_array_int),
    ],
    outputs=None,
    batch_size=150,
)

# Use the learn function to create a new table that contains predicted values
iris_predicted_static = learn.learn(
    table=iris,
    model_func=predict_with_model,
    inputs=[
        learn.Input(
            ["SepalLengthCM", "SepalWidthCM", "PetalLengthCM", "PetalWidthCM"],
            table_to_array_double,
        )
    ],
    outputs=[learn.Output("PredictedClass", get_predicted_class, "int")],
    batch_size=150,
)
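
As a quick check that isn't part of the original example, you can compare the predicted classes to the known ones directly in the table:

# Flag rows where the prediction matches the known class
iris_checked = iris_predicted_static.update(formulas=["Correct = Class == PredictedClass"])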

We've done the same thing as before: we created static predictions on static data. Only this time, we used a Deephaven table. That's not that exciting. We really want to perform the prediction stage on a live data feed. To demonstrate this, we'll create some fake Iris measurements.

We can create an in-memory real-time table using DynamicTableWriter. To create semi-realistic measurements, we'll use some known quantities from the Iris dataset:

| Column | Minimum (cm) | Maximum (cm) |
| --- | --- | --- |
| PetalLengthCM | 1.0 | 6.9 |
| PetalWidthCM | 0.1 | 2.5 |
| SepalLengthCM | 4.3 | 7.9 |
| SepalWidthCM | 2.0 | 4.4 |

These quantities bound the faux measurements, which will be written to the table once per second for one minute. We'll apply our model to those measurements as they arrive and predict which Iris subspecies each belongs to.

First, we create a live table and write data to it.

# Create the table writer
table_writer = DynamicTableWriter(
    {
        "SepalLengthCM": dht.double,
        "SepalWidthCM": dht.double,
        "PetalLengthCM": dht.double,
        "PetalWidthCM": dht.double,
    }
)

# Get the live, ticking table
live_iris = table_writer.table


# This function creates faux Iris measurements once per second for a minute
def write_to_iris():
    for i in range(60):
        petal_length = random.randint(10, 69) / 10
        petal_width = random.randint(1, 25) / 10
        sepal_length = random.randint(43, 79) / 10
        sepal_width = random.randint(20, 44) / 10

        table_writer.write_row(sepal_length, sepal_width, petal_length, petal_width)
        time.sleep(1)


# Use a thread to write data to the table
thread = threading.Thread(target=write_to_iris)
thread.start()

Now we use learn on the ticking table. All it takes to change from static to live data is to change the table!

# Use the learn function to create a new table with live predictions
iris_predicted_live = learn.learn(
    table=live_iris,
    model_func=predict_with_model,
    inputs=[
        learn.Input(
            ["SepalLengthCM", "SepalWidthCM", "PetalLengthCM", "PetalWidthCM"],
            table_to_array_double,
        )
    ],
    outputs=[learn.Output("PredictedClass", get_predicted_class, "int")],
    batch_size=150,
)

Once the code to predict static data is written, that's all it takes to make predictions on live data: swap the static table for the ticking one.
