Real-time data is the lifeblood of our modern world. Many aspects of our lives are defined by the phrase "right here, right now". It’s more important than ever that you can quickly analyze and understand what data is telling you, so you can make informed decisions. Plots are crucial in this process.
Deephaven is known for its live, streaming tables. Live-updating plots are a natural pairing. With the Matplotlib plugin, you can easily build data-streaming charts with a tool you already know and love.
In this blog post, we'll use the Twitter Streaming API to collect tweets in real time and build a bunch of interesting real-time charts. Let’s dive in!
Free access to the Twitter API is no longer available. However, the code and concepts in this blog are still valid for users with a paid developer account.
Collect Twitter streaming data
First of all, we need to create a Twitter Developer account to obtain an access token. In our example, we use the Twitter API to get tweets containing the word "news".
import requests

BEARER_TOKEN = "<INSERT YOUR TOKEN>"
TWITTER_ENDPOINT_URL = "https://api.twitter.com/2/tweets/search/stream"

def bearer_oauth(r):
    """
    Method required by bearer token authentication.
    """
    r.headers["Authorization"] = f"Bearer {BEARER_TOKEN}"
    r.headers["User-Agent"] = "v2FilteredStreamPython"
    return r

def set_rules():
    """
    Method to add rules to the stream
    """
    demo_rules = [{"value": "news", "tag": "news"}]
    payload = {"add": demo_rules}
    response = requests.post(f"{TWITTER_ENDPOINT_URL}/rules", auth=bearer_oauth, json=payload)
    if response.status_code != 201:
        raise Exception(
            "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
        )

def get_tweets():
    """
    Method to get tweets
    """
    response = requests.get(f"{TWITTER_ENDPOINT_URL}?tweet.fields=lang", auth=bearer_oauth, stream=True)
    if response.status_code != 200:
        raise Exception(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    return response

# Add our demo rule containing the word "news" to the stream
set_rules()
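Because the request asks for tweet.fields=lang, each line of the stream should carry the tweet's language code next to its text. The snippet below is a minimal sketch of pulling those two fields out of one streamed line. The helper name parse_stream_line and the sample payload are made up for illustration, and the sketch assumes every tweet arrives under a top-level "data" key, as the code in this post does.

```python
import json


def parse_stream_line(raw_line):
    """Return (lang, text) for one streamed line, or None if there is no tweet in it."""
    if not raw_line:
        # Blank lines carry no tweet, so skip them
        return None
    payload = json.loads(raw_line)
    data = payload.get("data")
    if data is None:
        # Not a tweet payload (for example, an error message)
        return None
    return data.get("lang"), data.get("text")


# Hypothetical sample line, shaped like the payload the post's code expects
sample = b'{"data": {"id": "1", "lang": "en", "text": "Good news everyone"}}'
print(parse_stream_line(sample))
```

Returning None for lines without a "data" key keeps the reading loop from raising a KeyError on non-tweet messages.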
We perform some simple processing on the incoming tweets and put the raw and processed data into a Deephaven table, tweet_table:
import json
import re
import threading

import nltk
from nltk.corpus import stopwords

from deephaven import DynamicTableWriter
import deephaven.dtypes as dht

twitter_table_col_definitions = {"tweet": dht.string, "clean_tweet": dht.string}
twitter_table_writer = DynamicTableWriter(twitter_table_col_definitions)
tweet_table = twitter_table_writer.table

# The NLTK "words" and "stopwords" corpora must already be downloaded (nltk.download)
corpus = set(nltk.corpus.words.words())
# Build the stop-word set once instead of reloading the list for every word
stop_words = set(stopwords.words('english'))

def preprocess(tweet):
    """
    Method to preprocess tweets (remove user names, non-alphabetic characters, etc.)
    """
    txt = ' '.join(word for word in tweet.split() if not word.startswith('@'))
    txt = re.sub('[^a-zA-Z]', ' ', txt)
    txt = txt.lower().split()
    txt = [word for word in txt if word not in stop_words and word in corpus and len(word) > 3]
    txt = ' '.join(txt)
    return txt

def write_live_data():
    """
    The function to write twitter data to a table
    """
    response = get_tweets()
    for response_line in response.iter_lines():
        if response_line:
            json_response = json.loads(response_line)
            lang = json_response["data"]["lang"]
            # we process only english tweets
            if lang == "en":
                tweet = json_response["data"]["text"]
                clean_tweet = preprocess(tweet)
                twitter_table_writer.write_row([tweet, clean_tweet])

# Run the thread that writes tweets to the table
thread = threading.Thread(target=write_live_data)
thread.start()
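To see what the cleaning step does to a single tweet without installing NLTK, here is a small standalone sketch. It follows the same steps as preprocess, but the STOP_WORDS and VOCABULARY sets are tiny made-up stand-ins for the NLTK corpora, and the name preprocess_sketch is hypothetical.

```python
import re

# Tiny stand-ins for the NLTK stop-word list and English vocabulary (illustration only)
STOP_WORDS = {"this", "that", "with", "from", "have"}
VOCABULARY = {"breaking", "news", "this", "good", "morning", "market", "with"}


def preprocess_sketch(tweet):
    """Drop @mentions and non-letters, then keep known non-stop words longer than 3 letters."""
    txt = " ".join(word for word in tweet.split() if not word.startswith("@"))
    words = re.sub("[^a-zA-Z]", " ", txt).lower().split()
    kept = [w for w in words if w not in STOP_WORDS and w in VOCABULARY and len(w) > 3]
    return " ".join(kept)


cleaned = preprocess_sketch("@user Breaking NEWS: this market is up 5% with good momentum!")
print(cleaned)  # breaking news market good
```

Note that "momentum" is dropped only because it is missing from the toy vocabulary. With the full NLTK word list the result would likely differ.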
Also, we create another table to keep track of the word frequency of our tweets:
count_table_col_definitions = {"word": dht.string, "count": dht.int32}
count_table_writer = DynamicTableWriter(count_table_col_definitions)
count_table = count_table_writer.table
We can use the existing write_live_data function to write data into count_table:
words = clean_tweet.split()
for word in words:
    count_table_writer.write_row([word, 1])
Real-time bar chart
Now let’s visualize the 10 most frequent words:
import matplotlib.pyplot as plt
from deephaven import SortDirection
from deephaven.plugin.matplotlib import TableAnimation

TOP_N = 10

# Find TOP N most popular words in tweets
count = count_table.count_by("Number", by=["word"])
count_sorted = count.sort(order_by=["Number"], order=[SortDirection.DESCENDING])
count_sorted_top = count_sorted.head(TOP_N)

# Draw real-time bar chart with the frequency of the top N words
bar_fig, bar_fig_ax = plt.subplots()
plt.xticks(rotation=20)
rects = bar_fig_ax.bar(range(TOP_N), [0] * TOP_N)
bar_fig_ax.set_xticks(range(TOP_N))

def animate_bar_plot_fig(data, update):
    # Resize each bar to the latest count, then relabel and rescale the axes
    for rect, h in zip(rects, data["Number"]):
        rect.set_height(h)
    bar_fig_ax.set_xticklabels(data["word"])
    bar_fig_ax.relim()
    bar_fig_ax.autoscale_view(True, True, True)

bar_plot_ani = TableAnimation(bar_fig, count_sorted_top, animate_bar_plot_fig)
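The count_by, sort, and head chain above is a streaming version of a familiar top-N word count. For intuition, here is roughly the same aggregation on a static list in plain Python. This is a sketch only: top_n_words and the sample tweets are invented for illustration, and nothing here updates live the way the Deephaven tables do.

```python
from collections import Counter


def top_n_words(clean_tweets, n=10):
    """Count every word across the cleaned tweets and return the n most frequent."""
    counts = Counter()
    for clean_tweet in clean_tweets:
        counts.update(clean_tweet.split())
    return counts.most_common(n)


# Made-up cleaned tweets
sample_tweets = ["good news today", "breaking news", "good morning news"]
top_two = top_n_words(sample_tweets, n=2)
print(top_two)  # [('news', 3), ('good', 2)]
```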
Real-time word cloud
This simple bar chart is useful for a first glance at the data. But what if we want a less boring visualization? Word clouds are another way to present the same data, and they're just as informative:
from wordcloud import WordCloud
from deephaven import pandas as dhpd

# Draw real-time word cloud for top N words
wordcloud_fig = plt.figure()
wordcloud_ax = wordcloud_fig.subplots()

def animate_wordcloud_fig(data, update):
    # Snapshot the top-N table and join its words into one string for WordCloud
    data_frame = dhpd.to_pandas(count_sorted_top)
    word_str = " ".join(data_frame["word"].tolist())
    wordcloud = WordCloud(width=800, height=800,
                          background_color='white',
                          min_font_size=10, max_words=50).generate(word_str)
    wordcloud_ax.imshow(wordcloud)

wordcloud_ani = TableAnimation(wordcloud_fig, count_sorted_top, animate_wordcloud_fig)
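In the code above, each top word appears in the joined string exactly once, so the counts themselves are not passed to the word cloud. If you want word size to follow frequency, one option is to build a word-to-count mapping and hand it to the wordcloud package's generate_from_frequencies method. The sketch below shows only the mapping step. The helper to_frequencies and the sample counts are hypothetical, and this is an alternative to the approach in this post, not part of it.

```python
def to_frequencies(words, counts):
    """Pair each word with its count, skipping empty words and non-positive counts."""
    return {w: int(c) for w, c in zip(words, counts) if w and c > 0}


# Made-up sample values standing in for the "word" and "Number" columns
freqs = to_frequencies(["news", "good", "market"], [42, 17, 9])
print(freqs)

# With the wordcloud package installed, the mapping could then be rendered with:
# WordCloud(width=800, height=800, background_color="white").generate_from_frequencies(freqs)
```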
Real-time box plot
Now let’s analyze our tweets by counting the number of words. To do this, we need to update our write_live_data function to write a length value for each tweet, and add a matching length column (dht.int32) to tweet_table:
words = clean_tweet.split()
length = len(words)
twitter_table_writer.write_row([tweet, clean_tweet, length])
The code for building an animated box plot is pretty simple:
# Draw real-time box plot to show the distribution of the number of words in tweets
box_plot_fig = plt.figure()
box_plot_ax = box_plot_fig.subplots()
box_plot = box_plot_ax.boxplot([], patch_artist=True, labels=['Distribution of Tweet Word Counts'])

def animate_box_plot_fig(data, update):
    # clear the subplot
    box_plot_ax.cla()
    box_plot_ax.boxplot(x=data['length'])
    box_plot_ax.relim()
    box_plot_ax.autoscale_view(True, True, True)

box_plot_ani = TableAnimation(box_plot_fig, tweet_table, animate_box_plot_fig)
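If you are unsure how to read the box plot, it helps to compute the numbers behind it by hand. The sketch below derives the quartiles, whisker ends, and outliers for a made-up list of word counts using only the standard library. It assumes the common 1.5 × IQR whisker rule (Matplotlib's default whis value) and linearly interpolated quartiles. The function name box_stats is hypothetical, and the exact quartile values Matplotlib draws may differ slightly.

```python
import statistics


def box_stats(lengths):
    """Return quartiles, whisker ends, and outliers using the 1.5 * IQR rule."""
    q1, median, q3 = statistics.quantiles(lengths, n=4, method="inclusive")
    iqr = q3 - q1
    low_fence = q1 - 1.5 * iqr
    high_fence = q3 + 1.5 * iqr
    # Whiskers end at the most extreme data points that are still inside the fences
    inside = [x for x in lengths if low_fence <= x <= high_fence]
    outliers = sorted(x for x in lengths if x < low_fence or x > high_fence)
    return {
        "q1": q1,
        "median": median,
        "q3": q3,
        "whisker_low": min(inside),
        "whisker_high": max(inside),
        "outliers": outliers,
    }


# Made-up word counts; the 20-word tweet ends up as an outlier
stats = box_stats([1, 2, 3, 4, 5, 20])
print(stats)
```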
Real-time Venn diagram
To demonstrate another type of visualization, let’s build a Venn diagram to explore the relationship between specific words - for example, “good” and “news”. This is a simple example that could be easily expanded to make truly informative and interesting connections between words used in tweets.
We adopted the code from this repository and modified it for real-time data.
import random

import numpy as np

twitter_table_col_definitions = {"tweet": dht.string, "clean_tweet": dht.string, "length": dht.int32, "category": dht.string, "point_x": dht.double, "point_y": dht.double}
twitter_table_writer = DynamicTableWriter(twitter_table_col_definitions)
tweet_table = twitter_table_writer.table

TERM_1 = 'good'
TERM_2 = 'news'

# Outline of the left circle
RADIUS_1 = 10
SHIFT_1 = -6
CENTER_1 = (SHIFT_1, 0)
X_1 = [(-RADIUS_1 + SHIFT_1) + i * 0.001 for i in range(2 * 1000 * RADIUS_1 + 1)]
Y1_UPPER = [np.sqrt(RADIUS_1 ** 2 - (i - SHIFT_1) ** 2) for i in X_1]
Y1_LOWER = [-y for y in Y1_UPPER]

# Outline of the right circle
RADIUS_2 = 10
SHIFT_2 = 6
CENTER_2 = (SHIFT_2, 0)
X_2 = [(-RADIUS_2 + SHIFT_2) + i * 0.001 for i in range(2 * 1000 * RADIUS_2 + 1)]
Y2_UPPER = [np.sqrt(RADIUS_2 ** 2 - (i - SHIFT_2) ** 2) for i in X_2]
Y2_LOWER = [-y for y in Y2_UPPER]

def distance(point1, point2):
    return np.sqrt((point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2)

def in_circle(point, center, radius):
    # Inside the circle, with a 0.7 margin from its edge
    if distance(point, center) < radius - 0.7:
        return True
    return False

def out_circle(point, center, radius):
    # Outside the circle, with a 0.7 margin from its edge
    if distance(point, center) > radius + 0.7:
        return True
    return False

def collide(rand, points):
    # True if the candidate dot would overlap a dot that is already placed
    r = 0.4
    for i in points:
        if distance([rand[0], rand[1]], [i[0], i[1]]) < 2 * r:
            return True
    return False
def write_live_data():
    """
    The function to write twitter data to a table
    """
    response = get_tweets()
    diag_points = []
    for response_line in response.iter_lines():
        if response_line:
            json_response = json.loads(response_line)
            lang = json_response["data"]["lang"]
            # we are interested only in english tweets
            if lang == "en":
                tweet = json_response["data"]["text"]
                clean_tweet = preprocess(tweet)
                words = clean_tweet.split()
                for word in words:
                    count_table_writer.write_row([word, 1])
                length = len(words)
                # check if our special terms are used within the messages
                count = 0
                category = ''
                point_x = None
                point_y = None
                if TERM_1 in words:
                    category = 'left'
                    count += 1
                if TERM_2 in words:
                    category = 'right'
                    count += 1
                if count == 2:
                    category = 'middle'
                # Draw random points until one lands in the right region without overlapping another dot
                if category == 'middle':
                    rand = [random.uniform(CENTER_1[0], CENTER_2[0]),
                            random.uniform(CENTER_1[1] - RADIUS_1, CENTER_2[1] + RADIUS_2)]
                    while (not (in_circle(rand, CENTER_1, RADIUS_1) and in_circle(rand, CENTER_2, RADIUS_2))) or collide(rand, diag_points):
                        rand = [random.uniform(CENTER_1[0], CENTER_2[0]), random.uniform(CENTER_1[1] - RADIUS_1, CENTER_2[1] + RADIUS_2)]
                    point_x = rand[0]
                    point_y = rand[1]
                    diag_points.append(rand)
                if category == 'left':
                    rand = [random.uniform(CENTER_1[0] - RADIUS_1, CENTER_1[0] + RADIUS_1),
                            random.uniform(CENTER_1[1] - RADIUS_1, CENTER_1[1] + RADIUS_1)]
                    while (not (in_circle(rand, CENTER_1, RADIUS_1) and out_circle(rand, CENTER_2, RADIUS_2))) or collide(rand, diag_points):
                        rand = [random.uniform(CENTER_1[0] - RADIUS_1, CENTER_1[0] + RADIUS_1),
                                random.uniform(CENTER_1[1] - RADIUS_1, CENTER_1[1] + RADIUS_1)]
                    diag_points.append(rand)
                    point_x = rand[0]
                    point_y = rand[1]
                if category == 'right':
                    rand = [random.uniform(CENTER_2[0] - RADIUS_2, CENTER_2[0] + RADIUS_2),
                            random.uniform(CENTER_2[1] - RADIUS_2, CENTER_2[1] + RADIUS_2)]
                    while (not (in_circle(rand, CENTER_2, RADIUS_2) and out_circle(rand, CENTER_1, RADIUS_1))) or collide(rand, diag_points):
                        rand = [random.uniform(CENTER_2[0] - RADIUS_2, CENTER_2[0] + RADIUS_2),
                                random.uniform(CENTER_2[1] - RADIUS_2, CENTER_2[1] + RADIUS_2)]
                    diag_points.append(rand)
                    point_x = rand[0]
                    point_y = rand[1]
                twitter_table_writer.write_row([tweet, clean_tweet, length, category, point_x, point_y])

# Run the thread that writes to the table
thread = threading.Thread(target=write_live_data)
thread.start()
venn_plot_fig = plt.figure()
venn_ax = venn_plot_fig.subplots()
venn_ax.plot(X_1, Y1_UPPER, color='k')
venn_ax.plot(X_1, Y1_LOWER, color='k')
venn_ax.plot(X_2, Y2_UPPER, color='k')
venn_ax.plot(X_2, Y2_LOWER, color='k')
venn_ax.text(SHIFT_1 - RADIUS_1 + 2, 10, TERM_1.upper(), ha='center', color='white')
venn_ax.text(SHIFT_2 + RADIUS_2 - 2, 10, TERM_2.upper(), ha='center', color='white')
plt.gca().axes.get_xaxis().set_visible(False)
plt.gca().axes.get_yaxis().set_visible(False)
plt.axis('equal')
venn_ax.set_title('Twitter Venn Diagram')

def animate_venn_fig(data, update):
    venn_ax.plot(data["point_x"], data["point_y"], marker='o', ms=10, color='green', lw=0)

tweet_table_with_category = tweet_table.where(filters=["category in `middle`, `left`, `right`"])
venn_ani = TableAnimation(venn_plot_fig, tweet_table_with_category, animate_venn_fig)
The result is an animated chart. The circle on the left contains dots representing tweets that contain the word 'good' but not 'news'. The circle on the right holds tweets that contain 'news' but not 'good'. The intersecting area shows how many tweets use both terms together.
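Two ideas carry the Venn code: sorting each tweet into a region, and rejection sampling, which means drawing random points until one lands in that region. The stripped-down sketch below isolates both. It reuses the circle centers and radius from this post, but the helper names, the bounding box, and the max_tries cap are assumptions made for illustration. It also leaves out the edge margin and the dot-collision check that the full code applies.

```python
import math
import random

# Same geometry as the post: two circles of radius 10 centered at x = -6 and x = 6
CENTER_LEFT, CENTER_RIGHT, RADIUS = (-6.0, 0.0), (6.0, 0.0), 10.0


def classify(words, term_1="good", term_2="news"):
    """Return 'left', 'right', 'middle', or '' depending on which terms appear."""
    has_1, has_2 = term_1 in words, term_2 in words
    if has_1 and has_2:
        return "middle"
    if has_1:
        return "left"
    if has_2:
        return "right"
    return ""


def region_of(point):
    """Name the Venn region that contains the point ('' if it is outside both circles)."""
    in_left = math.dist(point, CENTER_LEFT) < RADIUS
    in_right = math.dist(point, CENTER_RIGHT) < RADIUS
    if in_left and in_right:
        return "middle"
    if in_left:
        return "left"
    if in_right:
        return "right"
    return ""


def sample_point(category, rng, max_tries=10_000):
    """Rejection sampling: draw points in the bounding box until one falls in the region."""
    for _ in range(max_tries):
        candidate = (rng.uniform(-16.0, 16.0), rng.uniform(-10.0, 10.0))
        if region_of(candidate) == category:
            return candidate
    return None  # give up instead of looping forever


rng = random.Random(0)
category = classify(["good", "news", "today"])
point = sample_point(category, rng)
print(category, point)
```

Capping the number of attempts is a deliberate difference from the loops above, which keep drawing until a free spot is found. Once collision checks are added and a region fills up with dots, an uncapped loop may never finish.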
Talk to us
We’ve demonstrated a few examples of real-time streaming charts using Deephaven’s Matplotlib plugin. When it comes to displaying real-time data, the sky is the limit, and we hope this article is a good starting point. Share your ideas on Slack.
If you find any errors or have trouble executing our sample code, click here to download the complete source code of the project we have created for this blog post.