
Ultra fast joins with multi-join

JJ Brosnan
Joining multitudes of tables seamlessly and quickly

Deephaven's version 0.28 release brought some cool new features and improvements to client libraries. Perhaps the most exciting is the addition of multi_join to the Deephaven table API. With multi_join, you can accomplish what previously took several lines of code in a single line. Not only that, but it's way faster than the old way of chaining joins together.

Multi-join is a new feature that joins three or more tables together. It's faster and far more memory efficient than the alternative of chaining joins, and it works the same on both static and dynamic data.

In this blog, we'll compare performance of using multi_join and its multi-operation analog. The results are impressive.

What is multi_join?

Put simply: multi_join joins tables together. It's the only way to join three or more tables with a single method call. It's also lightning fast regardless of how many tables are being joined. It uses only a single hash table to perform the join, so it's memory efficient as well.

The syntax of multi_join is simple. Choose the tables to join and the key column(s) to join on, pass both as lists to multi_join, and call .table() to get the resultant table.

from deephaven.table import multi_join

tables = [table1, table2, table3, table4]
key_cols = ["Key1", "Key2"]

# multi_join returns a MultiJoinTable; call .table() to get the underlying table
result = multi_join(input=tables, on=key_cols).table()
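
If different tables use different key column names, multi_join can also accept MultiJoinInput objects in place of a plain table list. The sketch below assumes the MultiJoinInput class that ships alongside multi_join in deephaven.table, and the Id1 column is hypothetical:

from deephaven.table import multi_join, MultiJoinInput

# Each MultiJoinInput pairs one table with its own key specification;
# natural_join-style renames (e.g., "Key1 = Id1") are assumed to work here
mj_inputs = [
    MultiJoinInput(table=table1, on=["Key1", "Key2"]),
    MultiJoinInput(table=table2, on=["Key1 = Id1", "Key2"]),
]

result = multi_join(input=mj_inputs).table()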

The result of a multi_join can be duplicated without using the method itself, but it's not as nice. Here, we perform the same operation as the code block above without multi_join:

from deephaven import merge

tables = [table1, table2, table3, table4]
key_cols = ["Key1", "Key2"]

result = merge([t.view(key_cols) for t in tables]).select_distinct(key_cols)
for idx in range(len(tables)):
    result = result.natural_join(table=tables[idx], on=key_cols)

The code block above replaces multi_join with merge, view, select_distinct, and a series of natural_join operations performed in a loop. Syntactically, it's more complicated. But syntax isn't the main driver for the new feature; performance is. So, let's back up the claims of speed and memory efficiency with some testing.

Static example

Why use multi_join? Because it's fast and memory efficient.

This first example joins together six tables, each containing different statistics for NFL quarterbacks from the 1999 to 2022 NFL seasons (24 seasons). The data is sourced from CSV files created using nfl_data_py. The files contain per-season data for completions, pass attempts, yards, touchdowns, interceptions, and sacks for any player who attempted a pass at least once in a season.

tip

The Python nfl_data_py package is awesome for getting NFL data into Python. I highly recommend it if you're interested in working with NFL data.
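
For context, here's a rough sketch of how files like these could be produced. The import_seasonal_data call and column names are assumptions about nfl_data_py's API, and the lookup that maps player IDs to the display names used below is omitted:

import nfl_data_py as nfl

# Assumed API: pull per-player seasonal stats for the 1999-2022 seasons
seasonal = nfl.import_seasonal_data(list(range(1999, 2023)))

# Write one CSV per statistic. The blog's files key on player name and
# season; mapping player IDs to names would need a roster lookup (omitted).
for stat in ["completions", "attempts", "passing_yards", "passing_tds", "interceptions", "sacks"]:
    seasonal[["player_id", "season", stat]].to_csv(f"/data/{stat}.csv", index=False)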

The following code reads each file into a list of tables. One of the tables is separated from the rest to show what the data looks like.

from deephaven import read_csv

qb_stat_tables = []
stats = ["completions", "attempts", "passing_yards", "passing_tds", "interceptions", "sacks"]

for stat in stats:
    qb_stat_tables.append(read_csv(path=f"/data/{stat}.csv"))

qb_completions = qb_stat_tables[0]

(Screenshot: the qb_completions table, with name, season, and completions columns.)

Each table contains name and season columns, plus a different statistic, as the filename suggests. As mentioned previously, the results of a multi_join can be duplicated without using it. Let's see what that looks like, how fast it is, and how much memory it uses.

from deephaven.perfmon import query_operation_performance_log
from deephaven import merge
from time import time

keys = ["name", "season"]

start = time()
qb_data_slowjoined = merge([t.view(keys) for t in qb_stat_tables]).select_distinct(keys)
for idx in range(len(stats)):
    qb_data_slowjoined = qb_data_slowjoined.natural_join(table=qb_stat_tables[idx], on=keys)
end = time()

print(f"merge, select_distinct, and repeated natural_joins took {(end - start):.3f} seconds.")

qopl = query_operation_performance_log().view(["Description", "FreeMemoryChange", "TotalMemoryChange"])

(Screenshots: the timing printout and the query operation performance log for the chained-join approach.)

It took 1.6 seconds to join together 6 tables, totaling 2439 rows and 8 columns. Looking at the query operations performance log, we see that the merge, selectDistinct, and six naturalJoin operations resulted in approximately 15MB of free memory being used.
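
To total the memory change from the log yourself, a quick follow-up query works. This is a sketch; the Description values come from the operations named above, and the backtick strings are Deephaven query-language string literals:

# Sum the memory columns across the merge, selectDistinct, and naturalJoin rows
join_ops = qopl.where(
    "Description.contains(`merge`) || Description.contains(`selectDistinct`) || Description.contains(`naturalJoin`)"
)
mem_totals = join_ops.view(["FreeMemoryChange", "TotalMemoryChange"]).sum_by()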

Let's do the same thing, but this time, with multi_join. The code is not only simpler, but as you'll see, it's faster and uses less memory.

from deephaven.perfmon import query_operation_performance_log
from deephaven.table import multi_join
from time import time

keys = ["name", "season"]

start = time()
qb_stats = multi_join(input=qb_stat_tables, on=keys).table()
end = time()

print(f"multi_join took {(end - start):.3f} seconds.")

qopl = query_operation_performance_log().view(["Description", "FreeMemoryChange", "TotalMemoryChange"])

(Screenshots: the timing printout and the query operation performance log for multi_join.)

It took 1.46 seconds to join the same 6 tables, which is ~10% faster. Looking at the memory usage, the free memory change is only approximately 900KB. That's over 15x less memory intensive! The speedup isn't substantial in such a small example, but with larger datasets, the difference will be more significant. The memory usage, though, is drastically less.

Real-time example

note

The example below is only compatible with Deephaven versions 0.30.1 and earlier.

multi_join works just as well on real-time data. The example below joins 10 tables together, each updating with between 10 and 500 new rows per second. That's an average of around 2,500 new rows per second being aggregated and multi-joined.

from deephaven.table import multi_join
from deephaven import perfmon as pm
from deephaven import time_table
from deephaven import merge
from deephaven import agg

import random


table_list = []

for idx in range(10):
    n_per_sec = random.randint(10, 500)
    t = time_table(period=f"PT{(1 / n_per_sec):.3f}S").\
        update(["TimeBin = lowerBin(Timestamp, SECOND)", f"Data_{idx} = randomDouble(-1000.0, 1000.0)"]).\
        agg_by(aggs=[agg.avg(cols=[f"Avg_{idx} = Data_{idx}"]), agg.count_(f"Data_{idx}")], by=["TimeBin"])
    table_list.append(t)

table_slowjoined = merge(tables=[t.view(["TimeBin"]) for t in table_list]).select_distinct(["TimeBin"])
for idx in range(10):
    table_slowjoined = table_slowjoined.natural_join(table=table_list[idx], on=["TimeBin"])

table_multijoined = multi_join(input=table_list, on=["TimeBin"]).table().reverse()

qopl = pm.query_operation_performance_log().view(["Description", "DurationNanos", "FreeMemoryChange", "TotalMemoryChange"])

(Screenshot: the query operation performance log, showing Description, DurationNanos, FreeMemoryChange, and TotalMemoryChange.)

The image above shows the description, CPU duration in nanoseconds, free memory change, and total memory change for multi_join and the ten natural_joins performed to achieve the same result. The multi_join takes only ~1 millisecond of CPU time, while each natural_join takes ~0.8 milliseconds. All in all, multi_join takes about 8x less CPU time than the ten natural_joins combined (~1 ms versus roughly 10 × 0.8 ms = 8 ms), and that doesn't even count the CPU time required for merge and select_distinct.
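
To compare the totals yourself, sum DurationNanos for each approach. This is a sketch; the multiJoin description text is an assumption about how the operation appears in the log:

# Total CPU nanoseconds for the single multi-join (description text assumed)
mj_nanos = qopl.where("Description.contains(`multiJoin`)").view(["DurationNanos"]).sum_by()

# Total CPU nanoseconds across the ten chained natural joins
nj_nanos = qopl.where("Description.contains(`naturalJoin`)").view(["DurationNanos"]).sum_by()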

Don't wait around to join all of your data together. Use multi_join to do it fast, and with less memory than you thought you'd need.

Reach out

We hope Deephaven's Community documentation provides guidance and answers your questions. We're also active on Slack, where we can answer anything our docs don't. Don't hesitate to reach out!