
Deephaven Community tutorial

1. Ingest static data

Deephaven empowers you to work with both batch and streaming data using the same methods.

It supports ingesting data from CSVs and other delimited files, and reading Parquet files at rest. [Soon you’ll be able to read XML, access SQL databases via ODBC, and access Arrow buffers locally and via Flight.]

CSV ingestion is described in detail in our guide, How to import CSV files. The basic syntax is:

from deephaven.TableTools import readCsv
named_table = readCsv("https://datasource.com/yourfilenamehere.csv")

Here is an introductory program that ingests weather data from a CSV URL, then calculates the average, low, and high temperatures by year.

Most of the script is simply to whet your appetite.

from deephaven.TableTools import readCsv
seattle_weather = readCsv("https://media.githubusercontent.com/media/deephaven/examples/main/GSOD/csv/seattle.csv")
from deephaven.DBTimeUtils import year
from deephaven import ComboAggregateFactory as caf
hi_lo_by_year = seattle_weather.view("Year = yearNy(ObservationDate)", "TemperatureF")\
    .where("Year >= 2000")\
    .by(caf.AggCombo(caf.AggAvg("Avg_Temp = TemperatureF"),
                     caf.AggMin("Lo_Temp = TemperatureF"),
                     caf.AggMax("Hi_Temp = TemperatureF")), "Year")\
    .formatColumns("Year = Decimal(`#`)")
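To make the query concrete, here is a plain-Python sketch (no Deephaven involved) of what the view/where/AggCombo pipeline computes: derive a year, keep years from 2000 on, then take the average, minimum, and maximum temperature per year. The sample rows are hypothetical, and unlike yearNy the sketch ignores time zones.

```python
from collections import defaultdict
from datetime import date

# Hypothetical sample rows: (ObservationDate, TemperatureF). Not real GSOD data.
observations = [
    (date(1999, 7, 1), 70.0),
    (date(2000, 1, 5), 40.0),
    (date(2000, 7, 9), 80.0),
    (date(2001, 3, 2), 50.0),
    (date(2001, 8, 15), 90.0),
    (date(2001, 12, 1), 37.0),
]


def hi_lo_by_year(rows, min_year=2000):
    """Return {year: (avg, low, high)} for years >= min_year."""
    temps_by_year = defaultdict(list)
    for obs_date, temp_f in rows:
        year = obs_date.year          # like "Year = yearNy(ObservationDate)"
        if year >= min_year:          # like .where("Year >= 2000")
            temps_by_year[year].append(temp_f)
    return {
        # AggAvg, AggMin, AggMax over each year's temperatures
        year: (sum(t) / len(t), min(t), max(t))
        for year, t in sorted(temps_by_year.items())
    }


result = hi_lo_by_year(observations)
print(result)
```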

To read data from a local or networked file system, simply specify the file path. The file path below assumes you downloaded pre-built Docker images that include Deephaven’s example data, as described in our Quick start.

This script accesses a million-row CSV of crypto trades from 09/22/2021.

crypto_from_csv = readCsv("/data/examples/CryptoCurrencyHistory/CryptoTrades_20210922.csv")

The same importer also handles .txt, .psv, and other delimited files. The related Javadocs describe these usage patterns.

The table widget now in view is designed to be highly interactive:

  • Click the table and filter via Ctrl + F (Windows) or ⌘ + F (Mac).
  • Click the funnel icon to create sophisticated filters or use auto-filter UI features.
  • Hover on headers to see data types.
  • Click headers to access more options, like adding or changing sorts.
  • Click the Table Options menu at right to plot from the UI, create and manage columns, and download CSVs.

In addition to CSV, Deephaven supports reading Parquet files. Parquet files support random access, so they need not be read completely into memory. See our documentation about reading both single, flat files and multiple, partitioned Parquet files.

from deephaven.ParquetTools import readTable
crypto_from_parquet = readTable("/data/examples/CryptoCurrencyHistory/CryptoTrades_20210922.parquet")

2. Ingest real-time streams

Providing you the ability to work with dynamic, updating, and real-time data is Deephaven’s superpower.

Users connect Kafka and other event streams, integrate enterprise and vendor data-source APIs and feeds, receive JSON from devices, and [soon] integrate with Change Data Capture (CDC) exhaust from RDBMSs.

Deephaven has a rich Kafka integration, supporting Avro, JSON, dictionaries, and dynamics for historical, stream, and append tables. Our concept guide, Kafka in Deephaven, illustrates how easy and valuable this is.

Although many more options are available, the basic syntax for Kafka integration is:

from deephaven import KafkaTools as kt
result = kt.consumeToTable({'bootstrap.servers': 'server_producer:port'}, 'kafka.topic')

The code above is generic. Configuring it to connect to your own Kafka feed is beyond the scope of this tutorial.

The following script creates an append-only table of fake crypto trades in a few instruments on a few exchanges. Because the data is fake, trade intervals are more uniform, and trade sizes a bit less typical, than in the real market.

New records are added every 25 milliseconds, and the table is back-populated with data starting 30 minutes before the moment you run the script.

from deephaven.tutorials.crypto_trades import *
crypto_streaming = ticking_crypto_milliseconds(25)
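As a rough sense of scale, the arithmetic below estimates how many rows the table starts with and how fast it grows. It assumes the back-filled history uses the same 25 ms spacing as the live data, which the tutorial does not state.

```python
# Assumption: back-filled rows use the same 25 ms spacing as live rows.
TICK_MS = 25
BACKFILL_MINUTES = 30

# Rows needed to cover 30 minutes at one row per 25 ms.
backfill_rows = BACKFILL_MINUTES * 60 * 1000 // TICK_MS
# Live rows appended each second.
rows_per_second = 1000 // TICK_MS

print(backfill_rows, rows_per_second)
```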

It’s fun to watch new data hit the screen, so let’s reverse the table.

crypto_streaming_2 = crypto_streaming.reverse()

Now that you have a few tables, the next section will introduce adding new columns to them and merging.

3. Create columns and merge tables

Let's examine the data a bit programmatically. Use countBy to see the row count of each table.

Table operations, methods, and other capabilities of the Deephaven table API are used identically for updating (streaming) tables and static ones!

This simple example illustrates this superpower:

row_count_from_csv = crypto_from_csv.countBy("Row_Count").updateView("Source = `CSV`")
row_count_from_parquet = crypto_from_parquet.countBy("Row_Count").updateView("Source = `Parquet`")
row_count_streaming = crypto_streaming.countBy("Row_Count").updateView("Source = `Streaming`")

You can easily eyeball the respective row counts by merging the tables. If you ever need to merge tables and then sort the result, mergeSorted is recommended because it is more efficient.

from deephaven.TableTools import merge
row_count_compare = merge(row_count_from_csv, row_count_from_parquet, row_count_streaming)
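One general reason a combined merge-and-sort can beat merging and then sorting: if each input is already sorted, a k-way merge only has to interleave them instead of re-sorting everything. The sketch below shows that technique with Python's heapq.merge on hypothetical timestamp lists. It illustrates the idea only and is not a description of how Deephaven implements mergeSorted.

```python
import heapq

# Hypothetical inputs, each already sorted by timestamp.
csv_times = [1, 4, 9]
parquet_times = [2, 3, 10]
stream_times = [5, 6, 7]

# Naive approach: concatenate, then sort everything (O(n log n)).
merged_then_sorted = sorted(csv_times + parquet_times + stream_times)

# k-way merge of presorted inputs (O(n log k) for k inputs).
merge_sorted = list(heapq.merge(csv_times, parquet_times, stream_times))

print(merge_sorted)
```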

Explore the schema and other metadata using getMeta.

meta_from_csv = crypto_from_csv.getMeta().updateView("Source = `CSV`").moveUpColumns("Source")
meta_from_parquet = crypto_from_parquet.getMeta().updateView("Source = `Parquet`").moveUpColumns("Source")
meta_streaming = crypto_streaming.getMeta().updateView("Source = `Streaming`").moveUpColumns("Name", "Source")

Merging and sorting the metadata tables will highlight some differences in the respective schema.

merge_meta = merge(meta_from_csv, meta_from_parquet, meta_streaming)\
    .sort("Name", "Source")\
    .formatColumnWhere("DataType", "DataType.contains(`Object`) = true", "BLUE")\
    .formatColumnWhere("Name", "DataType.contains(`int`) = true", "YELLOW")\
    .formatColumnWhere("ColumnType", "DataType.contains(`int`) = true", "YELLOW")

You can cast the three columns that are different in the crypto_streaming table using update, which is one of the five selection and projection operations available to you, as described in our guide, How to select, view, and update data.

crypto_streaming_3 = crypto_streaming.update("Id = (long)Id",\
    "Instrument = Instrument.toString()", "Exchange = Exchange.toString()")

Now that the schemas are identical across the three tables, let's create one table of crypto data that combines updating and static data, with the static portion assembled using headPct and tailPct. The last two lines release the original static tables by setting their variables to None.

crypto_main = merge(crypto_from_csv.headPct(0.50), crypto_from_parquet.tailPct(0.50),\
    crypto_streaming_3)\
    .sortDescending("Timestamp")
crypto_from_csv = None
crypto_from_parquet = None
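For readers unfamiliar with headPct and tailPct, here is a plain-Python sketch of the idea on a list: keep the first (or last) fraction of the rows. The function names are hypothetical, and rounding the row count down is an assumption of this sketch; Deephaven's exact rounding rule may differ.

```python
def head_pct(rows, pct):
    """First `pct` fraction of rows (row count rounded down: an assumption)."""
    n = int(len(rows) * pct)
    return rows[:n]


def tail_pct(rows, pct):
    """Last `pct` fraction of rows (row count rounded down: an assumption)."""
    n = int(len(rows) * pct)
    # Slice from len(rows) - n so that n == 0 yields an empty list
    # (rows[-0:] would return every row).
    return rows[len(rows) - n:]


rows = list(range(10))
print(head_pct(rows, 0.5), tail_pct(rows, 0.5))
```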

In the next section, you’ll learn about adding new columns to support calculations and logic, and doing aggregations.

4. Manipulate and aggregate data

It's likely you've figured out a few of Deephaven’s fundamentals:

  • You name tables and operate on them. Everything in Deephaven is a table. Streams are updating tables. Batches are static ones. You don't have to track this.
  • You apply methods to these tables without needing to know whether the data is updating or not.
  • You can refer to other named tables, and data simply flows from tables to their dependents. You may know this as a directed acyclic graph (DAG). (See our concept guide on the table update model if you're interested in what's under the hood.)
  • There is no optimizer to wrestle with. You’ll appreciate this once you tackle complex use cases or need to bring your Python, Java, or wrapped C++ code to the data.
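The "data flows to dependents" idea can be pictured with a toy model: each derived node remembers how it is computed from its parent, and appending to a source pushes the change downstream. This is a hypothetical illustration in plain Python. Unlike Deephaven's engine, which processes only the changed rows, this toy recomputes each dependent in full.

```python
class Node:
    """A toy table node: holds rows and pushes changes to its dependents."""

    def __init__(self, compute=None, parent=None):
        self.compute = compute    # function: parent rows -> this node's rows
        self.rows = []
        self.children = []
        if parent is not None:
            parent.children.append(self)
            self.rows = compute(parent.rows)

    def append(self, new_rows):
        """Append rows to a source node and propagate to dependents."""
        self.rows = self.rows + list(new_rows)
        self._propagate()

    def _propagate(self):
        # Recompute every dependent, then let it notify its own dependents.
        for child in self.children:
            child.rows = child.compute(self.rows)
            child._propagate()


trades = Node()
big_trades = Node(lambda rows: [r for r in rows if r["Size"] > 1], parent=trades)
big_count = Node(lambda rows: [len(rows)], parent=big_trades)

trades.append([{"Size": 0.5}, {"Size": 2.0}])
trades.append([{"Size": 3.0}])
print(big_count.rows)
```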

Aggregations are an important use case for streaming data. (And static, too.) Doing a single, dedicated aggregation, like the sumBy below, follows a pattern similar to the countBy you did earlier.

crypto_sumBy = crypto_main.view("Instrument", "Exchange", "Trade_Count = 1", "Total_Base_Value = Price * Size")\
    .sumBy("Instrument", "Exchange")\
    .sortDescending("Trade_Count")\
    .formatColumns("Total_Base_Value = Decimal(`#,###.00`)")
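The query uses a small trick worth spelling out: it adds a constant column "Trade_Count = 1", so summing that column per group counts the trades. A plain-Python sketch of the same grouping, using hypothetical trades, looks like this:

```python
from collections import defaultdict

# Hypothetical trades: (Instrument, Exchange, Price, Size).
trades = [
    ("BTC/USD", "binance", 100.0, 2.0),
    ("BTC/USD", "binance", 110.0, 1.0),
    ("ETH/USD", "kraken", 10.0, 5.0),
]


def sum_by(rows):
    # key -> [Trade_Count, Total_Base_Value]
    totals = defaultdict(lambda: [0, 0.0])
    for instrument, exchange, price, size in rows:
        acc = totals[(instrument, exchange)]
        acc[0] += 1              # summing the constant "Trade_Count = 1" counts rows
        acc[1] += price * size   # "Total_Base_Value = Price * Size"
    # Mirror .sortDescending("Trade_Count").
    return sorted(((k, tuple(v)) for k, v in totals.items()),
                  key=lambda kv: kv[1][0], reverse=True)


result = sum_by(trades)
print(result)
```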

If your use case is well served by adding columns in a formulaic, on-demand fashion (instead of writing results to memory), use updateView. In this case, you'll calculate the value of each trade, take the Id field modulo 10, and bin timestamps by minute.
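The trade-off between update (compute and store values) and updateView (store only the formula, compute on access) can be sketched with a toy lazy column in plain Python. The LazyColumn class is hypothetical and is not how Deephaven stores columns; it only shows that a formula column does no work until a value is read.

```python
class LazyColumn:
    """Recomputes a formula on each access, like a column added with updateView."""

    def __init__(self, formula, *sources):
        self.formula = formula
        self.sources = sources
        self.evaluations = 0   # how many times the formula has run

    def __getitem__(self, i):
        self.evaluations += 1
        return self.formula(*(col[i] for col in self.sources))

    def __len__(self):
        return len(self.sources[0])


price = [100.0, 110.0, 10.0]
size = [2.0, 1.0, 5.0]
ids = [17, 23, 40]

# update-style: compute every value now and keep it in memory.
value_materialized = [p * s for p, s in zip(price, size)]

# updateView-style: keep only the formula; compute when a cell is read.
value_lazy = LazyColumn(lambda p, s: p * s, price, size)
id_mod_10 = LazyColumn(lambda x: x % 10, ids)
```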

tip

You can also add columns with Manage Custom Columns in the Table Options menu in the web UI.

Binning data is fundamental, and upperBin and lowerBin make it easy. Binning is heavily used in profiling and sampling data.

The query below reuses the same table name (crypto_main). That’s just fine.

crypto_main = crypto_main.updateView("Value_Base_Ccy = (Price * Size)",\
    "Id_Decile = Id%10",\
    "TimeBin = upperBin(Timestamp, MINUTE)")\
    .formatColumns("Value_Base_Ccy = Decimal(`$ #,###.00`)")
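Binning on nanosecond timestamps comes down to integer arithmetic. The sketch below shows one way to compute lower and upper minute bins in plain Python. The function names are hypothetical, and treating a timestamp that already sits on a boundary as its own upper bin is an assumption about upperBin's behavior, not something the tutorial states.

```python
MINUTE_NANOS = 60 * 1_000_000_000  # one minute in nanoseconds


def lower_bin(t_nanos, interval):
    """Round down to the start of the bin."""
    return t_nanos - t_nanos % interval


def upper_bin(t_nanos, interval):
    """Round up to the end of the bin (assumption: boundaries map to themselves)."""
    lower = lower_bin(t_nanos, interval)
    return lower if lower == t_nanos else lower + interval


t = 5 * MINUTE_NANOS + 30 * 10**9   # 5 minutes 30 seconds
print(lower_bin(t, MINUTE_NANOS), upper_bin(t, MINUTE_NANOS))
```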

View distinct values using selectDistinct.

# 1 column
distinct_instruments = crypto_main.selectDistinct("Instrument").sort("Instrument")
# 2 columns
instrument_exchange = crypto_main.selectDistinct("Exchange", "Instrument").sort("Exchange", "Instrument")
# countBy() looks similar
count_by = crypto_main.countBy("Trade_Count", "Exchange", "Instrument").sort("Exchange", "Instrument")

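The difference between selectDistinct and countBy is easy to see on a plain list: one keeps each unique key combination once, the other also counts how many rows share it. Here is a plain-Python sketch on hypothetical (Exchange, Instrument) pairs:

```python
from collections import Counter

# Hypothetical (Exchange, Instrument) pairs, one per trade.
trades = [
    ("binance", "BTC/USD"),
    ("kraken", "ETH/USD"),
    ("binance", "BTC/USD"),
    ("binance", "ETH/USD"),
]

# Like selectDistinct("Instrument").sort("Instrument")
distinct_instruments = sorted({instrument for _, instrument in trades})
# Like selectDistinct("Exchange", "Instrument").sort(...)
instrument_exchange = sorted(set(trades))
# Like countBy("Trade_Count", "Exchange", "Instrument").sort(...)
count_by = sorted(Counter(trades).items())
```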
tip

You can also accomplish this with Select Distinct Values in the Table Options menu in the web UI.

Performing multiple aggregations at once is often more logical and can also help performance.

Let's define an aggregation function to be used later. The function will return an aggregation result based on the table and aggregation-keys you pass in.

from deephaven import ComboAggregateFactory as caf
def aggregate_crypto(table, agg_keys):
    return table.by(caf.AggCombo(
        caf.AggFirst("Last_Timestamp = Timestamp"),
        caf.AggSum("Total_Value_Traded = Value_Base_Ccy", "Total_Size = Size"),
        caf.AggCount("Trade_Count"),
        caf.AggWAvg("Size", "Wtd_Avg_Price = Price"),
        caf.AggMin("Low_Price = Price"),
        caf.AggMax("Hi_Price = Price")),
        agg_keys)
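To show what each aggregator in aggregate_crypto computes for a single key group, here is a plain-Python sketch that gathers all of the statistics in one pass, including the size-weighted average price, sum(Price * Size) / sum(Size). It says nothing about Deephaven's internals; the sample rows are hypothetical and are assumed to be sorted newest-first, matching the descending sort on crypto_main, so the first timestamp is the latest.

```python
def aggregate_group(rows):
    """One pass over (Timestamp, Price, Size, Value_Base_Ccy) rows for one key group.
    Assumes rows are sorted newest-first."""
    total_value = total_size = weighted_price_sum = 0.0
    low = high = None
    for _, price, size, value in rows:
        total_value += value
        total_size += size
        weighted_price_sum += price * size   # numerator of the size-weighted average
        low = price if low is None else min(low, price)
        high = price if high is None else max(high, price)
    return {
        "Last_Timestamp": rows[0][0],                      # AggFirst
        "Total_Value_Traded": total_value,                 # AggSum
        "Total_Size": total_size,                          # AggSum
        "Trade_Count": len(rows),                          # AggCount
        "Wtd_Avg_Price": weighted_price_sum / total_size,  # AggWAvg weighted by Size
        "Low_Price": low,                                  # AggMin
        "Hi_Price": high,                                  # AggMax
    }


rows = [(3, 110.0, 1.0, 110.0), (2, 100.0, 2.0, 200.0), (1, 90.0, 1.0, 90.0)]
stats = aggregate_group(rows)
print(stats)
```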

Below, you call aggregate_crypto with different sets of keys. The last table gets some extra formatting to make it easier to read.

# 1 key
agg_1_key = aggregate_crypto(crypto_main, "Date")
# 2 keys
agg_2_keys = aggregate_crypto(crypto_main, ["Exchange", "Instrument"])\
    .sort("Exchange", "Instrument")\
    .formatColumnWhere("Instrument", "Instrument = `BTC/USD`", "DODGERBLUE")
# 3 keys
agg_3_keys = aggregate_crypto(crypto_main, ["Exchange", "Instrument", "Id_Decile", "TimeBin"])\
    .sortDescending("Last_Timestamp")\
    .updateView("Total_Value_Traded = (int)Total_Value_Traded", "Total_Size = (long)Total_Size")\
    .formatColumns("Wtd_Avg_Price = Decimal(`#,###.00`)", "Low_Price = Decimal(`#,###.00`)", "Hi_Price = Decimal(`#,###.00`)",\
        "Trade_Count = heatmap(Trade_Count, 1, 20, MAGENTA, CYAN)")\
    .moveUpColumns("TimeBin", "Instrument", "Id_Decile", "Trade_Count", "Total_Value_Traded")

5. Filter, join, and as-of-join

Deephaven filtering is accomplished by applying where operations. The engine supports a large array of match, conditional, and combination filters.

These four scripts are simple examples.

filter_btc = crypto_main.where("Instrument = `BTC/USD`")
filter_eth = crypto_main.where("Instrument = `ETH/USD`")
filter_eth_and_price = crypto_main.where("Instrument = `ETH/USD`", "Size > 1")
filter_eth_and_exchange = crypto_main.where("Instrument = `ETH/USD`", "Exchange.startsWith(`bi`) = true")

Use whereIn to filter one table based on the contents of another "filter table". If the filter table updates, the filter applied to the other changes automatically.

In the third line below, you’ll filter the table crypto_main based on the Instrument values of the table row_1.

agg_by_instrument = aggregate_crypto(crypto_main, "Instrument")
row_1 = agg_by_instrument.head(1)
filter_where_in = crypto_main.whereIn(row_1, "Instrument")
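Conceptually, whereIn is a set-membership filter whose set comes from another table. The plain-Python sketch below (hypothetical function and data) rebuilds the allowed set on each call, so changing the filter rows changes the result. Deephaven re-applies the filter automatically when the filter table ticks; here we simply call the function again.

```python
def where_in(rows, filter_rows, key):
    """Keep rows whose `key` value appears among filter_rows' `key` values."""
    allowed = {r[key] for r in filter_rows}
    return [r for r in rows if r[key] in allowed]


crypto = [{"Instrument": "BTC/USD"}, {"Instrument": "ETH/USD"}, {"Instrument": "BTC/USD"}]
row_1 = [{"Instrument": "BTC/USD"}]

before = where_in(crypto, row_1, "Instrument")
row_1[0]["Instrument"] = "ETH/USD"   # the "filter table" updates
after = where_in(crypto, row_1, "Instrument")
print(len(before), len(after))
```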

If you prefer, you can set variables using records from tables.

Together, these lines print the value at index position 1 (the second row) of the Instrument column in the agg_by_instrument table to your console.

instrument_2nd = agg_by_instrument.getColumn("Instrument").get(1)
print(instrument_2nd)
BTC/USD

That variable can be used for filtering.

filter_variable = crypto_main.where("Instrument = instrument_2nd")

Deephaven joins are first class, supporting joining real-time, updating tables with each other (and with static tables) without any need for windowing.

Our guide, Choose a join method, offers guidance on how to choose the best method for your use case.

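Generally, joins fall into one of two categories: relational joins, which match rows on exact key values (such as naturalJoin), and time-series joins, which match on the closest preceding or following value (such as aj).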

The syntax is generally as follows:

result1 = left_table.joinMethod(right_table, "Key", "Column_from_right_table")

Or, with multiple keys and columns:

result1 = left_table.joinMethod(right_table, "Key1, Key2, KeyN", "Column1_from_right, Column2_from_right, ColumnN_from_right")

You can rename columns as you join tables as well.

Use naturalJoin() when you expect no more than one match in the right table per key, and you are happy to receive null values for left rows that have no match.

# filter the aggregation tables to create a table for BTC and ETH
trade_count_btc = agg_2_keys.where("Instrument = `BTC/USD`").view("Exchange", "Trade_Count")
trade_count_eth = agg_2_keys.where("Instrument = `ETH/USD`").view("Exchange", "Trade_Count")
# naturalJoin() using "Exchange" as the join key
# pull column "Trade_Count" from the *_eth table, renaming it "Trade_Count_Eth"
join_2_tables = trade_count_btc.renameColumns("Trade_Count_Btc = Trade_Count")\
    .naturalJoin(trade_count_eth, "Exchange", "Trade_Count_Eth = Trade_Count")
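The naturalJoin semantics described above map naturally onto a dictionary lookup. The plain-Python sketch below (hypothetical function and data) adds one right-table column to each left row, fills None when there is no match, and raises on duplicate right keys. Raising there is this sketch's way of enforcing the "no more than one match per key" expectation.

```python
def natural_join(left, right, key, right_col, new_name):
    """Add right[right_col] to each left row as `new_name`, matching on `key`."""
    lookup = {}
    for r in right:
        if r[key] in lookup:
            raise ValueError(f"duplicate right key: {r[key]!r}")
        lookup[r[key]] = r[right_col]
    # None (null) when no right row matches.
    return [{**row, new_name: lookup.get(row[key])} for row in left]


trade_count_btc = [{"Exchange": "binance", "Trade_Count_Btc": 5},
                   {"Exchange": "coinbase", "Trade_Count_Btc": 3}]
trade_count_eth = [{"Exchange": "binance", "Trade_Count": 7}]

join_2_tables = natural_join(trade_count_btc, trade_count_eth,
                             "Exchange", "Trade_Count", "Trade_Count_Eth")
print(join_2_tables)
```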

Though Deephaven excels with relational joins, its ordering capabilities make it an excellent time series database.

Time series joins, or "as-of joins", take a timestamp key from the left table and do a binary search in the right table (respecting other join keys), seeking an exact nanosecond match. If no exact match exists, the right row with the closest timestamp before the left timestamp is the match.

It is important to note:

  • The right table needs to be sorted.
  • Numerical fields other than date-times can also be used for the final key in as-of joins.
  • Reverse as-of join (raj) is similar, but uses the record just after the target timestamp if no exact match is found.
  • One can syntactically use < or > (instead of =) in the query to eliminate the exact match as the best candidate.
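The matching rules in the list above can be sketched with Python's bisect module on a sorted list of right-table timestamps. The mode labels are hypothetical names for this sketch, not Deephaven syntax: "aj" takes the last time at or before t, "aj<" excludes an exact match, and the "raj" variants look forward instead.

```python
from bisect import bisect_left, bisect_right


def as_of_lookup(right_times, t, mode="aj"):
    """Index of the matching right row for left timestamp t, or None.
    right_times must be sorted ascending."""
    if mode == "aj":        # last row with time <= t (exact match preferred)
        i = bisect_right(right_times, t) - 1
    elif mode == "aj<":     # last row with time < t (exact match excluded)
        i = bisect_left(right_times, t) - 1
    elif mode == "raj":     # first row with time >= t
        i = bisect_left(right_times, t)
    elif mode == "raj>":    # first row with time > t
        i = bisect_right(right_times, t)
    else:
        raise ValueError(f"unknown mode: {mode}")
    return i if 0 <= i < len(right_times) else None


right_times = [10, 20, 30]
print(as_of_lookup(right_times, 25))   # index of the row at time 20
```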

# filter the original crypto_main table to get raw BTC and ETH trade records
crypto_btc = crypto_main.where("Instrument = `BTC/USD`")
crypto_eth = crypto_main.where("Instrument = `ETH/USD`")
# for each BTC trade (left table), pull the ETH trade at or just before its Timestamp
time_series_join = crypto_btc.view("Timestamp", "Price")\
    .aj(crypto_eth, "Timestamp", "Eth_Time = Timestamp, Price_Eth = Price")\
    .renameColumns("Time_Btc = Timestamp", "Price_Btc = Price")

The introduction of Exchange as a join-key in front of Timestamp in the script below directs the engine to do the as-of-join after first doing an exact match on Exchange between the left and right tables.

time_series_join_2_keys = crypto_btc.view("Exchange", "Time_Btc = Timestamp", "Price_Btc = Price")\
    .aj(crypto_eth, "Exchange, Time_Btc = Timestamp", "Eth_Time = Timestamp, Eth_Price = Price")
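Adding Exchange in front of the timestamp amounts to "group the right table by the exact-match key, then do the as-of search within the matching group". A plain-Python sketch of that idea, with hypothetical function and data, follows. Left rows with no earlier right row in their group get None.

```python
from bisect import bisect_right
from collections import defaultdict


def keyed_as_of_join(left, right, key, time):
    """For each left row, the latest right row with the same `key`
    whose `time` is <= the left row's time (None if there is none)."""
    by_key = defaultdict(list)
    for r in sorted(right, key=lambda r: r[time]):
        by_key[r[key]].append(r)
    times_by_key = {k: [r[time] for r in rows] for k, rows in by_key.items()}
    out = []
    for row in left:
        times = times_by_key.get(row[key], [])
        i = bisect_right(times, row[time]) - 1
        out.append(by_key[row[key]][i] if i >= 0 else None)
    return out


left = [{"Exchange": "binance", "Time": 15},
        {"Exchange": "kraken", "Time": 15},
        {"Exchange": "gemini", "Time": 15}]
right = [{"Exchange": "binance", "Time": 10, "Price": 1.0},
         {"Exchange": "kraken", "Time": 12, "Price": 2.0},
         {"Exchange": "binance", "Time": 14, "Price": 3.0},
         {"Exchange": "kraken", "Time": 20, "Price": 4.0}]
matches = keyed_as_of_join(left, right, "Exchange", "Time")
```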

A common use of aj is joining a table to a time-shifted copy of itself.

# Create a column to represent the time 1 minute before "Timestamp".
crypto_btc_simple = crypto_btc.view("Time_Trade = Timestamp", "Time_less_1min = Time_Trade - MINUTE", "Price_Now = Price")
# As-of join the table on itself: for each trade, find the latest trade at or before Time_less_1min.
time_series_join_self = crypto_btc_simple.aj(crypto_btc_simple, "Time_less_1min = Time_Trade", "Price_1_min_prev = Price_Now")\
    .updateView("Price_Diff = Price_Now - Price_1_min_prev")
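The intended result of this self-join, the price as of one minute before each trade, can be sketched in plain Python with bisect on a sorted list of hypothetical nanosecond timestamps. Trades with no earlier trade at least a minute back get None, which corresponds to a null in the joined table.

```python
from bisect import bisect_right

MINUTE = 60 * 1_000_000_000  # one minute in nanoseconds


def price_one_minute_prior(times, prices):
    """For each trade, the price of the latest trade at or before (time - 1 minute).
    times must be sorted ascending; None where no such trade exists."""
    out = []
    for t in times:
        i = bisect_right(times, t - MINUTE) - 1
        out.append(prices[i] if i >= 0 else None)
    return out


times = [0, 30 * 10**9, 60 * 10**9, 90 * 10**9, 150 * 10**9]
prices = [100.0, 101.0, 102.0, 103.0, 104.0]
prev = price_one_minute_prior(times, prices)
diffs = [None if p is None else now - p for now, p in zip(prices, prev)]
print(prev, diffs)
```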

6. Plot data via query or the UI

Deephaven has a rich plotting API that supports updating, real-time plots. It can be called programmatically or via JS integrations in the web UI. It integrates with the open-source plotly library. The suite of plots will continue to grow, with the Deephaven community setting the priorities.

Try these basic examples:

from deephaven import Plot
today_btc = crypto_btc.where("Date = currentDateNy()").reverse()
# simple line plot
simple_line_plot = Plot.plot("BTC Price", today_btc, "Timestamp", "Price").show()
# two series, two axes
line_plot = Plot.plot("BTC Size", today_btc, "Timestamp", "Size").lineColor("HONEYDEW")\
    .twinX()\
    .plot("BTC Price", today_btc, "Timestamp", "Price").lineColor("RED")\
    .show()
# scatter plot
scatter_plot = Plot.plot("ETH vs. BTC", time_series_join.reverse().tail(100), "Price_Btc", "Price_Eth")\
    .plotStyle("scatter")\
    .show()

You can also make simple plots like these using the Chart Builder in the UI. Open the Table Options menu at the table's right. After choosing a chart type, you can configure the relevant options. For example, you can recreate the simple_line_plot from the query above this way.