Create and use partitioned tables
This guide will show you how to create and use partitioned tables. A partitioned table is a special type of Deephaven table with a column containing other tables (known as constituent tables or subtables), plus additional key column(s) that are used to index and access particular constituent tables. Essentially, a partitioned table can be visualized as a vertical stack of tables with the same schema, all housed within a single object.
Like a list of tables, partitioned tables can be merged together to form a new table. Unlike a list of tables, partitioned tables take advantage of parallelization, which can improve query performance when leveraged properly.
Subtable partitioning via partitionBy should not be confused with grouping and aggregation. The partitionBy table operation partitions a table into subtables by key columns. Aggregation operators such as aggBy compute summary information over groups of data within a table.
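To make the distinction concrete, here is a minimal sketch on a hypothetical table: partitionBy produces one constituent table per key, while an aggregation such as sumBy produces one summary row per key.

letters = newTable(
    stringCol("Sym", "A", "B", "A"),
    intCol("Qty", 1, 2, 3)
)

partitioned = letters.partitionBy("Sym")   // a partitioned table with two constituents: one for A, one for B
summed = letters.sumBy("Sym")              // a standard table with two rows: A -> 4, B -> 2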
What is a partitioned table?
A partitioned table is a special Deephaven table that holds one or more subtables, called constituent tables. Think of a partitioned table as a table with a column containing other Deephaven tables plus additional key columns that are used to index and access particular constituent tables. All constituent tables in a partitioned table must have the same schema.
A partitioned table can be thought of in two ways:

- A list of tables stacked vertically. This list can be combined into a single table using merge.
- A map of tables. A constituent table can be retrieved by key using constituentFor. Both views are sketched below.
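As a minimal sketch of both views, assuming pt is a partitioned table keyed on a Letter column like the one created below:

// List view: stack every constituent back into a single table.
combined = pt.merge()

// Map view: retrieve a single constituent by its key value.
onlyA = pt.constituentFor("A")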
Partitioned tables are typically used to:
- Parallelize queries across multiple threads.
- Quickly retrieve subtables in a user interface.
- Improve the performance of filters iteratively called within loops, as sketched below.
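As a sketch of that last use case: partition once, then fetch each subtable by key inside the loop, instead of re-filtering the source table on every iteration. The table here is hypothetical:

data = newTable(
    stringCol("Letter", "A", "B", "A", "C"),
    intCol("Value", 1, 2, 3, 4)
)
byLetter = data.partitionBy("Letter")

// Each lookup replaces a repeated data.where("Letter == `...`") filter.
["A", "B", "C"].each { letter ->
    sub = byLetter.constituentFor(letter)
    println letter + ": " + sub.size() + " rows"
}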
Create a partitioned table with partitionBy
There are two ways to create a partitioned table:
From a table
The simplest way to create a partitioned table is from another table. To show this, let's first create a new table.
source = newTable(
    stringCol("Letter", "A", "B", "C", "B", "C", "C", "A", "B", "A", "A"),
    intCol("Value", 5, 9, -4, -11, 3, 0, 18, -1, 1, 6)
)
Creating a partitioned table from the source table requires only one table operation: partitionBy. In this simple case, source is broken into subtables based upon the single key column Letter. The resulting partitioned table will have three constituent tables, one for each unique value in the Letter column.
result = source.partitionBy("Letter")
keys = result.table().selectDistinct("Letter")
Partitioned tables can be constructed from more than one key column. To show this, we'll create a source table with multiple columns to key on.
source = newTable(
    stringCol("Exchange", "Kraken", "Coinbase", "Coinbase", "Kraken", "Kraken", "Kraken", "Coinbase"),
    stringCol("Coin", "BTC", "ETH", "DOGE", "ETH", "DOGE", "BTC", "BTC"),
    doubleCol("Price", 30100.5, 1741.91, 0.068, 1739.82, 0.065, 30097.96, 30064.25)
)
Partitioning a table by multiple keys is done in the same manner as by one key.
result = source.partitionBy("Exchange", "Coin")
keys = result.table().selectDistinct("Exchange", "Coin")
From a Kafka stream
Deephaven can consume data from Kafka streams. Streaming data can be ingested into a table via consumeToTable, or directly into a partitioned table via consumeToPartitionedTable.
When ingesting streaming Kafka data directly into a partitioned table, the data is partitioned by the Kafka partition number of the topic: each constituent table holds the data from one topic partition.
The following example ingests data from the Kafka topic orders directly into a partitioned table.
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

// Column definitions for the ingested table.
symbolDef = ColumnDefinition.ofString('Symbol')
sideDef = ColumnDefinition.ofString('Side')
priceDef = ColumnDefinition.ofDouble('Price')
qtyDef = ColumnDefinition.ofInt('Qty')
ColumnDefinition[] colDefs = [symbolDef, sideDef, priceDef, qtyDef]

// Map JSON field names to column names.
mapping = ['jsymbol': 'Symbol', 'jside': 'Side', 'jprice': 'Price', 'jqty': 'Qty']
spec = KafkaTools.Consume.jsonSpec(colDefs, mapping, null)

pt = KafkaTools.consumeToPartitionedTable(
    kafkaProps,
    'orders',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    spec,
    KafkaTools.TableType.append()
)

keys = pt.table().selectDistinct("Partition")
For more information and examples on ingesting Kafka streams directly into partitioned tables, see consumeToPartitionedTable.
Partitioned table methods
Partitioned tables have a variety of methods that differ from those of standard tables. For more information on each, see the reference docs:
- constituentChangesPermitted
- constituentColumnName
- constituentDefinition
- constituentFor
- constituents
- filter
- keyColumnNames
- merge
- partitionedTransform
- proxy
- sort
- table
- transform
- uniqueKeys
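As a quick sketch of a few of these accessors, using the result partitioned table created above (keyed on Exchange and Coin):

println result.keyColumnNames()         // the key column names, e.g. [Exchange, Coin]
println result.constituentColumnName()  // the name of the column holding the subtables
println result.constituents().length    // the number of constituent tables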
Examples
The following code creates two partitioned tables from other tables, which will be used in the following examples. The quotes and trades tables are constructed with newTable. They contain hypothetical stock quote and trade data. The Ticker column holds the key values used to partition each table into subtables.
trades = newTable(
    stringCol("Ticker", "AAPL", "AAPL", "AAPL", "IBM", "IBM"),
    instantCol(
        "Timestamp",
        parseInstant("2021-04-05T09:10:00 America/New_York"),
        parseInstant("2021-04-05T09:31:00 America/New_York"),
        parseInstant("2021-04-05T16:00:00 America/New_York"),
        parseInstant("2021-04-05T16:00:00 America/New_York"),
        parseInstant("2021-04-05T16:30:00 America/New_York")
    ),
    doubleCol("Price", 2.5, 3.7, 3.0, 100.50, 110),
    intCol("Size", 52, 14, 73, 11, 6)
)

quotes = newTable(
    stringCol("Ticker", "AAPL", "AAPL", "IBM", "IBM", "IBM"),
    instantCol(
        "Timestamp",
        parseInstant("2021-04-05T09:11:00 America/New_York"),
        parseInstant("2021-04-05T09:30:00 America/New_York"),
        parseInstant("2021-04-05T16:00:00 America/New_York"),
        parseInstant("2021-04-05T16:30:00 America/New_York"),
        parseInstant("2021-04-05T17:00:00 America/New_York")
    ),
    doubleCol("Bid", 2.5, 3.4, 97, 102, 108),
    intCol("BidSize", 10, 20, 5, 13, 23),
    doubleCol("Ask", 2.5, 3.4, 105, 110, 111),
    intCol("AskSize", 83, 33, 47, 15, 5)
)
ptQuotes = quotes.partitionBy("Ticker")
ptTrades = trades.partitionBy("Ticker")
Partitioned tables are primarily used to improve query performance and to quickly retrieve subtables in the user interface. The subsections below discuss these benefits in the context of tangible examples.
Grab a constituent with constituentFor
A partitioned table contains one or more constituent tables, or subtables. When the partitioned table is created with key columns, these constituent tables each have a unique key, from which they can be obtained. The code block below grabs a constituent from each of the ptQuotes and ptTrades tables based on a key value.
quotesIbm = ptQuotes.constituentFor("IBM")
tradesAapl = ptTrades.constituentFor("AAPL")
Merge
A partitioned table is similar to a vertically stacked list of tables with the same schema. Just as a list of tables with identical schemas can be merged into a single table, so can a partitioned table. This is often the best and easiest way to get a standard table from a partitioned table.
The following example merges the ptQuotes and ptTrades tables to return tables similar to the original quotes and trades tables they were created from.
quotesNew = ptQuotes.merge()
tradesNew = ptTrades.merge()
Modify a partitioned table
There are two ways to modify a partitioned table: transform and proxy. Both can achieve the same results, so the choice of which to use largely comes down to preference; the tradeoffs are discussed in Should I use transform or proxy? below.
Transform
Standard table operations can be applied to a partitioned table through a transform, which applies a user-defined transformation function to all constituents of a partitioned table. When using transform, any and all table operations applied in the transformation function must be done from within an execution context.
The following code block uses transform to apply an update to every constituent of the ptTrades table.
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.util.SafeCloseable

defaultCtx = ExecutionContext.getContext()

transformFunc = { t ->
    try (SafeCloseable ignored = defaultCtx.open()) {
        return t.update("TotalValue = Price * Size")
    }
}
ptTradesUpdated = ptTrades.transform(transformFunc)
tradesUpdated = ptTradesUpdated.merge()
Proxy
The same result can be obtained via a PartitionedTable.Proxy object, which allows standard table operations to be called directly on a partitioned table.
The following code block applies an update to every constituent of the ptTrades table by creating a proxy rather than applying a transform.
ptTradesProxy = ptTrades.proxy()
ptTradesProxyUpdated = ptTradesProxy.update("TotalValue = Price * Size")
ptTradesUpdated = ptTradesProxyUpdated.target()
tradesUpdated = ptTradesUpdated.merge()
When using a partitioned table proxy, you must call .target() to obtain the underlying partitioned table.
Should I use transform or proxy?
A transform and a proxy can be used to achieve the same results. So, which should you use?
- A transform gives greater control. It also gives better performance in certain cases.
- A proxy provides more convenience and familiarity by allowing the use of normal table operations on a partitioned table.
If you want ease and convenience, use proxy. If you want greater control, or if performance is a high priority in your query, use transform.
Combine two partitioned tables with partitionedTransform
Standard tables can be combined through any of the available join operations. The same applies to partitioned tables: where standard tables are joined directly via one of those operations, partitioned tables are joined through a partitioned transform.
A partitioned transform is similar to a transform, except the transformation function takes two tables as input and returns a single table. The transformation function can apply any table operations to either input table, so long as it returns a single table.
The following example shows the most basic usage of partitionedTransform: it takes two tables as input and returns only the first one passed in.
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.util.SafeCloseable

defaultCtx = ExecutionContext.getContext()

partitionedTransformFunc = { t1, t2 ->
    try (SafeCloseable ignored = defaultCtx.open()) {
        return t1
    }
}
ptQuotesNew = ptQuotes.partitionedTransform(ptTrades, partitionedTransformFunc)
quotesNew = ptQuotesNew.merge()
The above example is not practical. It does, however, show the basic concept: a partitionedTransform takes two partitioned tables as input and returns a single partitioned table. Most usages of partitionedTransform will join the two partitioned tables together, as in the example below.
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.util.SafeCloseable

defaultCtx = ExecutionContext.getContext()

partitionedTransformFunc = { t1, t2 ->
    try (SafeCloseable ignored = defaultCtx.open()) {
        return t1.join(t2, "Ticker, Timestamp")
    }
}
ptJoined = ptTrades.partitionedTransform(ptQuotes, partitionedTransformFunc)
result = ptJoined.merge()
The same result can be achieved through a partitioned table proxy.
ptQuotesProxy = ptQuotes.proxy()
ptTradesProxy = ptTrades.proxy()
ptProxyJoined = ptTradesProxy.join(ptQuotesProxy, "Ticker, Timestamp")
ptJoined = ptProxyJoined.target()
resultViaProxy = ptJoined.merge()
Why use partitioned tables?
So far, this guide has shown how to use partitioned tables in your queries, but not why you may want to use them. As discussed earlier, partitioned tables are useful for:
- Quickly retrieving subtables in the user interface.
- Improving performance.
These will be discussed in more detail in the subsections below.
Quickly retrieve subtables
When a partitioned table is created with key columns, it can be used like a map to retrieve constituent tables via constituentFor. This can be useful in user interface components that need to display a subset of a table quickly.
For instance, the examples above use the ptTrades and ptQuotes tables. In the UI, the ptTrades subtable with the key value IBM is displayed. Menu options atop the table let you view the key values, merge the table, or switch between key values to see the data in each constituent.
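Programmatically, that interaction amounts to enumerating the keys and fetching constituents on demand. A minimal sketch, where the selected key is a stand-in for user input:

availableKeys = ptTrades.table().selectDistinct("Ticker")   // one row per available key value
selectedKey = "IBM"                                         // stand-in for a UI selection
selectedSubtable = ptTrades.constituentFor(selectedKey)     // retrieved like a map lookup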
Improve performance
Partitioned tables can improve performance in a couple of different ways.
Parallelization
Partitioned tables can improve query performance by parallelizing operations that standard tables cannot. Take, for example, an as-of join between two tables. If both tables are partitioned by the exact-match columns, the join can be performed on each pair of constituents in parallel.
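Here is a sketch of that pattern, reusing the ptTrades and ptQuotes tables from the examples above. Because both partitioned tables are keyed on Ticker, each pair of constituents can be joined independently:

import io.deephaven.engine.context.ExecutionContext
import io.deephaven.util.SafeCloseable

defaultCtx = ExecutionContext.getContext()

// As-of join each pair of constituents that share a Ticker key.
ajFunc = { t1, t2 ->
    try (SafeCloseable ignored = defaultCtx.open()) {
        return t1.aj(t2, "Ticker, Timestamp")
    }
}

ptAj = ptTrades.partitionedTransform(ptQuotes, ajFunc)
tradesWithQuotes = ptAj.merge()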
Partitioned tables are powerful, but aren't a magic wand that improves performance in all cases. Parallelization is best employed when:
- Shared resources between concurrent tasks are minimal.
- Partitioned data is dense.
- Data is sufficiently large.
If you are unsure if parallelization through partitioned tables could improve your query performance, reach out to us on Slack for more specific guidance.
Tick amplification
In grouping and ungrouping operations, the Deephaven query engine does not know which cells change. Even if only a single cell changes, an entire array is marked as modified, and large sections of the output table change. In a real-time query, this can potentially cause many unnecessary calculations to be performed. Take, for instance, the following query.
import io.deephaven.engine.table.TableUpdate
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter

t1 = timeTable("PT5s").update("A=ii%2", "X=ii")

// Group/ungroup
t2 = t1.groupBy("A").update("Y=X+1").ungroup()

// Partition/merge
t3 = t1.partitionBy("A").proxy().update("Y=X+1").target().merge()

h1 = new InstrumentedTableUpdateListenerAdapter(t1, false) {
    @Override
    public void onUpdate(TableUpdate upstream) {
        def numChanges = upstream.added().size() + upstream.modified().size()
        println "TICK PROPAGATION: RAW " + numChanges + " changes"
    }
}

h2 = new InstrumentedTableUpdateListenerAdapter(t2, false) {
    @Override
    public void onUpdate(TableUpdate upstream) {
        def numChanges = upstream.added().size() + upstream.modified().size()
        println "TICK PROPAGATION: GROUP/UNGROUP " + numChanges + " changes"
    }
}

h3 = new InstrumentedTableUpdateListenerAdapter(t3, false) {
    @Override
    public void onUpdate(TableUpdate upstream) {
        def numChanges = upstream.added().size() + upstream.modified().size()
        println "TICK PROPAGATION: PARTITION/MERGE " + numChanges + " changes"
    }
}

t1.addUpdateListener(h1)
t2.addUpdateListener(h2)
t3.addUpdateListener(h3)
At first, this is the output of the query:
TICK PROPAGATION: RAW 1 changes
TICK PROPAGATION: GROUP/UNGROUP 1 changes
TICK PROPAGATION: PARTITION/MERGE 1 changes
After letting the code run for a little while, this is the output:
TICK PROPAGATION: RAW 1 changes
TICK PROPAGATION: GROUP/UNGROUP 10 changes
TICK PROPAGATION: PARTITION/MERGE 1 changes
As the code runs longer, the group/ungroup reports more and more changes per tick, whereas the partition/merge stays at one. Each extra change reported by the group/ungroup is an unnecessary recalculation that performs work for no benefit. This is tick amplification in action: where a group and ungroup suffers from this problem, a partition and merge does not.