Data Indexes
This feature is currently experimental. The API and performance characteristics are subject to change.
This guide covers what data indexes are, how to create and use them, and how they can improve query performance. A data index maps key values in one or more key columns to the row sets containing the relevant data. Data indexes are used to improve the speed of data access operations in tables. Many table operations will use a data index if it is present. Every data index is backed by a table, with the index stored in a column called dh_row_set.
Data indexes can improve the speed of filtering operations. Additionally, data indexes can be useful if multiple query operations need to compute the same data index on a table. This is common when a table is used in multiple joins or aggregations. If the table does not have a data index, each operation will internally create the same index. If the table does have a data index, the individual operations will not need to create their own indexes and can execute faster and use less RAM.
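For instance, a table that feeds several aggregations keyed on the same columns can share one index: create it once, keep a reference to it, and each aggregation can reuse it instead of rebuilding the same grouping internally. A minimal sketch follows (the trades table and its columns are hypothetical; data_index itself is introduced in the next section):
from deephaven.experimental.data_index import data_index
from deephaven import empty_table
from deephaven import agg
# Hypothetical table used only for illustration
trades = empty_table(1_000).update(
    ["Sym = (i % 2 == 0) ? `AAPL` : `MSFT`", "Price = randomDouble(0, 100)"]
)
# Create the index once and keep a reference to it so it stays reachable
trades_sym_index = data_index(trades, "Sym")
# Both aggregations key on Sym, so the engine can reuse the same index for each
last_by_sym = trades.last_by("Sym")
avg_by_sym = trades.agg_by([agg.avg(cols=["AvgPrice=Price"])], by="Sym")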
Create a data index
A data index can be created from a source table and one or more key columns using data_index:
from deephaven.experimental.data_index import data_index
from deephaven import empty_table
source = empty_table(10).update("X = i")
source_index = data_index(source, "X")
- source
When a new data index is created, it is not immediately computed. The index is computed when a table operation first uses it or when the table attribute is called on the index. This is an important detail when trying to understand performance data.
Every data index is backed by a table. The backing table can be retrieved with the table attribute:
result = source_index.table
- result
The source table in the above example doesn't create a very interesting data index. Each key value in X appears only once. The following example better illustrates a data index:
from deephaven.experimental.data_index import data_index
from deephaven import empty_table
source = empty_table(10).update("X = 100 + i % 5")
source_index = data_index(source, "X").table
- source_index
- source
When looking at source, we can see that the value 100 in X appears in rows 0 and 5, the value 101 appears in rows 1 and 6, and so on. Each row in dh_row_set contains the row set where the corresponding key value appears. Let's look at an example with more than one key column.
from deephaven.experimental.data_index import data_index
from deephaven import empty_table
source = empty_table(20).update(["X = i % 4", "Y = i % 2"])
source_index = data_index(source, ["X", "Y"]).table
- source_index
- source
There are only four unique keys in X and Y together. The resulting dh_row_set column shows the row sets in which each unique key appears.
All of the previous examples create tables with flat data indexes. A flat data index is one in which the row sets are sequential. However, it is not safe to assume that a data index is flat; in practice, flat indexes are uncommon. Consider the following example, where the data index is not flat:
from deephaven.experimental.data_index import data_index
from deephaven import empty_table
from deephaven import agg
source = empty_table(25).update(
    ["Key = 100 + randomInt(0, 4)", "Value = randomDouble(0, 100)"]
)
source_index = data_index(source, "Key").table
result = source_index.where("Key in 100, 102")
- result
- source_index
- source
Retrieve and verify a data index
You can verify whether a data index exists for a particular table using one of two methods:
- has_data_index: Returns a boolean indicating whether a data index exists for the table and the given key column(s).
- data_index: If create_if_absent is False, it returns the existing data index if there is one, or None if there is no index.
The following example creates data indexes for a table and checks if they exist based on different key columns. has_data_index is used to check if the index exists.
from deephaven.experimental.data_index import data_index, has_data_index
from deephaven import empty_table
source = empty_table(100).update(
    [
        "Timestamp = '2024-04-25T12:00:00Z' + (i * 1_000_000)",
        "Key1=ii%8",
        "Key2=ii%11",
        "Key3=ii%19",
        "value=-ii",
    ]
)
index_1_3 = data_index(source, ["Key1", "Key3"])
has_index_1 = has_data_index(source, ["Key1"])
has_index_2_3 = has_data_index(source, ["Key2", "Key3"])
has_index_1_3 = has_data_index(source, ["Key1", "Key3"])
print(f"There is a data index for source on Key 1: {has_index_1}")
print(f"There is a data index for source on Keys 2 and 3: {has_index_2_3}")
print(f"There is a data index for source on Keys 1 and 3: {has_index_1_3}")
index_2 = data_index(source, ["Key2"], create_if_absent=False)
has_index_2 = has_data_index(source, ["Key2"])
print(f"There is a data index for source on Key 2: {has_index_2}")
- Log
- source
Usage in queries
Data indexes are weakly reachable. Assign the index to a variable to ensure it does not get garbage collected.
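For example, the following minimal sketch (with a hypothetical table t) shows the difference between discarding and keeping the index reference:
from deephaven.experimental.data_index import data_index
from deephaven import empty_table
t = empty_table(1_000).update(["Key = ii % 10", "Value = ii"])
# Risky: no reference is kept, so the index may be garbage collected before it is used
data_index(t, "Key")
# Safe: the variable keeps the index reachable for downstream operations
t_key_index = data_index(t, "Key")
result = t.sum_by("Key")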
If a table has a data index and an engine operation can leverage it, it will be automatically used. The following table operations can use data indexes:
- Join
- Group and aggregate
- Filter
- Select
- Sort
The following subsections explore different table operations. The code blocks below use the following tables:
from deephaven.experimental.data_index import data_index
from deephaven import empty_table
from deephaven import agg
source = empty_table(100_000).update(
    [
        "Timestamp = '2024-04-25T12:00:00Z' + (i * 1_000_000)",
        "Key1 = ii%11",
        "Key2 = ii%47",
        "Key3 = ii%499",
        "Value = ii",
    ]
)
source_right = empty_table(400).update(
    [
        "Timestamp = '2024-04-25T12:00:00Z' + (i * 1_000_000)",
        "Key1 = ii%11",
        "Key2 = ii%43",
        "Value = ii/2",
    ]
)
source_right_distinct = source_right.select_distinct(["Key1", "Key2"])
- source
- source_right
- source_right_distinct
Filter
The engine will use single and multi-column indexes to accelerate exact match filtering. Range filtering does not benefit from an index.
The Deephaven engine only uses a DataIndex when the keys exactly match what is needed for an operation. For example, if a data index is present for the columns X and Y, it will not be used if the engine only needs an index for column X.
The following filters can use the index created at the top of the code block:
source_k1_index = data_index(source, ["Key1"])
result_k1 = source.where("Key1 = 7")
result_k1_in = source.where("Key1 in 2,4,6,8")
- result_k1
- result_k1_in
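By contrast, a range filter does not benefit from the index, as noted above. A brief sketch using the same source table:
# Range filters cannot use the Key1 data index; only exact match filters can
result_k1_range = source.where("Key1 >= 7")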
The following example can only use the index for the first filter:
source_k1_index = data_index(source, ["Key1"])
result_k1_k2 = source.where(["Key1=7", "Key2=11"])
- result_k1_k2
This is true for filtering one table based on another as well. The following example can use source_k1_k2_index, but it cannot use source_right_k1_index because that index does not match the filter:
source_k1_k2_index = data_index(source, ["Key1", "Key2"])
source_right_k1_index = data_index(source_right, ["Key1"])
result_wherein = source.where_in(source_right_distinct, cols=["Key1", "Key2"])
result_wherenotin = source.where_not_in(source_right_distinct, cols=["Key1", "Key2"])
- result_wherein
- result_wherenotin
Join
Different joins will use indexes differently:
| Method | Can use indexes? | Can use left table index? | Can use right table index? |
| --- | --- | --- | --- |
| exact_join | ✅ | ✅ | 🚫 |
| natural_join | ✅ | ✅ | 🚫 |
| aj | ✅ | ✅ | ✅ |
| raj | ✅ | ✅ | ✅ |
| join | 🚫 | 🚫 | 🚫 |
| left_outer_join | 🚫 | 🚫 | 🚫 |
| full_outer_join | 🚫 | 🚫 | 🚫 |
| multi_join | 🚫 | 🚫 | 🚫 |
| range_join | 🚫 | 🚫 | 🚫 |
Natural join
When performing a natural_join without data indexes, the engine performs a linear scan of the source (left) table's rows. Each key is hashed, and a mapping is created to the specified joining (right) table. If a data index exists for the source (left) table, the engine skips the scan and uses the index directly to create the mapping.
Consider a natural_join on source and source_right. The following example can use source_k1_k2_index to accelerate the join, but not source_right_k1_k2_index, because the operation cannot use the right table index:
source_k1_k2_index = data_index(source, ["Key1", "Key2"])
source_right_k1_k2_index = data_index(source_right, ["Key1", "Key2"])
result_naturaljoined = source.natural_join(
    source_right, on=["Key1", "Key2"], joins=["RhsValue=Value"]
)
- result_naturaljoined
As-of join
The exact match in an aj or an raj can be accelerated using data indexes, eliminating the task of grouping the source table rows by key. The following example can use the indexes created because they match the exact-match column used in the join:
source_k1_index = data_index(source, ["Key1"])
source_right_k1_index = data_index(source_right, ["Key1"])
result_asofjoined = source.aj(
    source_right,
    on=["Key1", "Timestamp >= Timestamp"],
    joins=["RhsValue=Value", "RhsTimestamp=Timestamp"],
)
- result_asofjoined
Sort
Sorting operations can be accelerated using both full and partial indexes.
- A full index matches all sorting columns.
- A partial index matches a subset of sorting columns.
When a full index is present, the engine only needs to sort the index rows.
A sort operation can only use a partial index if the partial index matches the first sorting column. When a sort operation uses a partial index, the engine sorts the first column using the index and then sorts any remaining columns naturally.
The following sort operation uses the index because it matches the sort columns (the column order does not need to match):
source_k1_k2_index = data_index(source, ["Key1", "Key2"])
result_sorted_k2_k1 = source.sort_descending(order_by=["Key2", "Key1"])
- result_sorted_k2_k1
The following sort operation uses the index for the first sort column only:
source_k1_index = data_index(source, ["Key1"])
result_sorted_k1_k3 = source.sort(order_by=["Key1", "Key3"])
- result_sorted_k1_k3
The following sort operation does not use any indexes, as the only partial index (on Key1) does not match the first sort column:
source_k1_index = data_index(source, ["Key1"])
source_right_k1_index = data_index(source_right, ["Key1"])
result_sorted_k3_k1 = source.sort(order_by=["Key3", "Key1"])
- result_sorted_k3_k1
Aggregate
Aggregations without data indexes require the engine to group all similar keys and perform the calculation on subsets of rows. The index allows the engine to skip the grouping step and immediately begin calculating the aggregation. The following aggregation uses the index because it matches the aggregation key columns:
source_k1_k2_index = data_index(source, ["Key1", "Key2"])
result_agged = source.agg_by(
    [
        agg.sum_(cols=["Sum=Value"]),
        agg.avg(cols=["Avg=Value"]),
        agg.std(cols=["Std=Value"]),
    ],
    by=["Key1", "Key2"],
)
- result_agged
Persist data indexes with Parquet
Deephaven can save data indexes along with your data when you write a table to Parquet files. By default, it saves all available indexes. You can also choose to save only some indexes. If you try to save an index that doesn't exist yet, Deephaven will create a temporary index just for the saving process. When you load data from a Parquet file, it also loads any saved indexes.
The following example writes a table plus indexes to Parquet and then reads a table plus indexes from the Parquet files.
from deephaven import empty_table
from deephaven import agg as agg
from deephaven.experimental.data_index import data_index, has_data_index
from deephaven.parquet import read, write
from deephaven.liveness_scope import liveness_scope
source = empty_table(100_000).update(
    [
        "Timestamp = '2024-04-25T12:00:00Z' + (i * 1_000_000)",
        "Key1=ii%11",
        "Key2=ii%47",
        "Key3=ii%499",
        "value=ii",
    ]
)
# Write to disk and specify two sets of index key columns
write(source, path="/data/indexed.parquet", index_columns=[["Key1"], ["Key1", "Key2"]])
# Load the table and the indexes from disk
disk_table = read("/data/indexed.parquet")
# Show that the table loaded from disk has indexes
print(has_data_index(disk_table, ["Key1"]))
print(has_data_index(disk_table, ["Key1", "Key2"]))
# Open a new liveness scope so the indexes are released after writing
with liveness_scope() as scope:
    index_key1 = data_index(source, ["Key1"])
    index_key1_key2 = data_index(source, ["Key1", "Key2"])

    # Write to disk - indexes are automatically persisted
    write(source, path="/data/indexed.parquet")

# Load the table and the indexes from disk
disk_table_new = read("/data/indexed.parquet")
# Show that the table loaded from disk has indexes
print(has_data_index(disk_table_new, ["Key1"]))
print(has_data_index(disk_table_new, ["Key1", "Key2"]))
- Log
Performance
Data indexes improve performance when used in the right context. If used in the wrong context, data indexes can make performance worse. The following list provides guidelines to consider when determining if data indexes will improve performance:
- A data index is more likely to help performance if the data within an index group is located near (in memory) other data in the group. This avoids index fragmentation and poor data access patterns.
- Reusing a data index multiple times is necessary to overcome the cost of initially computing the index.
- Generally speaking, the higher the key cardinality, the better the performance improvement.
Benchmark scripts
If you're interested in how data indexes impact query performance, the scripts below collect performance metrics for each operation. You will notice that performance improves in some instances and is worse in others.
This first script sets up the tests:
from deephaven.experimental.data_index import data_index, has_data_index
from deephaven import empty_table
from deephaven.table import Table
from typing import Callable, List, Any
from time import time
import jpy
# Disable memoization for perf testing
jpy.get_type("io.deephaven.engine.table.impl.QueryTable").setMemoizeResults(False)
def time_it(name: str, f: Callable[[], Any]) -> Any:
    """
    Time a function execution.
    """
    start = time()
    rst = f()
    end = time()
    exec_time = end - start
    print(f"{name} took {(exec_time):.4f} seconds.")
    return rst, exec_time
def add_index(t: Table, by: List[str]):
    """
    Add a data index.
    By adding .table, the index calculation is forced to be now instead of when it is first used.

    Args:
    - t: Table to add the index to.
    - by: Columns to index on.

    Returns:
    - The index.
    """
    print(f"Adding data index: {by}")

    def compute_index():
        idx = data_index(t, by)
        # call .table to force index computation here -- to benchmark the index creation separately -- not for production
        idx.table
        return idx

    # Time the index creation and return the computed index
    idx, _ = time_it(f"adding index {by}", compute_index)
    return idx
def run_test(n_i_keys: int, n_j_keys: int, create_idxs: list[bool]) -> Any:
    """
    Run a series of performance benchmark tests.

    Args:
    - n_i_keys: Number of unique keys for column I.
    - n_j_keys: Number of unique keys for column J.
    - create_idxs: List of booleans to determine which indexes to create (I, J, and [I, J])
    """
    t = empty_table(100_000_000).update(
        ["I = ii % n_i_keys", "J = ii % n_j_keys", "V = random()"]
    )
    t_lastby = t.last_by(["I", "J"])

    if create_idxs[0]:
        idx1_i = add_index(t, ["I"])
    if create_idxs[1]:
        idx1_j = add_index(t, ["J"])
    if create_idxs[2]:
        idx1_ij = add_index(t, ["I", "J"])

    idx_ij = has_data_index(t, ["I", "J"])
    idx_i = has_data_index(t, ["I"])
    idx_j = has_data_index(t, ["J"])
    print(f"Has index: ['I', 'J']={idx_ij} ['I']={idx_i} ['J']={idx_j}")

    ret, t_where = time_it("where", lambda: t.where(["I = 3", "J = 6"]))
    ret, t_countby = time_it("count_by", lambda: t.count_by("Count", ["I", "J"]))
    ret, t_sumby = time_it("sum_by", lambda: t.sum_by(["I", "J"]))
    ret, t_nj = time_it(
        "natural_join", lambda: t.natural_join(t_lastby, ["I", "J"], "VJ = V")
    )

    return [t_where, t_countby, t_sumby, t_nj]
Here's a follow-up script that runs tests for both low- and high-cardinality cases, both with and without data indexes:
from deephaven.column import double_col, string_col
from deephaven import new_table
print("Low cardinality, no data indexes")
times_lc_ni = run_test(n_i_keys=100, n_j_keys=7, create_idxs=[False, False, False])
print("Low cardinality, with data indexes")
times_lc_wi = run_test(n_i_keys=100, n_j_keys=7, create_idxs=[True, True, True])
print("High cardinality, no data indexes")
times_hc_ni = run_test(n_i_keys=100, n_j_keys=997, create_idxs=[False, False, False])
print("High cardinality, with data indexes")
times_hc_wi = run_test(n_i_keys=100, n_j_keys=997, create_idxs=[True, True, True])
results = new_table(
    [
        string_col("Operation", ["where", "count_by", "sum_by", "natural_join"]),
        double_col("TimesLowCardinalityNoIndex", times_lc_ni),
        double_col("TimesLowCardinalityWithIndex", times_lc_wi),
        double_col("TimesHighCardinalityNoIndex", times_hc_ni),
        double_col("TimesHighCardinalityWithIndex", times_hc_wi),
    ]
)
It can also be helpful to plot the results:
from deephaven.plot.figure import Figure
fig = (
    Figure()
    .plot_cat(
        series_name="Low cardinality (700 keys), no indexes.",
        t=results,
        category="Operation",
        y="TimesLowCardinalityNoIndex",
    )
    .plot_cat(
        series_name="Low cardinality (700 keys), with indexes.",
        t=results,
        category="Operation",
        y="TimesLowCardinalityWithIndex",
    )
    .plot_cat(
        series_name="High cardinality (99,700 keys), no indexes.",
        t=results,
        category="Operation",
        y="TimesHighCardinalityNoIndex",
    )
    .plot_cat(
        series_name="High cardinality (99,700 keys), with indexes.",
        t=results,
        category="Operation",
        y="TimesHighCardinalityWithIndex",
    )
    .show()
)