AI is fundamental to modern strategies for deriving value from unstructured language. TensorFlow enables data scientists to create sophisticated, large-scale neural networks and is a staple for text-based applications like sentiment analysis. The value of these models increases if they're usable with real-time data.
Deephaven is a natural pairing for AI libraries like TensorFlow. It couples real-time and batch data processing in a single abstraction and provides infrastructure to support your TensorFlow build, train, test, and deploy cycles.
I implement a basic sentiment analysis model within the Deephaven framework and deploy it on a (simulated) real-time feed of tokenized Twitter data. It analyzes tweets from a 2016 GOP debate, per a Kaggle project. For obvious reasons, I simulate the real-time data stream via replay (Deephaven makes that easy), but the training and deployment processes could be applied to many of today's real-time use cases. Approval ratings and public opinion assessment are examples that come to mind.
I closely follow Peter Nagy's LSTM model. It has strong instructions and an honest assessment of the strengths and weaknesses of LSTMs. My code extends Mr. Nagy's method to provide a real-time implementation. It's available in the Sentiment folder of Deephaven's Examples repo.
The dataset
Kaggle's dataset provides sentiment.csv, which contains 13,871 tweets. Each row of the file has 21 columns corresponding to who wrote the tweet, what the tweet was about, its sentiment, and other parameters. For this use case, we care about two columns only:
text
sentiment
Imports and data ingestion
Let's start by removing tweets which have neutral sentiment. We want to identify tweets that are positive or negative, since those show the respective Twitter user's feelings towards a subject. After this paring, we're left with 2 columns and 10,729 rows of tweet data.
from deephaven import read_csv
sentiment = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/Sentiment/csv/sentiment.csv")
sentiment = sentiment.view("text", "sentiment").where("sentiment != `Neutral`")
In order to perform sentiment analysis, we need to turn the tweets into numeric data. We can do this using the Keras Tokenizer. We also need to clean the text itself by converting it to lowercase and removing non-alphabetic characters. Lastly, to ensure the tokenized sequences are all the same length, we'll use the Keras pad_sequences method.
Since Deephaven can run Python natively, I can perform all of these tasks within the engine process.
from deephaven import pandas as dhpd
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
import pandas as pd
import numpy as np
import re
def update_text(text):
    # Lowercase the tweet, strip non-alphanumeric characters, and remove "rt" (retweet) markers
    return re.sub(r"[^a-zA-Z0-9\s]", "", text.lower()).replace("rt", " ")
sentiment = sentiment.update("text = update_text(text)")
df_sentiment = dhpd.to_pandas(sentiment)
max_features = 2000
tokenizer = Tokenizer(num_words = max_features, split = " ")
tokenizer.fit_on_texts(df_sentiment["text"].values)
x = tokenizer.texts_to_sequences(df_sentiment["text"].values)
x = pad_sequences(x)
y = pd.get_dummies(df_sentiment["sentiment"]).values
data_tokenized = np.hstack((x, y))
x_column_names = ["Tokenized_" + str(i) for i in range(x.shape[1])]
y_column_names = ["Negative", "Positive"]
column_names = x_column_names + y_column_names
df_tokenized = pd.DataFrame(data_tokenized, columns = column_names)
tokenized_sentiment = dhpd.to_table(df_tokenized)
Our new table, tokenized_sentiment, has 28 columns of tokenized text data, and two columns that indicate positive or negative sentiment about each tokenized tweet. This is the data we'll use to train and test the neural network. Speaking of which, it's time to create that model. We'll be using an LSTM network. LSTMs are particularly popular in time-series forecasting and speech/image recognition, but can be useful in sentiment analysis, too.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Embedding, LSTM, SpatialDropout1D
embed_dim = 128
lstm_out = 196
max_features = 2000
sa_model = Sequential()
sa_model.add(Embedding(max_features, embed_dim, input_length = x.shape[1]))
sa_model.add(SpatialDropout1D(0.4))
sa_model.add(LSTM(lstm_out, dropout=0.2, recurrent_dropout=0.2))
sa_model.add(Dense(2, activation = 'softmax'))
sa_model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['accuracy'])
print(sa_model.summary())
Split the data into training and testing sets
In his guide on Kaggle, Nagy uses SciKit-Learn's train_test_split to split the data into training and testing sets. This is probably the most popular way to split data for AI applications in Python. However, I'm going to avoid using any packages outside of the Deephaven + TensorFlow installation, so I'll split the data randomly using only NumPy methods.
split_pct = 67
def split_train_test(x, y, split_percentage):
random_seeds = np.random.rand(x.shape[0])
split = random_seeds < np.percentile(random_seeds, split_percentage)
return x[split], y[split], x[~split], y[~split]
x_train, y_train, x_test, y_test = split_train_test(x, y, split_pct)
train_table = dhpd.to_table(pd.DataFrame(np.hstack((x_train, y_train)), columns = column_names))
test_table = dhpd.to_table(pd.DataFrame(np.hstack((x_test, y_test)), columns = column_names))
print(f"Number of rows in training table: {train_table.intSize()}")
print(f"Number of rows in testing table: {test_table.intSize()}")
Define functions for data I/O between Python objects and Deephaven tables
When using deephaven.learn, you are required to define how data is transferred between Deephaven tables and Python objects. In this case, we define two functions:
table_to_numpy_int: returns a two-dimensional NumPy ndarray of integer values taken from selected rows and columns of a Deephaven table.
numpy_to_table: returns a single value (this won't get used until the very end of the example).
Every time learn is called, table_to_numpy_int will be used. This function defines how data in a Deephaven table is transferred to a NumPy array, which we'll need in order to do anything with the sentiment analysis model. The second function, numpy_to_table, won't get used until the trained model is used to make predictions.
from deephaven.learn import gather
from deephaven import learn
def table_to_numpy_int(rows, columns):
return gather.table_to_numpy_2d(rows, columns, dtype = np.intc)
def numpy_to_table(data, index):
return data
Train the model
It's time to train our model using the data in the training table. Nagy chooses to train the network for 7 epochs for the sake of time. If you have the patience to wait a while, increase the number of training epochs to a larger value. Remember, in Deephaven, training a model is done inside a function.
def train_model(data):
    global sa_model
    # The first 28 columns of the gathered data are tokenized text; the last two are the one-hot sentiment labels
    x_train = data[:, :-2]
    y_train = data[:, -2:]
    sa_model.fit(x_train, y_train, epochs = 7, batch_size = 32, verbose = 2)
learn.learn(
table = train_table,
model_func = train_model,
inputs = [learn.Input(column_names, table_to_numpy_int)],
outputs = None,
batch_size = train_table.intSize()
)
Validate the model
Nagy does this in two steps. The first measures the score and accuracy of the model, and the second reports the percentage of correct sentiment predictions. I'll combine these into one function.
import time
def report_fitted_model_performance(data):
global sa_model
x = data[:, :-2]
y = data[:, -2:]
score, acc = sa_model.evaluate(x, y, verbose = 0, batch_size = 32)
print("Model score: %.2f" % (score))
print("Model accuracy: %.2f" % (acc))
pos_cnt, neg_cnt, pos_correct, neg_correct = 0, 0, 0, 0
start = time.time()
for i in range(len(x)):
result = sa_model.predict(x[i].reshape(1, x.shape[1]), batch_size = 1, verbose = 0)[0]
if np.argmax(result) == np.argmax(y[i]):
if np.argmax(y[i]) == 0:
neg_correct += 1
else:
pos_correct += 1
if np.argmax(y[i]) == 0:
neg_cnt += 1
else:
pos_cnt += 1
end = time.time()
average_model_prediction_time = (end - start) / len(x)
print("Time per model prediction ~= %.2f" % (average_model_prediction_time))
print(f"Positive sentiment correct out of total: {pos_correct} out of {pos_cnt}")
print(f"Negative sentiment correct out of total: {neg_correct} out of {neg_cnt}")
learn.learn(
table = test_table,
model_func = report_fitted_model_performance,
inputs = [learn.Input(column_names, table_to_numpy_int)],
outputs = None,
batch_size = test_table.intSize()
)
This particular model does a great job of identifying negative sentiment, but a rather poor job on positive sentiment. This is likely due to class imbalance: around 80% of the tweets in the data (excluding neutral sentiment) carry negative sentiment. Neural network models such as this one tend to have difficulty dealing with class imbalance.
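One common mitigation, which I don't apply in this article, is to weight training samples inversely to their class frequency so that the under-represented positive class contributes more to the loss. Below is a minimal sketch of that idea, assuming the one-hot y_train from earlier; the helper name balanced_sample_weights is mine, and sample_weight is a standard argument to Keras' fit method.
def balanced_sample_weights(y_one_hot):
    # Number of tweets in each class (columns are [Negative, Positive])
    class_counts = y_one_hot.sum(axis = 0)
    # Weight each class inversely to its frequency
    class_weights = y_one_hot.shape[0] / (2.0 * class_counts)
    # Give every sample the weight of its class
    return (y_one_hot * class_weights).sum(axis = 1)

# Hypothetical usage inside train_model:
# sa_model.fit(x_train, y_train, epochs = 7, batch_size = 32, verbose = 2,
#     sample_weight = balanced_sample_weights(y_train))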
Real-time sentiment analysis
Deephaven's strengths lie in its ability to handle numerous external data feeds as well as perform analysis on both real-time and static tables. So, let's use the trained model on some real-time data. The tweets in the dataset don't have timestamps attached to them, so we'll simulate the real-time feed by writing anywhere from 1 to 10 tweets to a table every second. Additionally, the simulated feed will write the tokenized text data rather than the tweets themselves.
def use_trained_model(data):
    global sa_model
    result = sa_model.predict(data, batch_size = 1, verbose = 0)[0]
    # argmax over the [Negative, Positive] outputs: 0 means negative sentiment, 1 means positive
    return np.argmax(result)
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
import threading
table_writer = DynamicTableWriter(
x_column_names, [dht.int_] * x.shape[1]
)
simulated_tokenized_tweet_feed = table_writer.getTable()
def simulate_tokenized_tweet_feed(x_train):
    for i in range(x_train.shape[0]):
        random_sleep_time = np.random.uniform(0.1, 1)
        # Convert the 28 tokenized values in this row from NumPy ints to Python ints and write them as one row
        table_writer.logRow(*[value.item() for value in x_train[i]])
        time.sleep(random_sleep_time)
thread = threading.Thread(target = simulate_tokenized_tweet_feed, args = (x_train,))
thread.start()
live_predicted_sentiment = learn.learn(
table = simulated_tokenized_tweet_feed,
model_func = use_trained_model,
inputs = [learn.Input(x_column_names, table_to_numpy_int)],
outputs = [learn.Output("Predicted_Sentiment", numpy_to_table, "int")],
batch_size = 1
)
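The live_predicted_sentiment table updates in real time with a Predicted_Sentiment column of 0s and 1s. Since pd.get_dummies orders classes alphabetically, 0 corresponds to negative sentiment and 1 to positive. If you'd like a human-readable label alongside each prediction, one option (not part of the original workflow) is a query-language ternary:
live_predicted_sentiment_labeled = live_predicted_sentiment.update(
    "Sentiment_Label = Predicted_Sentiment == 1 ? `Positive` : `Negative`"
)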
Conclusion
This article demonstrates the creation, training, and deployment of a sentiment analysis model on both historical and real-time data. It uses Deephaven to extend standard Python AI to streaming tables. These are fundamental capabilities of the engine and speak to its interoperability.
Further reading
My friend, Jake Mulford, recently wrote a piece about Reddit Sentiment Analysis. It complements the efforts described above.
I used Deephaven with TensorFlow and the examples repository as a base for the application in this article. Deephaven with TensorFlow provides a quickly deployable Docker image to get started with TensorFlow in Deephaven. Alternatively, you can install TensorFlow and other packages manually.
I've previously covered AI/ML in Deephaven using SciKit-Learn for Credit Card Fraud Detection and Diabetes Classification. I look forward to exploring (and sharing solutions for) more use cases at the intersection of real-time data and AI.