Remember when Egon Spengler had a radical idea? We’ve got to cross the streams, he said. It was the only way to win. Well, in our world we’re wrangling data, not ghosts, but the same concept holds true. Especially when tackling complex data problems, crossing the streams is not only good but necessary.
Individual streams
Individual streams of data are valuable in their own right, like when you’re handling a smaller problem or ghost. They provide data related to a particular dimension—the price of a stock, the order of a customer, the metric of a device. Analytics and applications can be served by a single stream of data, but the uses are narrow and local.
'Crossing the streams' is good
Stream crossing unveils a much grander arena of possibility, one filled with history, context, and related signals. When the heroes (you know, Spengler, Venkman and the crew) needed to defeat their biggest opponent, they joined forces—and streams—to conquer the challenge. The whole was greater than the sum of the parts. Something exponential happened.
In our community, data scientists, analysts, and developers are doing equally heroic things conquering complex data problems. When you harness all the data streams available—both batch and real-time streams—you’re empowered to address sophisticated problems. And as with Spengler and Venkman, sometimes you need a few people to bring their gear and help out.
The newest data is oftentimes the most important data, but only once a framework has been established by the historical data, and only with the awareness provided by other contemporaneous events. In other words, crossing data streams is a powerful thing.
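To make that concrete, here is a minimal plain-Python sketch (not Deephaven code) of the idea: a historical batch sets the baseline, and each new tick only becomes meaningful when read against it. The rows, prices, and the `build_baseline` and `annotate` helpers are all invented for illustration.

```python
from collections import defaultdict

# Hypothetical historical trades (the batch side): (instrument, price)
historical = [("BTC/USD", 100.0), ("BTC/USD", 110.0), ("ETH/USD", 50.0)]

# Hypothetical live ticks (the streaming side), newest last
live = [("BTC/USD", 126.0), ("ETH/USD", 49.0)]


def build_baseline(rows):
    """Average historical price per instrument."""
    totals = defaultdict(lambda: [0.0, 0])  # instrument -> [price sum, row count]
    for instrument, price in rows:
        totals[instrument][0] += price
        totals[instrument][1] += 1
    return {name: total / count for name, (total, count) in totals.items()}


def annotate(tick, baseline):
    """Attach the percent deviation from the historical average to a live tick."""
    instrument, price = tick
    avg = baseline[instrument]
    return instrument, price, round(100.0 * (price - avg) / avg, 1)


baseline = build_baseline(historical)
annotated = [annotate(tick, baseline) for tick in live]
```

On its own, a BTC/USD tick at 126.0 is just a number. Against a historical average of 105.0, it is a 20% move.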
Example code
The following code example demonstrates how Deephaven takes three different data sources (a Kafka stream, a CSV file, and a Parquet file) and combines them:
# import a Kafka stream
# Names here (consume, avro_spec, KeyValueSpec, TableType) assume the current deephaven Python API; adjust for older releases
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import KeyValueSpec, TableType

def get_trades_stream():
    return ck.consume(
        {'bootstrap.servers': 'demo-kafka.c.deephaven-oss.internal:9092',
         'schema.registry.url': 'http://demo-kafka.c.deephaven-oss.internal:8081'},
        'io.deephaven.crypto.kafka.TradesTopic',
        key_spec=KeyValueSpec.IGNORE,
        value_spec=ck.avro_spec('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
        offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
        table_type=TableType.append())

crypto_stream = get_trades_stream().view(formulas=["Timestamp", "Exchange", "Instrument", "Price", "Size"])
# import a CSV
from deephaven import read_csv
crypto_0922 = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/CryptoCurrencyHistory/CSV/CryptoTrades_20210922.csv")\
    .view(formulas=["Timestamp", "Exchange", "Instrument", "Price", "Size"])
# read Parquet where it is
from deephaven import parquet
crypto_0923 = parquet.read("/data/large/crypto/CryptoTrades_20210923.snappy.parquet")\
    .view(formulas=["Timestamp", "Exchange", "Instrument", "Price", "Size"])
# Combine streams, here Kafka and CSV
from deephaven import merge
# merge takes a list of tables
merge_3_and_agg = merge([crypto_stream, crypto_0922])\
    .view(formulas=["Instrument", "Exchange", "Trade_Count = 1", "Base_Tot_Value = Price * Size"])\
    .sum_by(by=["Instrument", "Exchange"])
# Perform operations on combined streams, here CSV with Parquet and Kafka
# formatDate and TZ_NY are resolved inside the query string, so no Python import is needed for them
join_3_tables = crypto_stream.update_view(formulas=["Date = formatDate(Timestamp, TZ_NY)"])\
    .count_by(col="Trade_Count", by=["Date", "Exchange", "Instrument"])\
    .natural_join(table=crypto_0922.count_by(col="Trade_Count", by=["Exchange", "Instrument"]), on=["Exchange", "Instrument"], joins=["Trade_Count_0922 = Trade_Count"])\
    .natural_join(table=crypto_0923.count_by(col="Trade_Count", by=["Exchange", "Instrument"]), on=["Exchange", "Instrument"], joins=["Trade_Count_0923 = Trade_Count"])
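If you don't have a Deephaven session handy, the shape of those two results can be approximated in plain Python. This is only a rough sketch of the semantics, not how Deephaven implements them: the rows are made up, the tables are static lists rather than live streams, and the variable names are hypothetical.

```python
from collections import Counter

# Hypothetical rows: (exchange, instrument, price, size)
stream_rows = [
    ("kraken", "BTC/USD", 100.0, 2.0),
    ("kraken", "BTC/USD", 101.0, 1.0),
    ("coinbase", "ETH/USD", 51.0, 1.0),
]
batch_rows = [
    ("kraken", "BTC/USD", 99.0, 1.0),
    ("bitstamp", "ETH/USD", 50.0, 4.0),
]

# Like merge: stack the rows of both sources into one table
merged = stream_rows + batch_rows

# Like the view + sum_by step: trade count and traded value per (instrument, exchange)
trade_count = Counter()
total_value = Counter()
for exchange, instrument, price, size in merged:
    key = (instrument, exchange)
    trade_count[key] += 1
    total_value[key] += price * size

# Like counting each source per (exchange, instrument), then a natural join:
# every left-hand key is kept, and a key with no match on the right gets None
stream_counts = Counter((e, i) for e, i, _, _ in stream_rows)
batch_counts = Counter((e, i) for e, i, _, _ in batch_rows)
joined = {key: (n, batch_counts.get(key)) for key, n in stream_counts.items()}
```

The difference from the real thing is the point of the article: in Deephaven the streaming side keeps ticking, and the merged and joined tables update along with it.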