Large language models like ChatGPT have taken the world by storm. There is no question that they have already changed the face of technology, and will continue to improve every year. Humans still make decisions based on information found on the internet, and a growing pool of AI-generated pseudo-information and misinformation is making it more difficult to distinguish fact from fiction. It's not unreasonable to speculate that in the near future, the only things capable of detecting whether a piece of content was AI-generated will be other AI systems.
Building and deploying such systems is not an easy task. They need tons of domain-specific training data, of both human and AI origins. They need sophisticated models for distinguishing AI content from human content in a variety of contexts. They need to be deployed at large scales in real time, so generated content gets flagged immediately. This is a tall order, but it's not impossible. In fact, it's necessary.
In this post, I will sketch out what such a system might look like using a real example close to everyone's heart: Amazon reviews. No one wants to make a purchase on Amazon based on glowing 5-star reviews if those reviews aren't written by real people who really own the product. We have some tools at our disposal to address this problem.
- Data: We can use a massive dataset of Amazon reviews (over 571 million!) collected by Julian McAuley's lab. This dataset covers reviews from 1996 to 2023.
- Detection Method: A recent Kaggle competition focused on detecting AI-generated text. One of the top entries used a model based on BERT (Bidirectional Encoder Representations from Transformers) to identify fake reviews. The pre-trained model is stored at detector/detector.pt.
- Real-time Analysis: Deephaven Data Labs offers a powerful real-time data processing tool. This allows us to analyze reviews as they're written, not just after the fact.
To follow along, clone this project's GitHub repository and follow the README. Let's get started!
The code in this article is written for Deephaven 0.36.1. More specific versioning requirements can be found in the requirements.txt file for this project.
Download data
Before getting started with this example, you'll need to download the Amazon data. This only needs to be done once, and it can take a while depending on your internet connection speed and the number of processors available to you. The download took about 20 minutes on a MacBook Pro M2 with 8 cores.
First, import the libraries needed to load the data.
import os
import datasets
import datetime as dt
datasets.logging.set_verbosity_error()
Next, set NUM_PROC equal to the number of processors on your machine. This step is important, as it significantly impacts the download speed.
NUM_PROC = 8
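If you'd rather not hard-code the count, you can detect it programmatically. This is a minimal sketch using Python's standard library; os.cpu_count can return None in some environments, hence the fallback:

import os

# use all available cores, falling back to a single process if the count can't be determined
NUM_PROC = os.cpu_count() or 1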
Now, create a list of the dataset names for downloading, and define a start time to avoid downloading all the data. If you want to work with less data, leave some categories out of the list, or increase the start time.
# names of all dataset categories
names = [
'All_Beauty',
'Toys_and_Games',
'Cell_Phones_and_Accessories',
'Industrial_and_Scientific',
'Gift_Cards',
'Musical_Instruments',
'Electronics',
'Handmade_Products',
'Arts_Crafts_and_Sewing',
'Baby_Products',
'Health_and_Household',
'Office_Products',
'Digital_Music',
'Grocery_and_Gourmet_Food',
'Sports_and_Outdoors',
'Home_and_Kitchen',
'Subscription_Boxes',
'Tools_and_Home_Improvement',
'Pet_Supplies',
'Video_Games',
'Kindle_Store',
'Clothing_Shoes_and_Jewelry',
'Patio_Lawn_and_Garden',
'Unknown',
'Books',
'Automotive',
'CDs_and_Vinyl',
'Beauty_and_Personal_Care',
'Amazon_Fashion',
'Magazine_Subscriptions',
'Software',
'Health_and_Personal_Care',
'Appliances',
'Movies_and_TV'
]
# only want to look at data in 2023 - get Jan 1 2023 and convert to millis since epoch
start_time_millis = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc).timestamp() * 1_000
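As a quick sanity check on the arithmetic, 2023-01-01T00:00:00Z is 1,672,531,200 seconds since the Unix epoch, so start_time_millis should equal 1,672,531,200,000:

# verify the epoch conversion before using it to filter millions of rows
assert start_time_millis == 1_672_531_200_000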
Finally, download and filter the data, and write the results to /amazon-data/.
# download relevant portions of each dataset and write to local parquet
for name in names:
    # download and filter review data if it does not exist
    if not os.path.exists(f"amazon-data/reviews/{name}.parquet"):
        print(f"Writing {name} review data...")
        # load review data
        review_dataset = datasets.load_dataset(
            "McAuley-Lab/Amazon-Reviews-2023",
            f"raw_review_{name}",
            num_proc=NUM_PROC,
            trust_remote_code=True)['full']
        # select columns of interest and filter for reviews from 2023 onward before writing
        filtered_review_dataset = (
            review_dataset
            .select_columns(["rating", "title", "text", "parent_asin", "user_id", "timestamp"])
            .filter(lambda timestamp: timestamp >= start_time_millis, input_columns="timestamp")
        )
        filtered_review_dataset.to_parquet(f"amazon-data/reviews/{name}.parquet")

    # download item metadata if it does not exist
    if not os.path.exists(f"amazon-data/items/{name}.parquet"):
        print(f"Writing {name} item data...")
        # load item metadata
        meta_dataset = datasets.load_dataset(
            "McAuley-Lab/Amazon-Reviews-2023",
            f"raw_meta_{name}",
            num_proc=NUM_PROC,
            trust_remote_code=True)['full']
        # select columns of interest before writing
        filtered_meta_dataset = (
            meta_dataset
            .select_columns(["main_category", "title", "average_rating", "rating_number", "parent_asin"])
        )
        filtered_meta_dataset.to_parquet(f"amazon-data/items/{name}.parquet")
Now that we have the data on disk, we can use it to simulate a real-time data stream!
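Before streaming anything, it's worth spot-checking what landed on disk. Here's a minimal sketch that assumes pandas and pyarrow are installed alongside the libraries above (they aren't required for the rest of this post):

import pandas as pd

# peek at one category to confirm the schema and row count
beauty = pd.read_parquet("amazon-data/reviews/All_Beauty.parquet")
print(beauty.shape)
print(beauty[["rating", "title", "timestamp"]].head())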
Streaming reviews
First, we want to stream the Amazon review dataset in real time. The Amazon dataset is static, so we will use TableReplayer to simulate a real-time review stream. If you already have a real-time review stream from a source like Kafka, you can use it directly without simulating one.
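For reference, consuming a real stream instead might look roughly like the sketch below. Everything here is a placeholder assumption - the broker address, the topic name, and the JSON field mapping - and the exact spec syntax is covered in Deephaven's Kafka ingestion guide:

from deephaven import dtypes
from deephaven.stream.kafka import consumer as kc

# hypothetical Kafka topic carrying review records as JSON
reviews_ticking = kc.consume(
    {"bootstrap.servers": "localhost:9092"},
    "reviews",
    key_spec=kc.KeyValueSpec.IGNORE,
    value_spec=kc.json_spec({
        "rating": dtypes.double,
        "title": dtypes.string,
        "text": dtypes.string,
        "parent_asin": dtypes.string,
        "user_id": dtypes.string,
        "timestamp": dtypes.long,
    }),
    table_type=kc.TableType.append(),
)

The rest of this post sticks with the replay approach.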
Start by importing the necessary libraries.
from math import floor
from deephaven import parquet, dtypes
from deephaven.table import TableDefinition
from deephaven.replay import TableReplayer
from deephaven.time import to_j_instant
Now, read the Amazon reviews into a Parquet table with Deephaven's Parquet module.
# create table definition for review datasets
reviews_def = TableDefinition({
    "rating": dtypes.double,
    "title": dtypes.string,
    "text": dtypes.string,
    "parent_asin": dtypes.string,
    "user_id": dtypes.string,
    "timestamp": dtypes.long
})
# read reviews into a single table
reviews = parquet.read(
    "/amazon-data/reviews/",
    file_layout=parquet.ParquetFileLayout.FLAT_PARTITIONED,
    table_definition=reviews_def
)
# convert epoch millisecond timestamps to date-times and sort chronologically
reviews = (
    reviews
    .update("timestamp = epochMillisToInstant(timestamp)")
    .sort("timestamp")
)
The reviews table has 25.6 million observations spanning 9 months. Streaming through all of those observations in real time would take... 9 months. Instead, we randomly sample 1 in 10,000 reviews and replay that data at 10,000x speed. This emulates Amazon's real-world review frequency and lets us visualize long-term trends in about half an hour.
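The arithmetic behind those numbers is worth spelling out. Assuming roughly 25.6 million reviews over roughly 9 months (about 273 days), a back-of-the-envelope check looks like this:

# back-of-the-envelope check of the sampling and replay parameters
total_reviews = 25_600_000
span_days = 273          # roughly 9 months of reviews
data_speed = 10_000

expected_sample = total_reviews / data_speed        # ~2,560 reviews survive the 1-in-10,000 sample
replay_minutes = span_days * 24 * 60 / data_speed   # ~39 minutes of wall-clock time at 10,000x

That lines up with the 36-minute replay window configured below; the exact figure depends on how much of 2023 the data actually covers.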
# minimum time from filtered table - faster to use UI than to compute with a query
min_time = to_j_instant("2023-01-01T00:00:00.000Z")
# create replay start time and end time
replay_start_time = to_j_instant("2024-01-01T00:00:00Z")
replay_end_time = to_j_instant("2024-01-01T00:36:00Z")
# replay data at 10,000x speed
data_speed = 10_000
# randomly sample data and create a timestamp that increments at 10,000x original speed
reviews = (
    reviews
    .where("random() < 1 / data_speed")
    .update("replay_timestamp = replay_start_time + (long)floor((timestamp - min_time) / data_speed)")
)
Now, replay the data with Deephaven's TableReplayer.
# create table replayer and start replay
reviews_replayer = TableReplayer(replay_start_time, replay_end_time)
reviews_ticking = reviews_replayer.add_table(reviews, "replay_timestamp").drop_columns("replay_timestamp")
reviews_replayer.start()
Live detection with BERT
With data flowing in the simulated stream, we can focus on the real-time detection of AI bots using BERT, our neural network trained on ChatGPT-generated text. It's easier than you'd expect.
First, load the necessary libraries.
import concurrent.futures
import logging
import torch
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification
from deephaven.table_listener import listen, TableUpdate
from deephaven.stream.table_publisher import table_publisher
from deephaven.stream import blink_to_append_only
from deephaven import new_table
import deephaven.column as dhcol
import deephaven.dtypes as dtypes
Next, import the trained model's parameters into a new model object and load the tokenizer needed to transform the input data.
# suppress transformer parameter name warnings
loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
for logger in loggers:
    if "transformers" in logger.name.lower():
        logger.setLevel(logging.ERROR)

# instantiate model and load parameters
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
model.load_state_dict(torch.load("/detector/detector.pt", weights_only=False))

# get device, move the model onto it, and set the model to evaluation mode for inference
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

# instantiate tokenizer
tokenizer = BertTokenizer.from_pretrained(
    'bert-base-uncased',
    do_lower_case=True,
    padding=True,
    truncation=True,
    max_length=128,
    clean_up_tokenization_spaces=True
)
Now, we're going to walk through a real-time AI workflow step-by-step. The workflow looks like this:
- Create an object called a TablePublisher to publish new data to a ticking table. This table, preds_blink, will contain the new predictions.
- Define a function to perform inference and publish the results to preds_blink.
- Create a TableListener that will listen to the ticking data source and call the inference/publisher function as new data rolls in.
- Tie it all together by listening to the ticking source, performing inference on new inputs, and publishing the results to a new table.
First, create the TablePublisher using the table_publisher function. This function returns an empty table to capture the published data, which we'll call preds_blink, and an object that publishes data to that table, which we'll call preds_publish. preds_blink is a blink table, meaning that it will only hold the most recent data from a given update cycle. Check out the guide on table publishers to learn more.
# create table publisher, and blink table that data will be published to
preds_blink, preds_publish = table_publisher(
    "DetectorOutput", {
        "rating": dtypes.double,
        "parent_asin": dtypes.string,
        "user_id": dtypes.string,
        "timestamp": dtypes.Instant,
        "gen_prob": dtypes.float32
    },
)
Next, define a function to perform the inference and publish the results to a new table using the table publisher defined previously. This function will be called every time more data rolls in, enabling Deephaven to perform real-time inference on only the most recent data. For simplicity, we've broken this into two functions: one to actually perform the inference on a given set of inputs and one to call that function and publish the results to a new table.
# function that determines if a review was generated by a bot
def detect_bot(text):
    # tokenize text
    tokenized_text = tokenizer(text.tolist(), padding=True, truncation=True, return_tensors='pt')
    # move input tensors to the same device as the model
    tokenized_text = {key: value.to(device) for key, value in tokenized_text.items()}
    # generate predictions using trained model
    with torch.no_grad():
        outputs = model(**tokenized_text)
        logits = outputs.logits
    # the first column of logits corresponds to the negative class (non-AI-generated)
    # and the second column corresponds to the positive class (AI-generated)
    predictions = torch.softmax(logits, dim=1)[:, 1].cpu().numpy()
    return predictions

# function to perform inference and publish the results to preds_blink
def compute_and_publish_inference(inputs, features):
    # get outputs from AI model
    outputs = detect_bot(inputs)
    # create new table with relevant features and outputs
    output_table = new_table(
        [
            dhcol.double_col("rating", features["rating"]),
            dhcol.string_col("parent_asin", features["parent_asin"]),
            dhcol.string_col("user_id", features["user_id"]),
            dhcol.datetime_col("timestamp", features["timestamp"]),
            dhcol.float_col("gen_prob", outputs)
        ]
    )
    # publish inference to preds_blink
    preds_publish.add(output_table)
    return None
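Before wiring the model into the stream, you can sanity-check detect_bot on a made-up snippet of review text (the string below is purely hypothetical, not from the dataset):

# returns a NumPy array with one probability per input string
sample_reviews = np.array(["This product exceeded my expectations in every way. Highly recommend!"])
print(detect_bot(sample_reviews))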
Next, we create a TableListener that listens to the ticking source and calls compute_and_publish_inference on new data. To do this, define a function called on_update that takes two arguments, update and is_replay. Extract the added and modified data from the update argument using update.added() and update.modified(). See the guide on table listeners to learn more.
Finally, we know that calling compute_and_publish_inference will be expensive - neural network inference is not cheap. Instead of delaying the main thread with these expensive calculations, offload them to a separate thread using a ThreadPoolExecutor. This collects the pending calculations into a queue and executes them as resources become available.
# use a ThreadPoolExecutor to multi-thread inference calculations
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# function that the table listener will call as new reviews roll in
def on_update(update: TableUpdate, is_replay: bool) -> None:
    input_col = "text"
    feature_cols = ["rating", "parent_asin", "user_id", "timestamp"]
    # get table entries that were added or modified
    adds = update.added(cols=[input_col, *feature_cols])
    modifies = update.modified(cols=[input_col, *feature_cols])
    # collect data from this cycle into objects to feed to inference and output
    if adds and modifies:
        inputs = np.hstack([adds[input_col], modifies[input_col]])
        features = {feature_col: np.hstack([adds[feature_col], modifies[feature_col]]) for feature_col in feature_cols}
    elif adds:
        inputs = adds[input_col]
        features = {feature_col: adds[feature_col] for feature_col in feature_cols}
    elif modifies:
        inputs = modifies[input_col]
        features = {feature_col: modifies[feature_col] for feature_col in feature_cols}
    else:
        return
    # submit inference work to ThreadPoolExecutor
    executor.submit(compute_and_publish_inference, inputs, features)
Now, tie it all together. The listen function below calls on_update every time a new review ticks into reviews_ticking. This runs the inference calculation on the new data and stores the result in preds_blink. Finally, blink_to_append_only converts preds_blink to an append-only table that stores the full history of the reviews and predictions.
# listen to ticking source and publish inference
handle = listen(reviews_ticking, on_update)
# convert preds_blink to a full-history table
preds = blink_to_append_only(preds_blink)
The AI model output is captured in preds in real time as data rolls into reviews_ticking.
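Even before any deeper analysis, simple live views can be layered on top of preds. For example, this sketch flags reviews the model scores above an arbitrary 0.5 cutoff and keeps a running count per product; the threshold and table names are choices made here for illustration, not part of the pipeline above:

# reviews the model scores as more likely than not to be AI-generated (0.5 is an arbitrary cutoff)
suspected = preds.where("gen_prob > 0.5")

# live running count of flagged reviews per product
suspected_by_product = suspected.count_by("flagged_reviews", by="parent_asin")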
In a follow-up post, we'll perform a downstream analysis on these results and see what we can learn about the incidence of ChatGPT-generated reviews on Amazon. Stay tuned!
Conclusion
This is a basic example of the systems discussed earlier, but it demonstrates how accessible such tasks have become with the right tools. Hugging Face and PyTorch have simplified deep learning by providing user-friendly interfaces to cutting-edge models. Researchers who once spent countless hours implementing CNNs in C may be amazed at how straightforward these complex tasks have become.
A similar transformation is underway in real-time data processing with tools like Deephaven. Building real-time data applications has never been easier. Data practitioners will likely view batch processing, disparate interfaces, and manual configuration as outdated practices, much like we now regard C-based neural networks. The sentiment will be the same: "I'm glad I don't have to do that anymore."