Why partitioned tables are powerful

JJ Brosnan · 9 min read
The benefits of partitioning data

“You don’t have to be an engineer to be a racing driver, but you do have to have Mechanical Sympathy.”
– Jackie Stewart, racing driver

This simple quote has deep meaning for many facets of life, and its value cannot be overstated when it comes to software. In this article, we'll touch upon an important concept in Deephaven: partitioned tables. As with auto racing, having a grasp of the mechanics behind a system will enable you to maximize its potential. So, let's take a closer look at partitioned tables and how they can help you get the most out of Deephaven.

Partitioned tables can help parallelize queries across multiple threads and generally improve performance when used in the right context. They can also make it easier to work with big data in the user interface.

What is a partitioned table?

A partitioned table is divided into smaller parts called constituent tables (or subtables). Rows are grouped by the values of one or more key columns; the values in these key columns determine which constituent table each row belongs to.

A partitioned table can be thought of in two ways:

  1. A vertically stacked list of tables. This list can be combined into a single table with merge.
  2. A map of tables. A constituent table can be retrieved by its key values with get_constituent.
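
Here's a minimal sketch of both views, using a hypothetical table with a single key column (the table and values are just for illustration):

from deephaven import empty_table

source = empty_table(10).update(["Key = ii % 2", "Value = ii"])
pt = source.partition_by("Key")

# View 1: a vertically stacked list of tables, recombined into one
combined = pt.merge()

# View 2: a map of tables, looked up by key value
evens = pt.get_constituent([0])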

A partitioned table can be created in two ways:

  1. By partitioning a source table.
  2. Directly from a Kafka stream.

This article is focused on why partitioned tables are a powerful tool, so it will only discuss partitioning source tables. For more information on consuming Kafka streams directly into a partitioned table, see consume_to_partitioned_table.

Partitioned tables in action

Let's look at some basic examples of creating and using partitioned tables.

Partition a table into constituent tables

The following code reads a CSV file of insurance data into Deephaven. It then partitions the table by the region column and gets a table containing the partitioned table's keys.

from deephaven import read_csv

insurance = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/Insurance/csv/insurance.csv")

insurance_partitioned = insurance.partition_by("region")

partition_keys = insurance_partitioned.keys()

Tables can be partitioned by an arbitrary number of key columns. Consider the following code, which instead partitions the insurance table by three key columns: region, smoker, and sex.

insurance_partitioned = insurance.partition_by(["region", "smoker", "sex"])

partition_keys = insurance_partitioned.keys()

Get constituent tables from a partitioned table

To get individual constituent tables from a partitioned table, you need to specify which constituent you want by using the unique key values. The following code gets the constituent for non-smoking women in the Northwest.

insurance_nonsmoking_women_northwest = insurance_partitioned.get_constituent(["northwest", "no", "female"])

To learn more about partitioned tables, head over to our user guide.

Why use partitioned tables?

Why? is the most important question when considering whether partitioned tables are right for your queries. Let's examine the convenience and performance benefits they provide.

Convenience

A partitioned table can be used like a map to retrieve constituent tables via their keys. Deephaven's user interface has a built-in partitioned table viewer that allows you to:

  • View the keys of a partitioned table.
  • View constituent tables.
  • Merge the partitioned table.

Speaking of user interfaces, deephaven.ui is a powerful tool for creating custom user interfaces. Combining it with partitioned tables enables you to do things like switch between constituents with a single click. Consider the following code, which uses a dropdown menu to choose which partition to plot.

import deephaven.plot.express as dx
from deephaven import time_table
import deephaven.ui as ui

tt = time_table("PT0.25s").update([
    "Type = random() < 0.5 ? `Sine` : `Cosine`",
    "X = 0.05 * i",
    "Y = (Type == `Sine`) ? sin(X) : cos(X)"
])

# Partition the table and get the keys
pt = tt.partition_by("Type")
keys = pt.keys()


@ui.component
def plot_pt(partitioned_table, initial_val):
    text, set_text = ui.use_state(initial_val)
    ct = ui.use_memo(
        lambda: partitioned_table.get_constituent(text.capitalize()) if text != "" else None,
        [partitioned_table, text],
    )
    picker = ui.picker(keys, selected_key=text, on_change=set_text)

    return [
        picker,
        dx.line(ct, x="X", y="Y", title=f"Partition Key: {text}")
        if ct is not None
        else ui.text("Please enter a valid partition."),
    ]


p = plot_pt(pt, "Sine")

Performance

Note: Partitioned tables are not a one-size-fits-all solution. They will not always make queries faster. It's important to understand when and how to use them to get the most out of Deephaven.

Partitioned tables can make queries faster in a couple of different ways.

Parallelization

Partitioned tables can improve query performance by parallelizing operations that standard tables cannot. Take, for example, an as-of join between two tables. If both tables are partitioned by the join's exact match columns, the join can be performed on each pair of constituents in parallel.

Partitioned tables are powerful but aren't a magic wand that improves performance in all cases. Parallelization is best employed when:

  • Shared resources between concurrent tasks are minimal.
  • Partitioned data is dense.
  • Data is sufficiently large.

It's also worth noting that Python's Global Interpreter Lock (GIL) prevents Python code from executing in parallel across threads. The Deephaven engine itself runs on the JVM and is not subject to the GIL, so maximizing partitioned table query parallelization means minimizing the amount of unnecessary Python code invoked inside queries.
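
As an illustrative sketch (py_square is a hypothetical helper, not part of the Deephaven API), compare a query that calls back into Python with one that stays in the engine's query language:

from deephaven import empty_table

# A Python function invoked from the query language. Each call crosses
# into Python and must hold the GIL.
def py_square(x) -> int:
    return x * x

t = empty_table(1_000_000)

# Calls back into Python for every row; parallel work contends for the GIL.
slow = t.update("Y = py_square(ii)")

# A pure query-language expression; the engine evaluates it without the GIL.
fast = t.update("Y = ii * ii")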

Let's look at this parallelization in action. I just made the claim that the join operation in an as-of join is done in parallel if two tables are partitioned by the exact match columns. The following code creates two fake tables of 5 million rows each that contain quote and trade data for seven different "symbols" on four different "exchanges". It then partitions them on the exact match columns and compares the performance between the standard tables and the partitioned tables:

from deephaven.execution_context import get_exec_ctx
from deephaven import empty_table
from string import ascii_uppercase
from random import choice
from time import time

n_rows = 5_000_000

ctx = get_exec_ctx()


def rand_key(keytype, minval, maxval) -> str:
    return keytype + "_" + choice(ascii_uppercase[minval:maxval])


quotes = empty_table(n_rows).update([
    "Timestamp = '2024-09-20T00:00:00 ET' + ii * SECOND",
    "Exchange = rand_key(`Exchange`, 20, 24)",
    "Symbol = rand_key(`Sym`, 0, 7)",
    "QuoteSize = randomInt(1, 10)",
    "QuotePrice = randomDouble(0, 100)",
])

trades = empty_table(n_rows).update([
    "Timestamp = '2024-09-20T00:00:00.1 ET' + ii * SECOND",
    "Exchange = rand_key(`Exchange`, 20, 24)",
    "Symbol = rand_key(`Sym`, 0, 7)",
    "TradeSize = randomInt(1, 10)",
    "TradePrice = randomDouble(0, 100)",
])

pt_quotes = quotes.partition_by(["Exchange", "Symbol"])
pt_trades = trades.partition_by(["Exchange", "Symbol"])


def partitioned_aj(t1, t2):
    with ctx:
        return t1.aj(t2, ["Exchange", "Symbol", "Timestamp"])


start = time()
result = quotes.aj(trades, ["Exchange", "Symbol", "Timestamp"])
end = time()

print(f"Standard table aj: {(end - start):.4f} seconds.")

start = time()
pt_result = pt_quotes.partitioned_transform(pt_trades, partitioned_aj)
end = time()

print(f"Partitioned table aj: {(end - start):.4f} seconds.")

Performance improvements will generally scale with:

  • Size and density of data
  • Hardware resources (number of CPU cores especially)
  • Number of partitions

Tick amplification

Tick amplification happens when cell values are grouped and ungrouped. If any cell contributing to a grouped array changes, the entire grouped array is marked as changed. As a result, a single small change in an input table can result in large sections of an ungrouped output table changing. This can spiral out of control in real-time queries when data is sufficiently large.

The following query demonstrates tick amplification using a table listener. It:

  • Creates a time table that ticks once every 5 seconds (t1).
  • Groups, updates, then ungroups the table (t2).
  • Partitions, updates, then merges the table (t3).

The number of added and modified rows is recorded on each tick. This value is printed for each of the above three operations to show how many rows are modified or added each time t1 ticks.

from deephaven import time_table
from deephaven.table_listener import listen


def print_changes(label, update, is_replay):
    added = update.added()
    modified = update.modified()
    n_added = len(added["X"]) if "X" in added else 0
    n_modified = len(modified["X"]) if "X" in modified else 0
    changes = n_added + n_modified
    print(f"TICK PROPAGATION: {label} {changes} changes")


t1 = time_table("PT5s").update(["A=ii%2", "X=ii"])

# Group/ungroup
t2 = t1.group_by("A").update("Y=X+1").ungroup()

# Partition/merge
t3 = t1.partition_by("A").proxy().update("Y=X+1").target.merge()

# Listen to each table and print the number of changes on every tick
h1 = listen(t1, lambda update, is_replay: print_changes("RAW            ", update, is_replay))
h2 = listen(t2, lambda update, is_replay: print_changes("GROUP/UNGROUP  ", update, is_replay))
h3 = listen(t3, lambda update, is_replay: print_changes("PARTITION/MERGE", update, is_replay))

The first time t1 ticks, this gets printed:

TICK PROPAGATION: RAW             1 changes
TICK PROPAGATION: GROUP/UNGROUP   1 changes
TICK PROPAGATION: PARTITION/MERGE 1 changes

So far, no issues. But that's because t1 has only ticked once. After a little while, here's the output:

TICK PROPAGATION: RAW             1 changes
TICK PROPAGATION: GROUP/UNGROUP   10 changes
TICK PROPAGATION: PARTITION/MERGE 1 changes

As t1 grows, the grouping and ungrouping operation continues to make more and more changes, whereas the partition/merge only makes one.

When the data is grouped, the engine cannot know that only one cell in a grouped array has changed, and it must recalculate the entire group. However, when the data is partitioned, only a single row in a single partition changes.
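
To see why, recall that group_by rolls each group's values into array columns. Here's a minimal sketch (the table and column names are just for illustration):

from deephaven import time_table

t = time_table("PT5s").update(["A = ii % 2", "X = ii"])

# group_by packs each group's X values into a single array cell.
# Each new row rewrites one group's entire array cell, so every row a
# downstream ungroup() produces from that group is marked modified.
grouped = t.group_by("A")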

In summary

Partitioned tables are a powerful tool to help you get the most out of Deephaven. If used correctly, they can boost performance and convenience in queries, particularly ones on big data. Consider partitioned tables for your applications if:

  • Your data is dense.
  • You need to parallelize queries across multiple threads.
  • You have user interfaces that frequently request subtables by one or more key columns.

Reach out

Our Slack community continues to grow! It's a great place to learn if partitioned tables are right for your Deephaven applications. Reach out to us with any questions, comments, or feedback. We'd love to hear from you!