
Wrangling semi-structured real-time data

Chip Kent · 7 min read

Header image DALL·E prompt: Wrangling a rope lasso around data server rack, highly detailed cgsociety 4k

JSON is one of the most widely used formats for exchanging data through APIs. Across languages and applications, if you're working with semi-structured data, you're probably working with JSON. However, as you likely know, working with JSON can be less than fun, especially when the data is nested or complex. In this tutorial, I walk you through creating a real-time database for processing and querying JSON. Starting simple, the tutorial builds up to an application that monitors public events on GitHub -- in only 32 lines of code!

During a recent project, I needed to persist a real-time stream of API requests. All of these requests needed to be in the same table, and I had to be able to write complex queries against the requests. Such complex requirements often result in software nightmares.

To support this mixed-schema system, I used Deephaven, a real-time query engine that supports many data types, sophisticated query patterns, and embedded Python functions. JSON is well supported -- including complex and nested JSON.

To create your own real-time JSON database, taking advantage of Deephaven's capabilities, follow the steps below.

Create a real-time table

As a first step, let's create a real-time table to write data to. This is done using a DynamicTableWriter.

import json
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
from deephaven.time import now

# Create the table writer
table_writer = DynamicTableWriter({"Timestamp": dht.DateTime, "JsonString": dht.string})

# Get the real-time table
t = table_writer.table

# Function to log the current timestamp and a JSON encoded dictionary to the table
def log_json(data):
    # Encode the dictionary as JSON
    json_string = json.dumps(data)

    # Log the data to the table writer
    table_writer.write_row(now(), json_string)

# Log some data
log_json({"A": 1, "B": 2})
log_json({"X": "xval", "Y": {"C": 4}})

Log data

Now let's take real-time data logging to the next level. In real-time applications, data is continuously flowing: it is pushed in by streaming APIs or pulled in by periodic queries. To illustrate data being pulled into an application, we create a background thread that logs a new row every second. With logging on its own thread, the main thread stays free for interactive queries.

import json
import time
from random import random
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
from deephaven.time import now
import threading

# Create the table writer
table_writer = DynamicTableWriter({"Timestamp": dht.DateTime, "JsonString": dht.string})

# Get the real-time table
t = table_writer.table

# Function to log the current timestamp and a JSON encoded dictionary to the table
def log_json(data):
    # Encode the dictionary as JSON
    json_string = json.dumps(data)

    # Log the data to the table writer
    table_writer.write_row(now(), json_string)

# Function to log a row of data every second, for 30 seconds
def logging_thread_func():
    for i in range(30):
        log_json({"A": i, "B": random()})
        time.sleep(1)

# Create and start the logging thread
thread = threading.Thread(target=logging_thread_func)
thread.start()


Now we have a real-time streaming JSON source. But how do we query it?

Query the table

Queries are the easy part. Deephaven is designed to support very complex logic, and queries can use formulas composed of operators, functions, objects, columns, variables, and special variables. For our real-time JSON database, we will use plain old Python functions and modules to query the JSON strings. To understand everything possible with Deephaven query formulas, see How to use formulas.
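For example, a formula can call methods directly on column values, with no user-defined Python function involved. A minimal sketch against the table t from above (the JsonLength column name is just for illustration):

# Compute each JSON string's length with a plain query formula
t_len = t.update("JsonLength = JsonString.length()")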

Here the get_value function extracts a value from a JSON string. get_value is implemented using the core Python json module, but you could use any module you like.

import json
import time
from random import random
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
from deephaven.time import now
import threading

# Create the table writer
table_writer = DynamicTableWriter({"Timestamp": dht.DateTime, "JsonString": dht.string})

# Get the real-time table
t = table_writer.table

# Function to log the current timestamp and a JSON encoded dictionary to the table
def log_json(data):
    # Encode the dictionary as JSON
    json_string = json.dumps(data)

    # Log the data to the table writer
    table_writer.write_row(now(), json_string)

# Function to log a row of data every second, for 30 seconds
def logging_thread_func():
    for i in range(30):
        log_json({"A": i, "B": random()})
        time.sleep(1)

# Create and start the logging thread
thread = threading.Thread(target=logging_thread_func)
thread.start()

# Function to extract the value associated with "key" from a json string or return None if the key is not present
def get_value(json_string, key):
    json_dict = json.loads(json_string)

    if key in json_dict:
        return json_dict[key]

    return None

# Create a new table containing the values "A" and "B" from the JSON
t2 = t.update(["A = (int) get_value(JsonString, `A`)", "B = (double) get_value(JsonString, `B`)"])
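With the JSON values lifted into typed columns, downstream queries become ordinary table operations. For example, a quick sketch filtering the derived table on the extracted numeric column (the 0.5 threshold is arbitrary):

# Keep only rows where the extracted value B exceeds 0.5
t3 = t2.where("B > 0.5")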

Put this to work: GitHub Events

We've created a toy real-time JSON database that supports both storage and queries. Pretty cool, right?

Now, let's amp things up and create a real-life example.

For this, we will look at all of the events that GitHub makes public. To see the JSON for these events, navigate to https://api.github.com/events. Since the GitHub API does not support streaming, we instead download the latest events every 10 seconds.

In real life, JSON data is rarely as simple as the first two examples -- real JSON is often nested and messy. Fortunately, Python's json module decodes JSON strings into dictionaries, so values, even deeply nested ones, can be retrieved by key. get_event_repo below illustrates accessing repository URLs by chaining the repo and url keys.

import json
import threading
import time
from urllib.request import urlopen
from deephaven import DynamicTableWriter
import deephaven.dtypes as dht
from deephaven.time import now

# URL for GitHub event data
url = "https://api.github.com/events"

# Create the table writer
table_writer = DynamicTableWriter({"Timestamp": dht.DateTime, "EventJson": dht.string})

# Get the real-time table
t = table_writer.table

# Function to log the current timestamp and a JSON encoded dictionary to the table
def log_json(timestamp, data):
    json_string = json.dumps(data)
    table_writer.write_row(timestamp, json_string)

# Function to pull all events every 10 seconds and log one row for each event
def logging_thread_func():
    while True:
        timestamp = now()
        event_data = json.loads(urlopen(url).read())

        for event in event_data:
            log_json(timestamp, event)

        time.sleep(10)

# Create and start the logging thread
thread = threading.Thread(target=logging_thread_func)
thread.start()

# Function to get the ID for an event from JSON
def get_event_id(json_string):
    return int(json.loads(json_string)["id"])

# Function to get the user performing an event from JSON
def get_event_login(json_string):
    return json.loads(json_string)["actor"]["login"]

# Function to get the type of event from JSON
def get_event_type(json_string):
    return json.loads(json_string)["type"]

# Function to get the repository the event was performed on from JSON
def get_event_repo(json_string):
    return json.loads(json_string)["repo"]["url"]

# Create a new table that summarizes the events
event_summary = t.select([
    "Timestamp",
    "Id = (long) get_event_id(EventJson)",
    "Actor = (String) get_event_login(EventJson)",
    "EventType = (String) get_event_type(EventJson)",
    "Repo = (String) get_event_repo(EventJson)",
])

# Logging can result in events being duplicated if they are present in multiple URL downloads
# Deduplicate the events by only taking the first instance
event_summary_deduplicated = event_summary.first_by("Id")

Now we have a useful application. Using only 32 lines of code, we have created a real-time JSON database to monitor GitHub events.

Since the interesting JSON event data is now available in a Deephaven table, you can use Deephaven queries to analyze the events. For example, you can get all GitHub issue events by running the following command:

issues = event_summary_deduplicated.where("EventType = `IssuesEvent`")
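Aggregations work on the live table as well. For instance, here is a sketch of a real-time count of events per type using Deephaven's count_by (the Count column name is our choice):

# Count deduplicated events by type; the counts tick as new events arrive
event_counts = event_summary_deduplicated.count_by("Count", by=["EventType"])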


What kind of JSON are you working with? Tell us on Slack.