Deephaven Community Tutorial
This tutorial assumes you have already installed Deephaven and are running the web console locally. After you start Deephaven, navigate to http://localhost:10000 in a browser to run the commands below. If you haven't installed Deephaven yet, follow our quickstart for installation instructions.
1. Ingest static data
Deephaven empowers you to work with both batch and streaming data using the same methods.
It supports ingesting data from CSVs and other delimited files, and reading Parquet files at rest. [Soon you’ll be able to read XML, access SQL databases via ODBC, and access Arrow buffers locally and via Flight.]
CSV ingestion is described in detail in our guide, How to import CSV files. Run the command below inside a Deephaven console for an example of ingesting weather data from a CSV at a URL.
from deephaven import read_csv
# read_csv will accept a url or a path to a local file
seattle_weather = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/GSOD/csv/seattle.csv")
- seattle_weather
Using the `seattle_weather` example table, the next command calculates average, low, and high temperatures by year. The script is mostly meant to whet your appetite.
from deephaven.time import to_j_time_zone
from deephaven import agg

tz = to_j_time_zone("ET")

# Derive the year from each observation's date column, then aggregate temperatures by year
hi_lo_by_year = seattle_weather.view(formulas=["Year = (int)year(ObservationDate, tz)", "TemperatureF"])\
    .where(filters=["Year >= 2000"])\
    .agg_by([\
        agg.avg(cols=["Avg_Temp = TemperatureF"]),\
        agg.min_(cols=["Lo_Temp = TemperatureF"]),\
        agg.max_(cols=["Hi_Temp = TemperatureF"])
    ],\
    by=["Year"])
- hi_lo_by_year
To read data from a local or networked file system, simply specify the file path. The file path below assumes you downloaded the pre-built Docker images that include Deephaven's example data, as described in our quickstart. If you are not using the Docker container with example data, you can download the example files directly from our GitHub examples repo.
This script accesses a million-row CSV of crypto trades from 09/22/2021.
from deephaven import read_csv
crypto_from_csv = read_csv("/data/examples/CryptoCurrencyHistory/CSV/CryptoTrades_20210922.csv")
- crypto_from_csv
This importer provides a variety of capabilities related to `.txt`, `.psv`, and other delimited files. The related Javadoc describes these usage patterns.
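For instance, here is a minimal sketch of reading a pipe-separated file (the file path is a placeholder; `delimiter` is the parameter that controls how fields are split):
from deephaven import read_csv

# Hypothetical pipe-separated file; adjust the path and delimiter to match your data
my_psv_table = read_csv("/data/my_data.psv", delimiter="|")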
The table widget now in view is designed to be highly interactive:
- Touch the table and filter via Ctrl + F (Windows) or ⌘ + F (Mac).
- Touch the funnel icon to create sophisticated filters or use auto-filter UI features.
- Hover on headers to see data types.
- Click headers to access more options, like adding or changing sorts.
- Click the Table Options menu at right to plot from the UI, create and manage columns, or download CSVs.
In addition to CSV, Deephaven supports reading Parquet files. Parquet supports random access, so files need not be read completely into memory. See our documentation about reading both single, flat files and multiple, partitioned Parquet files.
from deephaven.parquet import read
crypto_from_parquet = read("/data/examples/CryptoCurrencyHistory/Parquet/CryptoTrades_20210922.parquet")
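The same `read` function also handles a directory of partitioned Parquet files. A minimal sketch, assuming a hypothetical partitioned directory under /data:
from deephaven.parquet import read

# Hypothetical directory holding a partitioned Parquet dataset; read() discovers the partitions
crypto_partitioned = read("/data/my_partitioned_parquet_dir")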
2. Ingest real-time streams
Providing you the ability to work with dynamic, updating, and real-time data is Deephaven’s superpower.
Users connect Kafka and other event streams, integrate enterprise and vendor data-source APIs and feeds, receive JSON from devices, and integrate with Change Data Capture (CDC) exhaust from RDBMSs.
Deephaven has a rich Kafka integration, supporting Avro, JSON, dictionaries, and dynamics for historical, stream, and append tables. Our concept guide, Kafka in Deephaven, illuminates the ease and value of the integration.
Though there is much sophistication available, the basic syntax for Kafka integration is:
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
result = ck.consume({'bootstrap.servers': 'server_producer:port'}, 'kafka.topic')
The code above is generic. Exploring all of the configuration options needed to hook up your own Kafka feed is beyond the scope of this tutorial. With Deephaven, creating real-time tables can be as easy as you want it to be.
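As a slightly fuller (and still hypothetical) sketch, consuming a JSON-encoded topic into an append-only table might look like the following; the broker address, topic name, and column definitions are placeholders to replace with your own:
from deephaven import dtypes as dht
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec

# Placeholder broker and topic; json_spec maps JSON fields to typed columns
orders = ck.consume(
    {"bootstrap.servers": "server_producer:port"},
    "kafka.topic",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=ck.json_spec([("Symbol", dht.string), ("Price", dht.double), ("Size", dht.int64)]),
    table_type=TableType.append(),
)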
The following code takes fake historical crypto trade data from a CSV file at a URL and replays it in real time based on timestamps. This is only one of multiple ways to create real-time data in just a few lines of code. Replaying historical data is a great way to test real-time algorithms before deployment into production.
from deephaven import TableReplayer, read_csv
from deephaven import time as dhtu
fake_crypto_data = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/CryptoCurrencyHistory/CSV/FakeCryptoTrades_20230209.csv")
start_time = dhtu.to_j_instant("2023-02-09T12:09:18 ET")
end_time = dhtu.to_j_instant("2023-02-09T12:58:09 ET")
replayer = TableReplayer(start_time, end_time)
crypto_streaming = replayer.add_table(fake_crypto_data, "Timestamp")
replayer.start()
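Replay is just one option. Another quick way to generate ticking data for experimentation is `time_table`, which emits a new row at a fixed interval. A minimal sketch (the one-second period and the formula are arbitrary choices):
from deephaven import time_table

# Emit one row per second; X is an illustrative column derived from the row index i
ticking = time_table("PT1S").update(formulas=["X = i"])
The rest of this section continues with the replayed crypto trades.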
It’s nice to watch new data hit the screen. Let's reverse the table so the newest trades appear at the top.
crypto_streaming_2 = crypto_streaming.reverse()
This can also be done without a query. Click on a column header in the UI and choose Reverse Table.
Now that you have a few tables, the next section will introduce adding new columns to them and merging.
3. Create columns and merge tables
Let's examine the data a bit programmatically. Use `count_by` to see the row count of each table.
Table operations, methods, and other capabilities of the Deephaven table API are used identically for updating (streaming) tables and static ones!
This simple example illustrates this superpower:
row_count_from_csv = crypto_from_csv.count_by(col="Row_Count").update_view(formulas=["Source = `CSV`"])
row_count_from_parquet = crypto_from_parquet.count_by(col="Row_Count").update_view(formulas=["Source = `Parquet`"])
row_count_streaming = crypto_streaming.count_by(col="Row_Count").update_view(formulas=["Source = `Streaming`"])
- row_count_from_csv
- row_count_from_parquet
- row_count_streaming
You can eyeball the respective row counts easily by merging the tables. If you later need to merge and then sort a table, `merge_sorted` is recommended, as it is more efficient; a sketch follows the merge below.
from deephaven import merge
row_count_compare = merge([row_count_from_csv, row_count_from_parquet, row_count_streaming])
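As noted above, `merge_sorted` merges and sorts in one step; it assumes each input table is already sorted by the given column (trivially true here, since each has a single row). A minimal sketch using the `Row_Count` column produced by `count_by`:
from deephaven import merge_sorted

# Merge the three one-row tables and keep the result ordered by Row_Count
row_count_compare_sorted = merge_sorted([row_count_from_csv, row_count_from_parquet, row_count_streaming], "Row_Count")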
Explore the schema and other metadata using `meta_table`.
meta_from_csv = crypto_from_csv.meta_table.update_view(formulas=["Source = `CSV`"]).move_columns_up(cols=["Source"])
meta_from_parquet = crypto_from_parquet.meta_table.update_view(formulas=["Source = `Parquet`"]).move_columns_up(cols=["Source"])
meta_streaming = crypto_streaming.meta_table.update_view(formulas=["Source = `Streaming`"]).move_columns_up(cols=["Source", "Name"])
- meta_from_csv
- meta_from_parquet
- meta_streaming
Merging and sorting the metadata tables will highlight some differences in the respective schemas.
merge_meta = merge([meta_from_csv, meta_from_parquet, meta_streaming])\
.sort(order_by=["Name", "Source"])
- merge_meta
You can cast the columns that differ in the `crypto_streaming` table using `update`, which is one of the five selection and projection operations available to you, as described in our guide, How to select, view, and update data.
crypto_streaming_3 = crypto_streaming.update(formulas=["Instrument = String.valueOf(Instrument)",
"Exchange = String.valueOf(Exchange)"])
Now that the schemas are identical across the three tables, let's create one table of crypto data that has both updating and static data - the latter assembled using `head_pct` and `tail_pct`. The last two lines remove the legacy static tables.
crypto_main = merge([crypto_from_csv.head_pct(0.50), crypto_from_parquet.tail_pct(0.50), crypto_streaming_3])\
.sort_descending(order_by=["Timestamp"])
crypto_from_csv = None
crypto_from_parquet = None
In the next section, you’ll learn about adding new columns to support calculations and logic, and doing aggregations.
4. Manipulate and aggregate data
It's likely you've figured out a few of Deephaven’s fundamentals:
- You name tables and operate on them. Everything in Deephaven is a table. Streams are updating tables. Batches are static ones. You don't have to track this.
- You apply methods to these tables and can be blind about whether the data is updating or not.
- You can refer to other named tables, and data simply flows from tables to their dependents. You may know this as a directed acyclic graph (DAG). (See our concept guide on the table update model if you're interested in what's under the hood.)
- There is no optimizer to wrestle with. You’ll appreciate this once you tackle complex use cases or need to bring your Python, Java, or wrapped C++ code to the data.
Aggregations are an important use case for streaming data. (And static, too.)
Doing a single, dedicated aggregation, like the `sum_by` below, follows a pattern similar to the `count_by` you did earlier.
crypto_sum_by = crypto_main.view(formulas=["Instrument", "Exchange", "Trade_Count = 1", "Total_Base_Value = Price * Size"])\
.sum_by(by=["Instrument", "Exchange"])\
.sort_descending(order_by=["Trade_Count"])
You can also add columns with Manage Custom Columns in the Table Options menu in the web UI.
If your use case is well served by adding columns in a formulaic, on-demand fashion (instead of writing results to memory), use `update_view`. In this case, you'll calculate the value of each trade and mod-10 the Id field, as sketched below.
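A minimal sketch of that `update_view` (this assumes the trade data has an `Id` column, as the sentence above implies; substitute any integer column if yours differs):
# Compute each trade's value and an Id bucket on demand; formulas are evaluated when the cells are read
crypto_on_demand = crypto_main.update_view(formulas=["Trade_Value = Price * Size", "Id_Mod_10 = Id % 10"])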
Binning data is fundamental and is intended to be easy via `upperBin` and `lowerBin`. This is heavily used in profiling and sampling data.
The query below reuses the same table name (`crypto_main`). That's just fine.
# Bin each trade's Timestamp up to the next one-minute boundary
crypto_main = crypto_main.update_view(formulas=["Value_Base_Ccy = (Price * Size)",\
    "TimeBin = upperBin(Timestamp, MINUTE)"])
View distinct values using `select_distinct`.
# 1 column
distinct_instruments = crypto_main.select_distinct(formulas=["Instrument"]).sort(order_by=["Instrument"])
# 2 columns
instrument_exchange = crypto_main.select_distinct(formulas=["Exchange", "Instrument"]).sort(order_by=["Exchange", "Instrument"])
# count_by looks similar
count_by = crypto_main.count_by("Trade_Count", by=["Exchange", "Instrument"]).sort(order_by=["Exchange", "Instrument"])
You can also accomplish this with Select Distinct Values in the Table Options menu in the web UI.
Performing multiple aggregations simultaneously is often both logical and good for performance.
Let's define an aggregation function to be used later. The function will return an aggregation result based on the table and aggregation-keys you pass in.
from deephaven import agg
def aggregate_crypto(table, agg_keys):
    agg_list = [
        agg.first(cols=["Last_Timestamp = Timestamp"]),
        agg.sum_(cols=["Total_Value_Traded = Value_Base_Ccy", "Total_Size = Size"]),
        agg.count_(col="Trade_Count"),
        agg.weighted_avg(wcol="Size", cols=["Wtd_Avg_Price = Price"]),
        agg.min_(cols=["Low_Price = Price"]),
        agg.max_(cols=["Hi_Price = Price"])
    ]
    return table.agg_by(agg_list, agg_keys)
Below, you equip `aggregate_crypto` with different numbers and versions of keys. The last table gets some extra polish to make the result easier on the eye.
# 1 key
agg_1_key = aggregate_crypto(crypto_main, ["TimeBin"])
# 2 keys
agg_2_keys = aggregate_crypto(crypto_main, ["Exchange", "Instrument"])\
.sort(order_by=["Exchange", "Instrument"])
# 3 keys
agg_3_keys = aggregate_crypto(crypto_main, ["Exchange", "Instrument", "TimeBin"])\
.sort_descending(order_by=["Last_Timestamp"])\
.update_view(formulas=["Total_Value_Traded = (int)Total_Value_Traded", "Total_Size = (long)Total_Size"])\
.move_columns_up(cols=["Instrument", "Trade_Count", "Total_Value_Traded"])
5. Filter, join, and as-of-join
Deephaven filtering is accomplished by applying `where` operations. The engine supports a large array of match, conditional, and combination filters.
These four scripts are simple examples.
filter_btc = crypto_main.where(filters=["Instrument = `BTC/USD`"])
filter_eth = crypto_main.where(filters=["Instrument = `ETH/USD`"])
filter_eth_and_price = crypto_main.where(filters=["Instrument = `ETH/USD`", "Size > 1"])
filter_eth_and_exchange = crypto_main.where(filters=["Instrument = `ETH/USD`", "Exchange.startsWith(`bi`) = true"])
Use `where_in` to filter one table based on the contents of another "filter table". If the filter table updates, the filter applied to the other table changes automatically.
In the third line below, you'll filter the table `crypto_main` based on the `Instrument` values of the table `row_1`.
agg_by_instrument = aggregate_crypto(crypto_main, ["Instrument"])
row_1 = agg_by_instrument.head(1)
filter_where_in = crypto_main.where_in(filter_table=row_1, cols=["Instrument"])
If you prefer, you can set variables using records from tables.
These lines, in combination, will print the value at index position 1 (the second row) of the `Instrument` column in the `agg_by_instrument` table to your console.
from deephaven.numpy import to_numpy

# Read the value at row position 1 (the second row) of the Instrument column
instrument_2nd = agg_by_instrument.j_object.getColumnSource("Instrument").get(1)

# Equivalent, using a NumPy snapshot of the column:
# instrument_2nd = to_numpy(agg_by_instrument, cols=["Instrument"])[1][0]

print(instrument_2nd)
- Log
That variable can be used for filtering.
filter_variable = crypto_main.where(filters=["Instrument = instrument_2nd"])
Deephaven joins are first-class, supporting joins of real-time, updating tables with each other (and with static tables) without any need for windowing.
Our guide, Choose a join method, offers guidance on how to choose the best method for your use case.
Generally, joins fall into one of two categories:
- Time series joins: `aj` (as-of join) and `raj` (reverse-as-of join).
- Relational joins: `natural_join`, `join`, and `exact_join`.
The syntax is generally as follows:
result1 = left_table.join_method(right_table, "Key", "Column_From_Right_Table")
Or, with multiple keys and columns:
result1 = left_table.join_method(right_table, "Key1, Key2, KeyN", "Column1_From_Right, Column2_From_Right, ColumnN_From_Right")
You can rename columns as you join tables as well.
Use `natural_join` when you expect no more than one match in the right table per key, and are happy to receive null records as part of the join process.
# filter the aggregation tables to create a table for BTC and ETH
trade_count_btc = agg_2_keys.where(filters=["Instrument = `BTC/USD`"]).view(formulas=["Exchange", "Trade_Count"])
trade_count_eth = agg_2_keys.where(filters=["Instrument = `ETH/USD`"]).view(formulas=["Exchange", "Trade_Count"])
# natural_join() using "Exchange" as the join-key
# pull column "Trade_Count" from the *_eth table, renaming it "Trade_Count_Eth"
join_2_tables = trade_count_btc.rename_columns(cols=["Trade_Count_Btc = Trade_Count"])\
.natural_join(table=trade_count_eth, on=["Exchange"], joins=["Trade_Count_Eth = Trade_Count"])
Though Deephaven excels with relational joins, its ordering capabilities make it an excellent time series database.
Time series joins, or “as-of joins”, take a timestamp key from the left table and do a binary search in the right table (respecting other join keys) seeking an exact timestamp-nanosecond match. If no match exists, the timestamp just prior to the join-timestamp establishes the match target.
It is important to note:
- The right table needs to be sorted.
- Numerical fields other than date-times can also be used for the final key in as-of joins.
- Reverse-as-of join is similar, but uses the record just after the target timestamp if no exact match is found.
- One can syntactically use `<` or `>` (instead of `=`) in the query to eliminate the exact match as the best candidate.
# filter the original crypto_main table to get raw BTC and ETH trade records
crypto_btc = crypto_main.where(filters=["Instrument = `BTC/USD`"])
crypto_eth = crypto_main.where(filters=["Instrument = `ETH/USD`"])
# as-of join: for each BTC record (left), find the most recent ETH record (right) at or before its Timestamp
time_series_join = crypto_btc.view(formulas=["Timestamp", "Price"])\
.aj(table=crypto_eth, on=["Timestamp"], joins=["Eth_Time = Timestamp", "Price_Eth = Price"])\
.rename_columns(cols=["Time_Btc = Timestamp", "Price_Btc = Price"])
The introduction of `Exchange` as a join key in front of `Timestamp` in the script below directs the engine to do the as-of join after first doing an exact match on `Exchange` between the left and right tables.
time_series_join_2_keys = crypto_btc.view(formulas=["Exchange", "Time_Btc = Timestamp", "Price_Btc = Price"])\
.aj(table=crypto_eth, on=["Exchange", "Time_Btc >= Timestamp"], joins=["Eth_Time = Timestamp", "Eth_Price = Price"])
People often use `aj` to join records that are shifted by a time phase.
# Create a column to represent time 1 minute before the "Timestamp".
crypto_btc_simple = crypto_btc.view(formulas=["Time_Trade = Timestamp", "Time_less_1min = Time_Trade - MINUTE", "Price_Now = Price"])
# As-of join the table on itself.
time_series_join_self = crypto_btc_simple.aj(table=crypto_btc_simple, on=["Time_Trade >= Time_less_1min"], joins=["Price_1_min_prev = Price_Now"])\
.update_view(formulas=["Price_Diff = Price_Now - Price_1_min_prev"])
6. Plot data via query or the UI
Deephaven has a rich plotting API that supports updating, real-time plots. It can be called programmatically or via JS integrations in the web UI, and it integrates with the open-source plotly library. The suite of plots will continue to grow, with the Deephaven community setting the priorities.
Try these basic examples:
from deephaven.plot.figure import Figure
crypto_btc_2021 = crypto_btc.update_view(["Year = (int)year(Timestamp, timeZone(`ET`))"]).where(["Year == 2021"]).drop_columns(["Year"])
# simple line plot
simple_line_plot = Figure().plot_xy(series_name="BTC Price", t=crypto_btc_2021.tail(2_000), x="Timestamp", y="Price").show()
# two series, two axes
line_plot = Figure().plot_xy(series_name="BTC Size", t=crypto_btc_2021.tail(2_000), x="Timestamp", y="Size").line(color="HONEYDEW")\
.x_twin()\
.plot_xy(series_name="BTC Price", t=crypto_btc_2021.tail(2_000), x="Timestamp", y="Price").line(color="RED")\
.show()
# scatter plot
scatter_plot = Figure().plot_xy(series_name="ETH vs. BTC", t=time_series_join.reverse().tail(100), x="Price_Btc", y="Price_Eth")\
.axes(plot_style="scatter")\
.show()
- crypto_btc_2021
- simple_line_plot
- line_plot
- scatter_plot
You can also make simple plots like these using the Chart Builder in the UI. Open the Table Options menu at the table's right. After choosing a chart type, you can configure the relevant options to produce a plot similar to `simple_line_plot` from the query above.
7. Export data to popular formats
It's easy to export your data out of Deephaven to popular open formats.
To export our final, joined table to a CSV file, simply use the `write_csv` method with the table name and the location where you want to save the file.
from deephaven import write_csv
write_csv(time_series_join_self, "/data/time_series_join_self.csv")
If the table is dynamically updating, Deephaven will automatically snapshot the data before writing it to the file.
Similarly, for Parquet:
from deephaven.parquet import write
write(time_series_join_self, "/data/time_series_join_self.parquet")
To create a static pandas DataFrame, use the `to_pandas` method.
from deephaven import pandas as dhpd
data_frame = dhpd.to_pandas(time_series_join_self)
print(data_frame)
- Log
8. What to do next
Now that you've imported data, created tables, and manipulated static and real-time data, take a look at our full set of how-to guides. We recommend beginning with the user interface section to learn about the features built into the IDE.