Skip to main content

Take Twitter's temperature with Deephaven: a sentiment analysis tutorial

· 8 min read
DALL·E prompt: an angry bluebird with a glass thermometer inside it's beak, in a yellow room, cgsociety
Amanda Martin
Twitter is a treasure chest of data for social sentiment

The money made in meme stock trading is a stunning example of the power of community sentiment. Nevertheless, you could spend hours manually scrolling social media without gleaning meaningful insight. Or, you could equip yourself with automation and ML to measure sentiment - in real-time - and produce useful results.

Python and TensorFlow's natural language processing libraries and Deephaven's stream-and-batch table engine together meet this challenge. Below, I share the details of an app I made to do natural language processing of the Twitter feed and marry it to stock price information - all in real-time.

This starter program looks at tweets about cryptocurrency; however, the possibilities are endless and we encourage you to tailor these queries to suit your interests.

img

The program showcased here integrates Deephaven with Twitter and Python's Natural Language Toolkit (NLTK) to pull recent tweets and evaluate sentiment in real-time. We start by pulling the data and running a SentimentIntensityAnalyzer on each tweet. We then aggregate the posts to see the overall positivity or negativity of that term on Twitter with time.

Prereqs

  1. To get started, go to our example repo or run the following commands to download all the code and dependencies to work with Docker.
git clone https://github.com/deephaven-examples/twitter-sentiment.git
cd twitter-sentiment

A start script will install the needed Python modules and launch the Deephaven IDE.

To run it, execute:

./twitter-sentiment.sh
  1. Twitter provides an API to make it easy to pull public tweets. In order to use this code as-is, you need to also have a Twitter Developer account and copy your Bearer Token.

  2. Import the appropriate packages to use each platform. I perform a pip install inside Deephaven. This ensures my program runs without any extra modifications needed. For other ways to install Python packages, see our How to install Python packages guide.

import nltk
nltk.download('punkt')
nltk.download('vader_lexicon')

from requests_oauthlib import OAuth1Session
import requests
from datetime import datetime

import time
import re
import json
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from deephaven.DateTimeUtils import convertDateTime, minus, convertPeriod, currentTime
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
import threading

Run the program

This program is intended to be fine-tuned to fit your data needs. Below are the values you'll need to change to customize the program for your specific use case and information.

In this example, I perform sentiment analysis on Dogecoin tweets over the course of one week.

  1. First, you need the token I mentioned above. Important: the Bearer Token is provided by Twitter and each developer has a monthly limit, so keep this token private.

  2. I search for any tweet that contains the term DOGE.

  3. Since there is a tweet-rate-limit and I want to see the tweets for the last seven days, I collect just 10 tweets for each time pull with max_results = 10. I recommend using low numbers for testing. When you are ready for production, increase as needed, while keeping in mind the rate limit.

  4. Next, to see how the sentiment of tweets change with time, I divide those seven days up into discreet time_bins. More bins will give you the ability to see more refined changes in the social sentiment, but would also pull in more tweets, which means you hit your rate limit quicker.

  5. My Twitter access level limits the amount of historical tweets I can pull to seven days, so I set time_history = 7. This is the standard for non-academic searches.

# Make sure you enter your token like this 'AAAD...JFH'
bearer_token = '<INPUT YOUR TOKEN HERE>'

# Change this to search whatever term you want on Twitter
search_term = 'DOGE'

# Max results per time bin
max_results = 10

# Time intervals to split data
time_bins = 10

# How many days to go back. Max 7 for non-academic searches
time_history = 7

Configure your functions

In this section of code, we created the functions needed to pull the data from Twitter.

  1. Twitter provides a lot of sample code with the v2 API. These functions are pulled from the Github Twitter-API-v2-sample-code repo so that we connect to the needed endpoints with the appropriate authorization.
def create_headers(bearer_token):
headers = {
"Authorization": "Bearer {}".format(bearer_token),
"User-Agent": "v2FullArchiveSearchPython"}
return headers

search_url = "https://api.twitter.com/2/tweets/search/recent"

def connect_to_endpoint(url, headers, params):
response = requests.request("GET", search_url, headers=headers, params=params)
if response.status_code != 200:
raise Exception(response.status_code, response.text)
return response.json()
  1. Tweets contain a lot of metadata that can be useful. Here, I set the fields I like to work with: just the tweet.fields and user.fields data to keep it simple. Using these fields allows me to weigh tweets based on the popularity of the tweet or user and ignores location information. The rest are left for you to add as needed and might be useful if you want to limit the search to certain places in the world.
def get_query_params(start_time, end_time):
return {'query': search_term,
'start_time': start_time,
'end_time': end_time,
'max_results': max_results,
# 'expansions': 'author_id,in_reply_to_user_id,geo.place_id',
'tweet.fields': 'id,text,author_id,in_reply_to_user_id,geo,conversation_id,created_at,lang,public_metrics,referenced_tweets,reply_settings,source',
'user.fields': 'id,name,username,created_at,description,public_metrics,verified',
# 'place.fields': 'full_name,id,country,country_code,geo,name,place_type',
'next_token': {}}
  1. Now we have the function that pulls the tweets. I've separated it from the previous code to make it easier to change the query_params to the date zone you want.

  2. By default, if given a start time range of seven days ago, only recent tweets will be pulled. Since I want a guarantee of dates in bins, I supply the exact start and end date for each request.

  3. This function is called for each time bin and returns all the tweet data requested in JSON format.

