Version: Java (Groovy)

Deephaven Community Core Quickstart

1. Install with one command

Install and launch Deephaven via Docker with a one-line command:

docker run --rm --name deephaven -p 10000:10000 --env START_OPTS=-Dauthentication.psk=YOUR_PASSWORD_HERE ghcr.io/deephaven/server-slim:latest

For security, replace "YOUR_PASSWORD_HERE" with a more secure passkey. The command above requires Docker to already be installed. For advanced installation options, see our install guide for Docker.

2. Access the Deephaven Community front-end UI

Now, navigate to http://localhost:10000/ and enter the password you set above in the token field. You're up and running! Now what?

3. Work with live and batch data

Deephaven empowers you to work with both batch and streaming data using the same methods.

It supports ingesting data from CSVs, Parquet files, and Kafka streams.

Access data from a CSV

Run the command below inside a Deephaven console for an example of ingesting a million-row CSV of crypto trades. All you need is a stable URL for the data.

import io.deephaven.csv.CsvTools

cryptoFromCsv = CsvTools.readCsv("https://media.githubusercontent.com/media/deephaven/examples/main/CryptoCurrencyHistory/CSV/CryptoTrades_20210922.csv")

The table widget now in view is highly interactive:

  • Click on a table and press Ctrl + F (Windows) or ⌘F (Mac) to open quick filters.
  • Click the funnel icon in the filter field to create sophisticated filters or use auto-filter UI features.
  • Hover over column headers to see data types.
  • Right-click headers to access more options, like adding or changing sorts.
  • Click the Table Options hamburger menu at right to plot from the UI, create and manage custom columns, and download CSVs.

Ingest real-time streams

Working with dynamic, updating, real-time data is Deephaven’s superpower.

Deephaven has a rich Kafka integration, supporting AVRO, JSON, Protobuf, dictionaries, and dynamics for historical, stream, and append tables. Users connect Kafka and other event streams, integrate enterprise and vendor data-source APIs and feeds, receive JSON from devices, and integrate with Change Data Capture (CDC) exhaust from RDBMSs.
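
To give a flavor of the Kafka side, here is a minimal sketch (not part of the original quickstart) of consuming a topic into a live, append-only table. The broker address and topic name are hypothetical placeholders, and FROM_PROPERTIES derives the key and value column types from the supplied properties:

import io.deephaven.kafka.KafkaTools

// hypothetical broker and topic; adjust for your environment
kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'localhost:9092')
kafkaProps.put('deephaven.key.column.type', 'String')
kafkaProps.put('deephaven.value.column.type', 'String')

// consume all partitions from the beginning of the topic into an append-only table
cryptoKafka = KafkaTools.consumeToTable(
    kafkaProps,
    'crypto.trades',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING,
    KafkaTools.Consume.FROM_PROPERTIES,
    KafkaTools.Consume.FROM_PROPERTIES,
    KafkaTools.TableType.append()
)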

The following code takes fake historical crypto trade data from a CSV file at a URL and replays it in real time based on timestamps. This is only one of multiple ways to create real-time data in just a few lines of code. Replaying historical data is a great way to test real-time algorithms before deployment into production.

import io.deephaven.engine.table.impl.replay.Replayer
import static io.deephaven.time.DateTimeUtils.parseInstant

fakeCryptoData = CsvTools.readCsv("https://media.githubusercontent.com/media/deephaven/examples/main/CryptoCurrencyHistory/CSV/FakeCryptoTrades_20230209.csv")

// the replay window, matching the timestamps in the file
start = parseInstant("2023-02-09T12:09:18 ET")
end = parseInstant("2023-02-09T12:58:09 ET")

replayer = new Replayer(start, end)

// replay the static table in real time, driven by its Timestamp column
cryptoStreaming = replayer.replay(fakeCryptoData, "Timestamp")

replayer.start()

It’s nice to watch new data hit the screen. Let's reverse the table so the newest trades appear at the top.

cryptoStreaming2 = cryptoStreaming.reverse()

tip

Many table operations can also be done from the UI. For example, right-click a column header and choose Reverse Table.

Now that you have a few tables, the next section will introduce adding new columns to them and merging.

Create columns and merge tables

Let's examine the data a bit programmatically. Use countBy to see the row count of each table.

Table operations, methods, and other capabilities of the Deephaven table API are used identically for updating (streaming) tables and static ones!

This simple example illustrates this superpower:

rowCountFromCsv = cryptoFromCsv.countBy("RowCount").updateView("Source = `CSV`")
rowCountStreaming = cryptoStreaming.countBy("RowCount").updateView("Source = `Streaming`")

You can eyeball the respective row counts easily by merging the tables. In the future, if you need to merge then sort a table, we recommend using mergeSorted, as it is more efficient.

rowCountCompare = merge(rowCountFromCsv, rowCountStreaming)
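
As a sketch of that alternative, assuming each input is already sorted by the key column (rowCountSorted is an illustrative name):

// mergeSorted combines tables that are each pre-sorted by the key column,
// producing a sorted result more efficiently than merging and then sorting
rowCountSorted = mergeSorted("RowCount", rowCountFromCsv, rowCountStreaming)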

Explore the schema and other metadata using meta.

metaFromCsv = cryptoFromCsv.meta().updateView("Source = `CSV`").moveColumnsUp("Source")
metaStreaming = cryptoStreaming.meta().updateView("Source = `Streaming`").moveColumnsUp("Source", "Name")

Let's create one table of crypto data that has both updating and static data. The last line removes the legacy static table.

cryptoMain = merge(cryptoFromCsv, cryptoStreaming2)
    .sortDescending("Timestamp")
cryptoFromCsv.close()

In the next section, you’ll learn about adding new columns to support calculations and logic, and doing aggregations.

Manipulate and aggregate data

It's likely you've figured out a few of Deephaven’s fundamentals:

  • You name tables and operate on them. Everything in Deephaven is a table. Streams are updating tables; batches are static ones. You don't have to track which is which.
  • You apply methods to these tables without needing to know whether the data is updating or static.
  • You can refer to other named tables, and data simply flows from tables to their dependents, forming a directed acyclic graph (DAG). See our concept guide on the table update model if you're interested in what's under the hood. A minimal sketch of this flow follows this list.
  • There is no optimizer to wrestle with. You’ll appreciate this once you tackle complex use cases or need to bring your Python, Java, or wrapped C++ code to the data.
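
Here is that minimal sketch: a ticking timeTable and a table derived from it. As the source ticks once per second, the dependent updates automatically (the table names are illustrative):

// timeTable ticks one new row per second
ticking = timeTable("PT1S")

// a dependent table; it grows in lock-step with `ticking`
tickingIndexed = ticking.updateView("RowNum = ii")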

Aggregations are an important use case for streaming data. (And static, too.) Doing a single, dedicated aggregation, like the sumBy below, follows a pattern similar to the countBy you did earlier.

cryptoSumBy = cryptoMain.view("Instrument", "Exchange", "Trade_Count = 1", "Total_Base_Value = Price * Size")
    .sumBy("Instrument", "Exchange")
    .sortDescending("Trade_Count")

tip

You can also add columns with Custom Columns in the Table Options menu in the web UI.

If your use case is well served by adding columns in a formulaic, on-demand fashion (instead of writing results to memory), use updateView.
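
For example, these two queries compute the same illustrative Notional column; the only difference is whether the result is written to memory:

// update materializes the computed column in memory
notionalInMemory = cryptoMain.update("Notional = Price * Size")

// updateView stores only the formula; values are computed on access
notionalOnDemand = cryptoMain.updateView("Notional = Price * Size")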

Binning data is fundamental and is intended to be easy via upperBin and lowerBin. This is heavily used in profiling and sampling data.

The query below reuses the same table name (cryptoMain). That’s just fine. Then, it does an aggregation by 5-second bins.

cryptoMain = cryptoMain.updateView("ValueBaseCcy = (Price * Size)",
    "TimeBin = upperBin(Timestamp, 5 * SECOND)")

crypto5secAgg = cryptoMain.view("TimeBin", "Instrument", "Size")
    .sumBy("TimeBin", "Instrument")
    .sortDescending("TimeBin", "Instrument")

View distinct values using selectDistinct.

// 1 column
distinctInstruments = cryptoMain.selectDistinct("Instrument").sort("Instrument")
// 2 columns
instrumentExchange = cryptoMain.selectDistinct("Exchange", "Instrument").sort("Exchange", "Instrument")
// countBy looks similar
countBy = cryptoMain.countBy("Trade_Count", "Exchange", "Instrument").sort("Exchange", "Instrument")

tip

You can also accomplish this with Select Distinct Values in the Table Options menu in the web UI.

Performing multiple aggregations in a single operation is often both convenient and better for performance.

Let's define an aggregation function to be used later. The function will return an aggregation result based on the table and aggregation-keys you pass in.

import static io.deephaven.api.agg.Aggregation.*

def aggregateCrypto(io.deephaven.engine.table.Table table, String... aggKeys) {
    def aggList = [
        AggFirst("LastTimestamp = Timestamp"),
        AggSum("TotalValueTraded = ValueBaseCcy", "TotalSize = Size"),
        AggWAvg("Size", "WtdAvgPrice = Price"),
        AggCount("TradeCount"),
        AggMin("LowPrice = Price"),
        AggMax("HiPrice = Price")
    ]
    return table.aggBy(aggList, aggKeys)
}

Below, you call aggregateCrypto with different numbers and combinations of keys. The last table gets some extra polish to make the result easier to read.

// 1 key
agg1Key = aggregateCrypto(cryptoMain, "TimeBin")

// 2 keys
agg2Keys = aggregateCrypto(cryptoMain, "Exchange", "Instrument")
    .sort("Exchange", "Instrument")

// 3 keys
agg3Keys = aggregateCrypto(cryptoMain, "Exchange", "Instrument", "TimeBin")
    .sortDescending("LastTimestamp")
    .updateView("TotalValueTraded = (int)TotalValueTraded", "TotalSize = (long)TotalSize")
    .moveColumnsUp("Instrument", "TradeCount", "TotalValueTraded")

Filter, join, and as-of-join

Deephaven filtering is accomplished by applying where operations. The engine supports a large array of match, conditional, and combination filters.

The four filters below are simple examples.

filterBtc = cryptoMain.where("Instrument = `BTC/USD`")
filterEth = cryptoMain.where("Instrument = `ETH/USD`")
filterEthAndPrice = cryptoMain.where("Instrument = `ETH/USD`", "Size > 1")
filterEthAndExchange = cryptoMain.where("Instrument = `ETH/USD`", "Exchange.startsWith(`bi`) = true")
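
Combination filters can be written directly in the query language too; this disjunction is our own illustration:

// conditional OR filter in a single where clause
filterBtcOrEth = cryptoMain.where("Instrument = `BTC/USD` || Instrument = `ETH/USD`")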

Use whereIn to filter one table based on the contents of another "filter table". If the filter table updates, the filter applied to the other changes automatically.

In the third line below, you’ll filter the table cryptoMain based on the Instrument values of the table row1.

aggByInstrument = aggregateCrypto(cryptoMain, "Instrument")
row1 = aggByInstrument.head(1)
filterWhereIn = cryptoMain.whereIn(row1, "Instrument")

Deephaven joins are first class, supporting joining real-time, updating tables with each other (and with static tables) without any need for windowing.

Our guide, Choose a join method, offers guidance on how to choose the best method for your use case.

Generally, joins fall into one of two categories: exact (relational) joins and inexact, time-series (as-of) joins.

Use naturalJoin when you expect no more than one match in the right table per key, and are happy to receive null records as part of the join process.

// filter the aggregation tables to create a table for BTC and ETH
tradeCountBtc = agg2Keys.where("Instrument = `BTC/USD`").view("Exchange", "TradeCount")
tradeCountEth = agg2Keys.where("Instrument = `ETH/USD`").view("Exchange", "TradeCount")

// naturalJoin() using "Exchange" as the join key
// pull column "TradeCount" from tradeCountEth, renaming it "TradeCountEth"
join2Tables = tradeCountBtc.renameColumns("TradeCountBtc = TradeCount")
    .naturalJoin(tradeCountEth, "Exchange", "TradeCountEth = TradeCount")

Though Deephaven excels with relational joins, its ordering capabilities make it an excellent time series database.

Time series joins, or “as-of joins”, take a timestamp key from the left table and do a binary search in the right table (respecting other join keys) seeking an exact timestamp-nanosecond match. If no match exists, the timestamp just prior to the join-timestamp establishes the match target.

It is important to note:

  • The right table needs to be sorted.
  • Numerical fields other than date-times can also be used for the final key in as-of joins.
  • Reverse-as-of join is similar, but uses the record just after the target timestamp if no exact match is found.
  • One can syntactically use < or > (instead of =) in the query to eliminate the exact match as the best candidate.

// filter the original cryptoMain table to get raw BTC and ETH trade records
cryptoBtc = cryptoMain.where("Instrument = `BTC/USD`")
cryptoEth = cryptoMain.where("Instrument = `ETH/USD`")

// for each BTC trade, join the ETH trade at or just before that Timestamp
timeSeriesJoin = cryptoBtc.view("Timestamp", "Price")
    .aj(cryptoEth, "Timestamp", "EthTime = Timestamp, PriceEth = Price")
    .renameColumns("TimeBtc = Timestamp", "PriceBtc = Price")

The introduction of Exchange as a join key in front of Timestamp in the script below directs the engine to do the as-of join only after first finding an exact match on Exchange between the left and right tables.

timeSeriesJoin2Keys = cryptoBtc.view("Exchange", "TimeBtc = Timestamp", "PriceBtc = Price")
    .aj(cryptoEth, "Exchange, TimeBtc >= Timestamp", "EthTime = Timestamp, EthPrice = Price")

People often use aj to join records that are offset in time.

// create a column representing the time 1 minute before each trade's Timestamp
cryptoBtcSimple = cryptoBtc.view("TimeTrade = Timestamp", "TimeLess1min = TimeTrade - MINUTE", "PriceNow = Price")

// as-of join the table on itself
timeSeriesJoinSelf = cryptoBtcSimple.aj(cryptoBtcSimple, "TimeTrade >= TimeLess1min", "Price1MinPrev = PriceNow")
    .updateView("PriceDiff = PriceNow - Price1MinPrev")

Export data to popular formats

It's easy to export your data out of Deephaven to popular open formats.

To export our final, joined table to a CSV file, simply use the writeCsv method with the table name and the location to which you want to save the file. See managing Docker volumes for more information on how to save files to your local machine.

CsvTools.writeCsv(timeSeriesJoinSelf, "/data/timeSeriesJoinSelf.csv")

If the table is dynamically updating, Deephaven will automatically snapshot the data before writing it to the file.
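
If you'd rather take that snapshot explicitly, here is a minimal sketch using snapshot() (cryptoStill is an illustrative name):

// snapshot() returns a static, point-in-time copy of a ticking table
cryptoStill = cryptoStreaming.snapshot()
CsvTools.writeCsv(cryptoStill, "/data/cryptoStill.csv")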

Similarly, for Parquet:

import io.deephaven.parquet.table.ParquetTools

ParquetTools.writeTable(timeSeriesJoinSelf, new File("/data/timeSeriesJoinSelf.parquet"), ParquetTools.GZIP)
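
Reading the file back into a table is a one-liner (roundTrip is an illustrative name):

roundTrip = ParquetTools.readTable("/data/timeSeriesJoinSelf.parquet")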

What to do next

Now that you've imported data, created tables, and manipulated static and real-time data, take a look at our full set of how-to guides. They cover a wide range of topics, from working with the UI to writing Python functions.