
Listen to and configure alerts for RSS table updates

Jake Mulford
Customizable code to trigger notifications for new RSS table entries

Deephaven performs exceptionally well with real-time data. Throughout this blog series, we've used RSS feeds as an example of frequently changing data and shown how Deephaven helps you isolate what you care about from that sea of information. One tool for doing so is a table listener: a function that executes every time a table is updated. In this blog, we will show how to add listeners to the RSS podcast exploration application so that it sends notifications when certain podcasts are found.

Filtered data

The code in this blog will extend the RSS podcast aggregator sample app. If you're unfamiliar with that code, consider reading our previous RSS podcast exploration blog post.

To start, let's run a few filter queries on our RSS podcast feeds data. This allows us to work with subsets of our real-time data.

from deephaven.DateTimeUtils import currentTime, convertPeriod

tech_keywords = podcast_feeds.where("RssEntryTitle.contains(`Software`) || RssEntryTitle.contains(`Technology`) || RssEntryTitle.contains(`Computer`) || RssEntryTitle.contains(`Science`)")

one_day = convertPeriod("1D")
recently_published = podcast_feeds.where("PublishDatetime >= currentTime() - one_day")

This results in two filtered tables, one containing entries that have tech keywords and the other containing entries that were published in the last 24 hours.
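As the keyword list grows, a hand-written filter string becomes easy to mistype. A minimal sketch of generating it from a list instead, assuming a hypothetical helper named build_contains_filter (it is not part of Deephaven and only assembles the string):

```python
def build_contains_filter(column, keywords):
    """Build one `contains` clause per keyword and join them with ||."""
    clauses = [f"{column}.contains(`{keyword}`)" for keyword in keywords]
    return " || ".join(clauses)


tech_filter = build_contains_filter(
    "RssEntryTitle", ["Software", "Technology", "Computer", "Science"]
)
print(tech_filter)
```

The resulting string matches the one written out above, so it can be passed to podcast_feeds.where(tech_filter). Keywords that themselves contain a backtick would need escaping, which this sketch does not handle.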

Listener

We can now add listeners to these filtered tables, allowing us to execute functions whenever the tables are updated. Let's start with simple listeners that print a message when a table is updated.

Each listener function takes two arguments: the update object, which describes what changed, and an is_replay flag.

from deephaven.table_listener import listen

def tech_keywords_listener(update, is_replay):
    print("New row in tech_keywords table")
    print(f"FUNCTION LISTENER: update={update}")

def recently_published_listener(update, is_replay):
    print("New row in recently_published table")
    print(f"FUNCTION LISTENER: update={update}")

tech_keywords_handler = listen(tech_keywords, tech_keywords_listener)
recently_published_handler = listen(recently_published, recently_published_listener)

Let's extend this example by printing the title of the added entry. To do this, we will need to access the update object's iterator methods and the associated table.

def tech_keywords_listener_print_entry(update, is_replay):
    print("New row in tech_keywords table")

    # update.added holds the indices of the rows added in this update
    added_iterator = update.added.iterator()

    while added_iterator.hasNext():
        index = added_iterator.nextLong()
        # Look up the title of the added row in the listened-to table
        added_title = tech_keywords.getColumnSource("RssEntryTitle").get(index)
        print(f"New entry {added_title} found in tech_keywords table")

def recently_published_listener_print_entry(update, is_replay):
    print("New row in recently_published table")

    added_iterator = update.added.iterator()

    while added_iterator.hasNext():
        index = added_iterator.nextLong()
        added_title = recently_published.getColumnSource("RssEntryTitle").get(index)
        print(f"New entry {added_title} found in recently_published table")

tech_keywords_print_entry_handler = listen(tech_keywords, tech_keywords_listener_print_entry)
recently_published_print_entry_handler = listen(recently_published, recently_published_listener_print_entry)

And now we have listeners that log every new entry in our tables. Listeners can execute any Python function, so you can use them to post Slack notifications, send emails, send text messages, or make HTTP requests.

Parameterizing the listener

You may notice in the above example that we have two functions that are almost identical. This isn't a problem with a small number of listeners and tables, but if we were to write several more similar listeners, we'd have a lot of duplicated code. How can we work around this to parameterize our listener?

Using the above example, we'd ideally parameterize the table and the table name used in the print statement, ending up with a listener function that takes those arguments along with the update object. Unfortunately, we can't change the signature of the listener functions. However, we can create a function that takes our two parameters (the table and its name) and returns a function with the expected listener signature. If you're familiar with functional programming, this is a closure, and it is closely related to currying.

Let's rewrite our print_entry listener function accordingly.

def print_entry_builder(table, table_name):
    def print_entry(update, is_replay):
        print(f"New row in {table_name} table")

        added_iterator = update.added.iterator()

        while added_iterator.hasNext():
            index = added_iterator.nextLong()
            added_title = table.getColumnSource("RssEntryTitle").get(index)
            print(f"New entry {added_title} found in {table_name} table")
    return print_entry

tech_keywords_listener_print_entry = print_entry_builder(tech_keywords, "tech_keywords")
recently_published_listener_print_entry = print_entry_builder(recently_published, "recently_published")

tech_keywords_print_entry_handler = listen(tech_keywords, tech_keywords_listener_print_entry)
recently_published_print_entry_handler = listen(recently_published, recently_published_listener_print_entry)

The print_entry_builder function takes our two parameters (table and table_name) and defines the print_entry function, which takes the update object and the is_replay flag. Because print_entry is a closure, it keeps a reference to the table and table_name values passed to the builder call that created it. Each call to print_entry_builder therefore returns a separate print_entry function bound to its own table and table_name.

We've now successfully reused our listener function across multiple tables.
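To see the closure behavior without a running Deephaven session, the builder can be exercised against small stand-in objects. The sketch below is an illustration under stated assumptions: every Fake* class is hypothetical and mimics only the handful of methods the listener calls, and the emit parameter is an addition made here so the output can be collected instead of printed.

```python
class FakeIterator:
    """Hypothetical stand-in for the iterator from update.added.iterator()."""
    def __init__(self, indices):
        self._indices = list(indices)
        self._position = 0

    def hasNext(self):
        return self._position < len(self._indices)

    def nextLong(self):
        value = self._indices[self._position]
        self._position += 1
        return value


class FakeIndex:
    """Hypothetical stand-in for update.added."""
    def __init__(self, indices):
        self._indices = indices

    def iterator(self):
        return FakeIterator(self._indices)


class FakeUpdate:
    """Hypothetical stand-in for the update object passed to a listener."""
    def __init__(self, added):
        self.added = FakeIndex(added)


class FakeColumnSource:
    def __init__(self, values):
        self._values = values

    def get(self, index):
        return self._values[index]


class FakeTable:
    """Hypothetical stand-in table: maps column name -> {row index: value}."""
    def __init__(self, columns):
        self._columns = columns

    def getColumnSource(self, name):
        return FakeColumnSource(self._columns[name])


def print_entry_builder(table, table_name, emit=print):
    # emit is an assumption added for this sketch; it defaults to print
    def print_entry(update, is_replay):
        added_iterator = update.added.iterator()
        while added_iterator.hasNext():
            index = added_iterator.nextLong()
            added_title = table.getColumnSource("RssEntryTitle").get(index)
            emit(f"New entry {added_title} found in {table_name} table")
    return print_entry


messages = []
tech = FakeTable({"RssEntryTitle": {0: "Computer Science Weekly", 1: "Software Daily"}})
recent = FakeTable({"RssEntryTitle": {5: "Morning News"}})

tech_listener = print_entry_builder(tech, "tech_keywords", emit=messages.append)
recent_listener = print_entry_builder(recent, "recently_published", emit=messages.append)

# Each listener looks rows up in the table it was built with
tech_listener(FakeUpdate([0, 1]), False)
recent_listener(FakeUpdate([5]), False)

for message in messages:
    print(message)
```

Both listeners come from the same builder, yet each reports rows from its own table under its own name, which is the property the parameterized version relies on.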

Slack notifications

As stated above, a listener can run almost any Python function. The RSS podcast aggregator sample app shows an example of how you could create a Slack listener.

from slack import WebClient

import os

TOKEN = os.environ["SLACK_API_TOKEN"]
CHANNEL = os.environ["SLACK_CHANNEL_ID"]

client = WebClient(token=TOKEN)

def slack_notification_builder(table, slack_channel, text):
    def slack_notification(update, is_replay):
        added_iterator = update.added.iterator()
        while added_iterator.hasNext():
            idx = added_iterator.nextLong()
            podcast_title = table.getColumnSource("RssEntryTitle").get(idx)
            client.chat_postMessage(channel=slack_channel, text=text.format(podcast_title=podcast_title))
    return slack_notification

slack_notification_recently_published = slack_notification_builder(recently_published, CHANNEL, "Podcast {podcast_title} found in recently_published table")
recently_published_handler = listen(recently_published, slack_notification_recently_published)

slack_notification_tech_keywords = slack_notification_builder(tech_keywords, CHANNEL, "Podcast {podcast_title} found in tech_keywords table")
tech_keywords_handler = listen(tech_keywords, slack_notification_tech_keywords)

Set the SLACK_API_TOKEN and SLACK_CHANNEL_ID environment variables to use this listener. You'll get a Slack message any time a new entry appears in one of the filtered tables.
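If a Slack call fails, the exception is raised inside the listener. One possible way to guard against that, sketched here with a hypothetical helper named safe_notify and a hypothetical logger name: the helper formats the message, calls whatever send function it is given, and logs a failure instead of raising. The fake_send and broken_send functions exist only to exercise the sketch without a Slack connection.

```python
import logging

logger = logging.getLogger("rss_alerts")  # hypothetical logger name


def safe_notify(send, channel, template, podcast_title):
    """Format and send one notification; return True on success, False on failure."""
    message = template.format(podcast_title=podcast_title)
    try:
        send(channel=channel, text=message)
        return True
    except Exception:
        # Log the traceback and carry on so later rows are still processed
        logger.exception("Could not send notification for %r", podcast_title)
        return False


sent = []

def fake_send(channel, text):
    # Stand-in sender that records the message instead of posting it
    sent.append((channel, text))

def broken_send(channel, text):
    # Stand-in sender that always fails
    raise RuntimeError("network down")

template = "Podcast {podcast_title} found in tech_keywords table"
ok = safe_notify(fake_send, "C123", template, "Software Daily")
failed = safe_notify(broken_send, "C123", template, "Software Daily")
print(ok, failed, sent)
```

Inside slack_notification, the direct call could then become safe_notify(client.chat_postMessage, slack_channel, text, podcast_title), since chat_postMessage is already called with the channel and text keyword arguments.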

Reach out

If you use this project and blog post as a baseline for working with Deephaven, we'd love to hear about it. Let us know what you come up with in our GitHub Discussions or our Slack community.