# How to use PyTorch in Deephaven

This guide will show you how to use PyTorch in Deephaven queries.

PyTorch is an open-source machine learning library for Python. It is one of the best-known and most widely used artificial intelligence libraries currently available. It offers an easy-to-learn Pythonic API that supports a wide variety of applications and features. Given Deephaven's strength in real-time and big data processing, it is natural to pair the two.

PyTorch does not come stock with Deephaven's base Docker image. To use it within Deephaven, you can install it yourself or choose one of Deephaven's Docker deployments with built-in support. The following options will work:

- Without example data:
- With example data:

The two examples below both solve the same problem of classifying the Iris dataset, which can be found in Deephaven's examples repository. The first example uses PyTorch alone, whereas the second integrates Deephaven tables to perform predictions on live data.

In this guide, we read data from Deephaven's examples repository. You can also load files that are in a mounted directory at the base of the Docker container. See Docker data volumes to learn more about the relation between locations in the container and the local file system.

The Iris flower dataset is a popular dataset commonly used in introductory machine learning applications. Created by R.A. Fisher in his 1936 paper, *The use of multiple measurements in taxonomic problems*, it contains 150 measurements of three types of Iris flower subspecies. The following values are measured in centimeters for Iris-setosa, Iris-virginica, and Iris-versicolor flowers:

- Petal length
- Petal width
- Sepal length
- Sepal width

This is a classification problem suitable for a supervised machine learning algorithm. We'll define and create a feed-forward neural network, train it, and test it. We'll determine its accuracy as a percentage based on the number of correct predictions compared to the known values.
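The accuracy figure described above is simply the share of predictions that match the known labels. A minimal sketch in plain Python follows; `accuracy_percent` is an illustrative helper name, not something provided by PyTorch or Deephaven, and the labels are made up.

```python
def accuracy_percent(known, predicted):
    """Return the percentage of predictions that equal the known labels."""
    if len(known) != len(predicted):
        raise ValueError("known and predicted must have the same length")
    if not known:
        raise ValueError("cannot compute accuracy of an empty set")
    correct = sum(1 for k, p in zip(known, predicted) if k == p)
    return 100.0 * correct / len(known)


# Made-up labels for illustration: three of the four predictions match
example_accuracy = accuracy_percent([0, 1, 2, 1], [0, 1, 2, 2])
print(example_accuracy)
```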

## Classify the Iris dataset

This first example shows how to use PyTorch to classify Iris flowers from measurements.

Let's first import all the packages we'll need.

```python
import pandas as pd
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
```

Next, we'll import our data, quantize the targets we want to predict, and split the data into training and testing sets.

```python
# Read and quantize the dataset
iris = pd.read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/Iris/csv/iris.csv")
iris_mappings = {
    "Iris-setosa": 0,
    "Iris-virginica": 1,
    "Iris-versicolor": 2
}
iris["Class"] = iris["Class"].apply(lambda x: iris_mappings[x])

# Split the DataFrame into training and testing sets
iris_shuffled = iris.sample(frac=1)
train_size = int(0.75 * len(iris_shuffled))
train_set = iris_shuffled[:train_size]
test_set = iris_shuffled[train_size:]

# Separate our data into features and targets (X and Y)
X_train = train_set.drop("Class", axis=1).values
Y_train = train_set["Class"].values
X_test = test_set.drop("Class", axis=1).values
Y_test = test_set["Class"].values

# Convert our training and testing sets to torch tensors
X_train = torch.FloatTensor(X_train)
X_test = torch.FloatTensor(X_test)
Y_train = torch.LongTensor(Y_train)
Y_test = torch.LongTensor(Y_test)
```
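Because `sample(frac=1)` shuffles differently on every run, the training and testing sets (and therefore the accuracy reported later) will vary. The sketch below shows the same shuffle-then-slice idea in plain Python with a fixed seed so the split is repeatable. The helper name `split_rows` and the seed value are illustrative choices, not part of the original example.

```python
import random


def split_rows(rows, train_fraction=0.75, seed=42):
    """Shuffle a copy of rows with a fixed seed, then slice into train/test."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    train_size = int(train_fraction * len(shuffled))
    return shuffled[:train_size], shuffled[train_size:]


# 150 stand-in row indices, matching the size of the Iris dataset
train_rows, test_rows = split_rows(range(150))
print(len(train_rows), len(test_rows))
```

With pandas, passing a `random_state` argument to `sample` should have a similar effect.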

With that done, we can define and create our neural network. We'll employ a simple feed-forward neural network with two hidden layers. Both hidden layers will use the ReLU activation function.

```python
# Our neural network class (simple feed-forward neural network)
class IrisANN(nn.Module):
    def __init__(self):
        # Specify two hidden layers in the neural network
        super().__init__()
        self.fc1 = nn.Linear(in_features=4, out_features=16)
        self.fc2 = nn.Linear(in_features=16, out_features=12)
        self.output = nn.Linear(in_features=12, out_features=3)

    def forward(self, x):
        # Specify what activation functions are used at each layer
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.output(x)
        return x

# Create an instance of our neural network
model = IrisANN()
```
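To get a feel for how small this network is, the number of trainable parameters can be worked out by hand. Each fully connected layer holds one weight per input/output pair plus one bias per output (assuming the default `bias=True` of `nn.Linear`). The helper name `linear_param_count` is illustrative.

```python
def linear_param_count(in_features, out_features):
    """Weights (in * out) plus one bias per output unit."""
    return in_features * out_features + out_features


# Layer sizes taken from the IrisANN definition above
layer_sizes = [(4, 16), (16, 12), (12, 3)]
total_params = sum(linear_param_count(i, o) for i, o in layer_sizes)
print(total_params)  # 80 + 204 + 39 = 323
```

In a session where `model` exists, `sum(p.numel() for p in model.parameters())` should report the same total.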

Now comes the fun bit: training our network. We'll calculate loss using cross entropy, and we'll optimize the network with the Adam algorithm.

```python
# Set our loss computation and optimization algorithms
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Train the network over 100 epochs
epochs = 100
loss_arr = []

for i in range(epochs):
    # Run data through the network
    Y_hat = model.forward(X_train)
    loss = criterion(Y_hat, Y_train)
    # Store the loss as a plain float so the computation graph is not kept alive
    loss_arr.append(loss.item())

    if i % 10 == 0:
        print(f'Epoch: {i} Loss: {loss.item()}')

    # Optimize the network
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
```
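`nn.CrossEntropyLoss` expects raw, unnormalized scores (logits), which is why the network's output layer has no activation function. For a single sample, the loss reduces to the negative log of the softmax probability assigned to the correct class; by default PyTorch then averages this over the batch. The following is a plain-Python sketch of that per-sample formula, not PyTorch's implementation, and `cross_entropy` is an illustrative name.

```python
import math


def cross_entropy(logits, target):
    """Cross entropy for one sample: -log(softmax(logits)[target])."""
    # Subtract the max logit for numerical stability before exponentiating
    peak = max(logits)
    log_sum_exp = peak + math.log(sum(math.exp(z - peak) for z in logits))
    return log_sum_exp - logits[target]


# Equal logits mean the model is undecided among three classes
undecided_loss = cross_entropy([0.0, 0.0, 0.0], target=1)
# A large logit on the correct class drives the loss toward zero
confident_loss = cross_entropy([0.0, 10.0, 0.0], target=1)
print(undecided_loss, confident_loss)
```

An undecided three-class model scores a loss of ln(3), so a loss well below that value during training suggests the network is learning something.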

Lastly, we'll check how well our model worked.

```python
# Save the predictions to a list
preds = []
with torch.no_grad():
    for val in X_test:
        Y_hat = model.forward(val)
        preds.append(Y_hat.argmax().item())

# Calculate and display how well the network did
# Convert the target tensor to a NumPy array so pandas stores plain integers
df = pd.DataFrame({'Y': Y_test.numpy(), 'YHat': preds})
df['Correct'] = [1 if corr == pred else 0 for corr, pred in zip(df['Y'], df['YHat'])]
accuracy = df['Correct'].sum() / len(df) * 100
print("Neural Network accuracy: " + str(accuracy) + "%")
```

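The exact accuracy varies from run to run because the train/test split and the initial network weights are random. Even so, the performance is pretty good for such a simple model.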

This example follows a basic formula for using a neural network to solve a supervised learning problem:

- Import, quantize, update, and store data from an external source.
- Define the machine learning model.
- Set the loss calculation and optimization routines.
- Train the model over a set number of training epochs.
- Calculate the model's accuracy.

In this case, a simple neural network is suitable for a simple dataset.

## Classify the Iris dataset with Deephaven tables

So we just classified the Iris dataset using a simple feed-forward neural network. That's kind of cool, but it would be cooler to classify a live feed of incoming data. Let's do that with Deephaven!

First, we extend our previous example to train our model on data in a Deephaven table. This requires some additional code.

We start by importing everything we need.

```python
# Deephaven imports
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
from deephaven.learn import gather
from deephaven import read_csv
from deephaven import learn

# Machine learning imports
import torch
import torch.nn as nn
import torch.nn.functional as F

# Python imports
import numpy as np
import random
import threading
import time
```

Just as we did before, we next import our data. This time, we'll import it into a Deephaven table.

```python
# Read and quantize the Iris dataset
iris_raw = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/Iris/csv/iris.csv")

classes = {}
num_classes = 0

def get_class_number(c) -> np.intc:
    global classes, num_classes
    if c not in classes:
        classes[c] = num_classes
        num_classes += 1
    return classes[c]

iris = iris_raw.update(formulas=["Class = get_class_number(Class)"])
```
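Note that `get_class_number` assigns integers in the order class names are first seen, so the numbering depends on the row order of the table rather than on a fixed mapping like the one in the first example. The standalone sketch below is a simplified variant (no NumPy return type, no separate counter) run on made-up labels to show the behavior.

```python
classes = {}


def get_class_number(name):
    """Assign the next unused integer the first time a class name appears."""
    if name not in classes:
        classes[name] = len(classes)
    return classes[name]


# Made-up row order for illustration
labels = ["Iris-setosa", "Iris-setosa", "Iris-versicolor", "Iris-virginica", "Iris-versicolor"]
numbers = [get_class_number(label) for label in labels]
print(numbers)
print(classes)
```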

Next we define and create the neural network.

```python
# Our neural network class
class IrisANN(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(in_features=4, out_features=16)
        self.fc2 = nn.Linear(in_features=16, out_features=12)
        self.output = nn.Linear(in_features=12, out_features=3)

    def forward(self, x):
        x = x.float()
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.output(x)
        return x

# Create the neural network
model = IrisANN()
```

This time, we'll create a function to train the neural network.

```python
# A function that trains the model
def train_model(X_train, Y_train):
    global model
    # Set training parameters
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    epochs = 100

    loss_arr = []

    for i in range(epochs):
        Y_hat = model.forward(X_train)
        loss = criterion(Y_hat, Y_train.long())
        # Store the loss as a plain float so the computation graph is not kept alive
        loss_arr.append(loss.item())

        if i % 10 == 0:
            print(f'Epoch: {i} Loss: {loss.item()}')

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

# A function that gets the model's predictions from input data
def predict_with_model(features):
    # Treat a single row as a batch of one
    if features.dim() == 1:
        features = torch.unsqueeze(features, 0)
    preds = []

    with torch.no_grad():
        for val in features:
            Y_hat = model.forward(val)
            preds.append(Y_hat.argmax().item())

    # Return the model's predictions
    return preds
```

Now we need some extra functions. The first two gather data from a Deephaven table into torch tensors of doubles and integers, respectively, while the third extracts a value from the predictions. The extracted values will be used to create a new column in the output table.

```python
# A function to gather data from table columns into a torch tensor of doubles
def table_to_tensor_double(rows, cols):
    return torch.from_numpy(gather.table_to_numpy_2d(rows, cols, np_type=np.double))

# A function to gather data from table columns into a torch tensor of integers
def table_to_tensor_int(rows, cols):
    return torch.from_numpy(np.squeeze(gather.table_to_numpy_2d(rows, cols, np_type=np.intc)))

# A function to extract a prediction and cast the value to an integer
def get_predicted_class(data, idx):
    return int(data[idx])
```

Now, we can predict the Iris subspecies from these measurements. This time, we'll do it in a Deephaven table using the `learn` function. The first time we call `learn`, we train the model. Then, we use the trained model to predict the values of the Iris subspecies in the `Class` column.

```python
# Use the learn function to train our neural network
learn.learn(
    table=iris,
    model_func=train_model,
    inputs=[
        learn.Input(["SepalLengthCM", "SepalWidthCM", "PetalLengthCM", "PetalWidthCM"], table_to_tensor_double),
        learn.Input("Class", table_to_tensor_int),
    ],
    outputs=None,
    batch_size=150
)

# Use the learn function to create a new table that contains predicted values
iris_predicted_static = learn.learn(
    table=iris,
    model_func=predict_with_model,
    inputs=[learn.Input(["SepalLengthCM", "SepalWidthCM", "PetalLengthCM", "PetalWidthCM"], table_to_tensor_double)],
    outputs=[learn.Output("PredictedClass", get_predicted_class, "int")],
    batch_size=150
)
```
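The `PredictedClass` column holds integers rather than subspecies names. One way to read them back is to invert the `classes` dictionary built while quantizing the data. The sketch below is plain Python; the dictionary contents shown are hypothetical (the real numbering depends on the order in which names first appear), and `get_class_name` is an illustrative helper.

```python
# Hypothetical contents of the classes dictionary after quantizing the table;
# the real numbering depends on the order in which class names first appear
classes = {"Iris-setosa": 0, "Iris-versicolor": 1, "Iris-virginica": 2}

# Invert the mapping so predicted integers can be looked up by number
class_names = {number: name for name, number in classes.items()}


def get_class_name(number):
    """Return the class name for a predicted integer, or 'unknown'."""
    return class_names.get(number, "unknown")


decoded = [get_class_name(n) for n in [0, 2, 1, 7]]
print(decoded)
```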

We've done the same thing as before: we created static predictions on static data. Only this time, we used a Deephaven table. That's not that exciting. We really want to perform the prediction stage on a *live* data feed. To demonstrate this, we'll create some fake Iris measurements.

We can create an in-memory real-time table using DynamicTableWriter. To create semi-realistic measurements, we'll use some known quantities from the Iris dataset:

| Column | Minimum (cm) | Maximum (cm) |
| --- | --- | --- |
| `PetalLengthCM` | 1.0 | 6.9 |
| `PetalWidthCM` | 0.1 | 2.5 |
| `SepalLengthCM` | 4.3 | 7.9 |
| `SepalWidthCM` | 2.0 | 4.4 |

These quantities will be fed to our table writer and faux measurements will be written to the table once per second for 30 seconds. We'll apply our model on those measurements as they arrive and predict which Iris subspecies they belong to.

First, we create a live table and write data to it.

```python
# Create the table writer
table_writer = DynamicTableWriter(
    {"SepalLengthCM": dht.double, "SepalWidthCM": dht.double, "PetalLengthCM": dht.double, "PetalWidthCM": dht.double}
)

# Get the live, ticking table
live_iris = table_writer.table

# This function creates faux Iris measurements once per second for 30 seconds
def write_to_iris():
    for i in range(30):
        petal_length = random.randint(10, 69) / 10
        petal_width = random.randint(1, 25) / 10
        sepal_length = random.randint(43, 79) / 10
        sepal_width = random.randint(20, 44) / 10

        table_writer.write_row(sepal_length, sepal_width, petal_length, petal_width)
        time.sleep(1)

# Use a thread to write data to the table
thread = threading.Thread(target=write_to_iris)
thread.start()
```
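The integer bounds passed to `random.randint` above are the minimum and maximum values from the table multiplied by 10, so every faux measurement falls within the known range in steps of 0.1 cm. The standalone sketch below makes that relationship explicit and checks it without needing a Deephaven session. `RANGES_CM` and `faux_measurement` are illustrative names, and the seed is only there to make the sketch repeatable.

```python
import random

# (min, max) in centimeters, copied from the table of known Iris ranges
RANGES_CM = {
    "PetalLengthCM": (1.0, 6.9),
    "PetalWidthCM": (0.1, 2.5),
    "SepalLengthCM": (4.3, 7.9),
    "SepalWidthCM": (2.0, 4.4),
}


def faux_measurement(rng):
    """Draw one value per column, in steps of 0.1 cm, within its known range."""
    row = {}
    for column, (low, high) in RANGES_CM.items():
        # randint is inclusive on both ends, so both bounds can be drawn
        tenths = rng.randint(round(low * 10), round(high * 10))
        row[column] = tenths / 10
    return row


rng = random.Random(0)  # seeded only to make this sketch repeatable
rows = [faux_measurement(rng) for _ in range(30)]
print(rows[0])
```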

Now we use `learn` on the ticking table. All it takes to change from static to live data is to change the table!

```python
# Use the learn function to create a new table with live predictions
iris_predicted_live = learn.learn(
    table=live_iris,
    model_func=predict_with_model,
    inputs=[learn.Input(["SepalLengthCM", "SepalWidthCM", "PetalLengthCM", "PetalWidthCM"], table_to_tensor_double)],
    outputs=[learn.Output("PredictedClass", get_predicted_class, "int")],
    batch_size=150
)
```

All of the code to create and use our trained model on the live, ticking table is below.

```python
# Create the table writer
table_writer = DynamicTableWriter(
    {"SepalLengthCM": dht.double, "SepalWidthCM": dht.double, "PetalLengthCM": dht.double, "PetalWidthCM": dht.double}
)

# Get the live, ticking table
live_iris = table_writer.table

# This function creates faux Iris measurements once per second for 30 seconds
def write_to_iris():
    for i in range(30):
        petal_length = random.randint(10, 69) / 10
        petal_width = random.randint(1, 25) / 10
        sepal_length = random.randint(43, 79) / 10
        sepal_width = random.randint(20, 44) / 10

        table_writer.write_row(sepal_length, sepal_width, petal_length, petal_width)
        time.sleep(1)

# Use a thread to write data to the table
thread = threading.Thread(target=write_to_iris)
thread.start()

# Use the learn function to create a new table with live predictions
iris_predicted_live = learn.learn(
    table=live_iris,
    model_func=predict_with_model,
    inputs=[learn.Input(["SepalLengthCM", "SepalWidthCM", "PetalLengthCM", "PetalWidthCM"], table_to_tensor_double)],
    outputs=[learn.Output("PredictedClass", get_predicted_class, "int")],
    batch_size=150
)
```