Skip to main content

Crossing the streams is good

· 3 min read
Pete Goddard

Remember when Egon Spengler had a radical idea? We’ve got to cross the streams, he said. It was the only way to win. Well, in our world we’re wrangling data, not ghosts, but the same concept holds true. When tackling complex data problems especially, crossing the streams is not only good, but necessary.

img

Individual streams

Individual streams of data are valuable in their own right, like when you’re handling a smaller problem or ghost. They provide data related to a particular dimension—the price of a stock, the order of a customer, the metric of a device. Analytics and applications can be served by a single stream of data, but the uses are narrow and local.

'Crossing the streams' is good

Stream crossing unveils a much grander arena of possibility, one filled with history, context, and related signals. When the heroes (you know, Spengler, Venkman and the crew) needed to defeat their biggest opponent, they joined forces—and streams—to conquer the challenge. The whole was greater than the sum of the parts. Something exponential happened.

In our community, data scientists, analysts, and developers are doing equally heroic things conquering complex data problems. When you harness all the data streams available—both batch and real-time streams—you’re empowered to address sophisticated problems. And as with Spengler and Venkman, sometimes you need a few people to bring their gear and help out.

The newest data is oftentimes the most important data, but only once a framework has been established by the historical data, and only with the awareness provided by other contemporaneous events. In other words, crossing data streams is a powerful thing.

Example code

The following code example demonstrates how Deephaven takes three different data sources and combines them...

# import a kafka stream
from deephaven import ConsumeKafka as ck

def get_trades_stream():
return ck.consumeToTable(
{ 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
'io.deephaven.crypto.kafka.TradesTopic',
key = ck.IGNORE,
value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
table_type='append')

crypto_stream = get_trades_stream().view("Timestamp", "Exchange", "Instrument", "Price", "Size")

# import a CSV
from deephaven.TableTools import readCsv

crypto_0922 = readCsv("https://media.githubusercontent.com/media/deephaven/examples/main/CryptoCurrencyHistory/CryptoTrades_20210922.csv")\
.view("Timestamp", "Exchange", "Instrument", "Price", "Size")

# read Parquet where it is
from deephaven import ParquetTools as pt

crypto_0923 = pt.readTable("/data/large/crypto/CryptoTrades_20210923.snappy.parquet")\
.view("Timestamp", "Exchange", "Instrument", "Price", "Size")

# Combine streams, here Parquet and Kafka
from deephaven.TableTools import merge

merge_3_and_agg = merge(crypto_stream, crypto_0922)\
.view("Instrument", "Exchange", "Trade_Count = 1", "Base_Tot_Value = Price * Size")\
.sumBy("Instrument", "Exchange")

from deephaven.DBTimeUtils import formatDate
# Perform operations on combined streams, here CSV with Parquet and Kafka
join_3_tables = crypto_stream.updateView("Date = formatDate(Timestamp, TZ_NY)").countBy("Trade_Count", "Date", "Exchange", "Instrument")\
.naturalJoin(crypto_0922.countBy("Trade_Count", "Exchange", "Instrument"), "Exchange, Instrument", "Trade_Count_0922 = Trade_Count")
.naturalJoin(crypto_0923.countBy("Trade_Count", "Exchange, Instrument"), "Exchange, Instrument", "Trade_Count_0923 = Trade_Count")

Get developing with Deephaven Core