def get_tweets(query_params):
headers = create_headers(bearer_token)
json_response = connect_to_endpoint(search_url, headers, query_params)
return(json_response['data'])

Clean the tweets

Since I'm performing a sentiment analysis on the content of the tweets, I clean each tweet. This is optional but provides a more uniform appearance to the tweets in the table.

def cleanText(text):
#to lowercase
text = text.lower()
#correct spaces (e.g. "End sentence.Begin another" becomes "End sentence. Begin another")
text = re.sub(r'\.([a-zA-Z])', r'. \1', text)
text = re.sub(r'\?([a-zA-Z])', r'. \1', text)
text = re.sub(r'\!([a-zA-Z])', r'. \1', text)
#replace q1,2,3,4 with q
text = re.sub("q[1-4]", "q", text)
#replace 20xx with 2000
text = re.sub("20[0-2][0-9]", "2000", text)
return text

Assess sentiment

Next, I run each tweet through the NLTK SentimentIntensityAnalyzer. This returns the polarity score of the tweet - that is how positive, negative, and neutral a tweet is, as well as the combined score. Often a tweet will be filled with made up words, acronyms and such. These generally are scored as neutral and do not impact the analysis.

def analyze_line(text):
sid = SentimentIntensityAnalyzer()
return(sid.polarity_scores(text))

Put our data in a table

This last function is needed to create a table to store our data.

  1. We use Deephaven's DynamicTableWriter class, which calls the function for each iteration of the dynamic table.

  2. We add to the table for each time_bins.

By formatting the data with Deephaven Types, we make it easy to join, filter, summarize, plot and perform other analysis on our table.

def thread_func():
for i in range(1, time_bins):
start_time = str(minus(currentTime(),convertPeriod("T"+str(int(i*(24*time_history)/time_bins))+"H")))[:-9]+'Z'
end_time = str(minus(currentTime(),convertPeriod("T"+str(int((i-1)*(24*time_history)/time_bins))+"H")))[:-9]+'Z'
query_params = get_query_params(start_time, end_time)
all_text = get_tweets(query_params)
for t in all_text:
id = float(t['id'])
combined = analyze_line(cleanText(t['text']))
negative = combined.get('neg')
neutral = combined.get('neu')
compound = combined.get('compound')
positive = combined.get('pos')
dateTime = t['created_at'][:-1]+" NY"
retweet_count = t['public_metrics']['retweet_count']
reply_count = t['public_metrics']['reply_count']
like_count = t['public_metrics']['like_count']
quote_count= t['public_metrics']['quote_count']
tableWriter_sia.logRow(t['text'], float(compound), float(negative), float(neutral), float(positive), float(id),convertDateTime(dateTime), int(retweet_count), int(reply_count), int(like_count), int(quote_count))
  1. Finally, I create the tableWriter_sia and execute the threading to run the above function. This will create a table sia_data that fills with the tweets and their metadata, as well as the sentiment of each tweet.
tableWriter_sia = DynamicTableWriter(
["Text", "Compound", "Negative", "Neutral", "Positive", "ID", "DateTime", "Retweet_count", "Reply_count", "Like_count", "Quote_count"],
[dht.string, dht.double, dht.double, dht.double, dht.double, dht.double, dht.datetime, dht.int_, dht.int_, dht.int_, dht.int_])
sia_data = tableWriter_sia.getTable()

thread_sia = threading.Thread(target = thread_func)
thread_sia.start()

img

Analyze the data

Now the fun part. Let's do some analysis on the tweets so we can see how the search term's positivity and negativity have changed with time.

First, let's aggregate the tweets so that we can get a summary of each tweet inside our chosen time bins.

This code:

  1. Creates a series of averages and weighted averages.
  2. Creates the combined_tweets table that shows us the overall sentiment each minute for our time bins.
from deephaven import agg as agg

agg_list = [
agg.count_(col = "Count"),
agg.avg(cols = ["Average_negative = Negative"]),
agg.avg(cols = ["Average_neutral = Neutral"]),
agg.avg(cols = ["Average_positive = Positive"]),
agg.avg(cols = ["Average_compound = Compound"]),
agg.weighted_avg(wcol="Retweet_count", cols = ["Weight_negative = Negative"]),
agg.weighted_avg(wcol="Retweet_count", cols = ["Weight_neutral = Neutral"]),
agg.weighted_avg(wcol="Retweet_count", cols = ["Weight_positive = Positive"]),
agg.weighted_avg(wcol="Retweet_count", cols = ["Weight_compound = Compound"])
]

from deephaven.DateTimeUtils import expressionToNanos, convertDateTime, upperBin

combined_tweets = sia_data.update(formulas = ["Time_bin = upperBin(DateTime, 10000)"]).sort(order_by = ["DateTime"]).agg_by(agg_list, by = [by = ["Time_bin"]).sort("Time_bin"])

img

The table's cool, but not as useful as a plot. I use Deephaven's plotting methods to create a nice visualization of my data.

from deephaven import Plot

sia_averages = Plot.plot("AVG_Neg", combined_tweets, "Time_bin", "Average_negative")\
.lineColor(Plot.colorRGB(255,0,0,100))\
.plot("AVG_Pos", combined_tweets, "Time_bin", "Average_positive")\
.lineColor(Plot.colorRGB(0,255,0,100))\
.show()

img

Your turn

This code provides a basic starter. You can use it to make your own searches, tie to other programs, or just see how social media is doing.

We hope this program inspires you. If you make something of your own or have an idea to share, we'd love to hear about it on Gitter!