Indexing Intraday Partitions
Deephaven intraday user tables are optimized for fast ingestion and efficient columnar access to the entire dataset. For queries that need only the most recent data, the last-by DIS maintains an index with a single row per key. When intraday data is merged into historical tables, it is rearranged to provide very efficient grouped access, and extensive indexing is available.
Data sources often have a natural partitioning, for example by symbol or account. Taking advantage of this partitioning can enable workload sharding and improve query performance when the data layout is known in advance. However, it may not be possible to identify a priori the internal partition or partitions that hold a given set of data. Certain tables, notably the internal performance and troubleshooting log tables, grow rapidly and are frequently the subject of complex global queries. To optimize queries of this kind, we have introduced an indexing paradigm for intraday partitions.
Overview
Intraday indexing is implemented on a per-table basis in two steps. First, an `ImportStatePartitionIndex` object is associated with the base table and writes unique rows to an associated index table. Then, selected queries against the base table are modified to consume the index table and use it to filter the internal partition set over which the query is computed, potentially saving a substantial amount of time and system resources.
Automated use of the index is not implemented at this time.
The index table is named by appending the word `Index` to the name of the base table and is in the same namespace as the base table. Its columns are described in the table below.
Column Names | Type | Description |
---|---|---|
`InternalPartition` | String | The internal partition identifier for a subset of the table data. |
Base table partitioning columns, usually `Date` | Same as base table. | Values of the partitioning column(s) in the base table that map into this `InternalPartition`. |
Other base table column names. | Same as base table. | Values of the other `Key` columns, important in relevant queries, that appear in this partition. |
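To make the "unique rows" behavior concrete, the following is a minimal in-memory sketch of what an index table holds, assuming one index row per distinct combination of internal partition, partitioning value, and key values. `PartitionIndexModel` and its `newRow` signature are hypothetical names used only for illustration; they are not Deephaven classes, and the real `ImportStatePartitionIndex` may track its state differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory model of an intraday partition index (not a Deephaven class). */
public class PartitionIndexModel {
    // Each index row is [InternalPartition, Date, key values...]; insertion order is kept.
    private final Set<List<String>> indexRows = new LinkedHashSet<>();

    /**
     * Called once per base-table row. Returns true only when the combination
     * has not been seen before, i.e. when a new index row would be written.
     */
    public boolean newRow(String internalPartition, String date, String... keys) {
        List<String> row = new ArrayList<>();
        row.add(internalPartition);
        row.add(date);
        row.addAll(Arrays.asList(keys));
        return indexRows.add(row);
    }

    /** Returns a copy of the index rows written so far. */
    public List<List<String>> rows() {
        return new ArrayList<>(indexRows);
    }

    public static void main(String[] args) {
        PartitionIndexModel index = new PartitionIndexModel();
        index.newRow("host1", "2024-01-02", "worker_1", "id-a");
        index.newRow("host1", "2024-01-02", "worker_1", "id-a"); // duplicate, not written again
        index.newRow("host2", "2024-01-02", "worker_2", "id-b");
        // prints "2 index rows for 3 base rows"
        System.out.println(index.rows().size() + " index rows for 3 base rows");
    }
}
```

Because only distinct combinations are kept, the index stays small relative to a base table in which the same keys repeat many times per partition.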
Base table schema changes
The base table's schema must be modified to include the indexing `ImportState` object and pass it the `Key` column(s) needed. An `ImportState` element is added to the active `LoggerListener` block. Changing the `ImportState`'s type results in errors if you resume writing an existing partition with the new schema. If you had an existing `ImportState`, you should update your `logFormat` and suffix your internal partitions with the log format. If you had no existing `ImportState`, you can keep your existing log format and need not update the producers. Alternatively, you can remove partitions that would have an older `ImportState` after updating the schema.
Here is the opening portion of the active `LoggerListener` element in `DbInternal.ProcessEventLog.schema`:
    <LoggerListener logFormat="2" loggerPackage="com.illumon.iris.db.gen"
                    loggerClass="ProcessEventLogFormat2Logger"
                    listenerPackage="com.illumon.iris.db.gen"
                    listenerClass="ProcessEventLogFormat2Listener"
                    rethrowLoggerExceptionsAsIOExceptions="false">
        <ImportState stateUpdateCall="newRow(Process,ProcessInfoId,AuthenticatedUser,EffectiveUser)"
                     importStateType="com.illumon.iris.db.tables.dataimport.logtailer.ImportStatePartitionIndex" />
        <SystemInput name="timestamp" type="long" />
        <SystemInput name="host" type="String" />
        ...
The name of the index table is constructed internally by the `ImportStatePartitionIndex` object, and the partitioning column(s) of the base table are captured internally, so they do not appear in the `stateUpdateCall` syntax.
Index table schema
The `ImportStatePartitionIndex` object internally invokes the active `loggerClass` from the index table, found by adding `Index` to the base table's name. The index table should have:
- a `String` column named `InternalPartition`.
- partitioning columns identical in name and type to those of the base table.
- a column matching name and type for each `Key` column appearing in the `stateUpdateCall` specification in the base table schema.
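When editing schemas by hand, the requirements above can be turned into a quick checklist. The sketch below derives the index table name and the expected column names from a base table name, its partitioning columns, and a `stateUpdateCall` string. `IndexSchemaChecklist` and its methods are hypothetical helpers written for this illustration, not part of Deephaven; the simple parsing assumes the `stateUpdateCall` has the `newRow(A,B,...)` form shown earlier and checks names only, not types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper that lists the columns an index table schema should declare. */
public class IndexSchemaChecklist {
    /** The index table name is the base table name with "Index" appended. */
    public static String indexTableName(String baseTableName) {
        return baseTableName + "Index";
    }

    /** Extracts the Key column names from a call of the form "newRow(A,B,...)". */
    public static List<String> keyColumns(String stateUpdateCall) {
        int open = stateUpdateCall.indexOf('(');
        int close = stateUpdateCall.lastIndexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("Unexpected stateUpdateCall: " + stateUpdateCall);
        }
        List<String> keys = new ArrayList<>();
        for (String name : stateUpdateCall.substring(open + 1, close).split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                keys.add(trimmed);
            }
        }
        return keys;
    }

    /** Partitioning columns, then InternalPartition, then one column per Key column. */
    public static List<String> expectedColumns(List<String> partitioningColumns, String stateUpdateCall) {
        List<String> columns = new ArrayList<>(partitioningColumns);
        columns.add("InternalPartition");
        columns.addAll(keyColumns(stateUpdateCall));
        return columns;
    }

    public static void main(String[] args) {
        String call = "newRow(Process,ProcessInfoId,AuthenticatedUser,EffectiveUser)";
        // prints "ProcessEventLogIndex"
        System.out.println(indexTableName("ProcessEventLog"));
        // prints "[Date, InternalPartition, Process, ProcessInfoId, AuthenticatedUser, EffectiveUser]"
        System.out.println(expectedColumns(Collections.singletonList("Date"), call));
    }
}
```

The column order produced here follows the example schema for `ProcessEventLogIndex`; whether order matters is not stated in this document.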
Here is the entire schema file `DbInternal.ProcessEventLogIndex.schema`:
    <Table name="ProcessEventLogIndex" namespace="DbInternal" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
        <Partitions keyFormula="${autobalance_by_first_grouping_column}" />
        <Column name="Date" dataType="String" columnType="Partitioning" />
        <Column name="InternalPartition" dataType="String" />
        <Column name="Process" dataType="String" />
        <Column name="ProcessInfoId" dataType="String" />
        <Column name="AuthenticatedUser" dataType="String" />
        <Column name="EffectiveUser" dataType="String" />
        <LoggerListener logFormat="1" loggerClass="ProcessEventLogIndexLogger" loggerPackage="com.illumon.iris.db.gen" rethrowLoggerExceptionsAsIOExceptions="false" tableLogger="false" generateLogCalls="true" verifyChecksum="true" loggerLanguage="JAVA" listenerClass="ProcessEventLogIndexListener" listenerPackage="com.illumon.iris.db.gen">
            <SystemInput name="InternalPartition" type="java.lang.String" />
            <SystemInput name="Tuple" type="com.illumon.iris.db.util.tuples.ArrayTuple" />
            <LoggerImports>import com.illumon.iris.db.util.tuples.ArrayTuple;</LoggerImports>
            <Column name="InternalPartition" dataType="java.lang.String" />
            <Column name="Process" dataType="java.lang.String" intradaySetter="(String)(((ArrayTuple)Tuple).getElement(0))" />
            <Column name="ProcessInfoId" dataType="java.lang.String" intradaySetter="(String)(((ArrayTuple)Tuple).getElement(1))" />
            <Column name="AuthenticatedUser" dataType="java.lang.String" intradaySetter="(String)(((ArrayTuple)Tuple).getElement(2))" />
            <Column name="EffectiveUser" dataType="java.lang.String" intradaySetter="(String)(((ArrayTuple)Tuple).getElement(3))" />
        </LoggerListener>
    </Table>
Note that the `intradaySetter` syntax may vary depending on the number and type of the columns; please use the generator class.
Schema Modification Utility Class
The initial set of indexing schema changes for `DbInternal` tables was implemented using a generator object created for the purpose and available for customer use. We suggest using it to generate the necessary syntax for new or updated schemas, even if the circumstances require manual editing afterward.
The generator operates on writable schema files in a directory structure, which may be under a source control system such as `git`. Schema files can be exported from and deployed back to a live Deephaven system using `dhconfig schema export` and `dhconfig schema import`.
    ...path-to-/folder/
        DbInternal/
            DbInternal.ProcessEventLog.schema
            ...
        MyNamespace/
            MyNamespace.MyBigTable.schema
            ...
The path to the top of this tree is passed to the constructor for a `com.illumon.iris.db.tables.dataimport.logtailer.InternalPartitionIndexSchemaFactory` object. That object can then be used to simultaneously inject the `ImportState` element into the base table schema and create a new index table schema. The following example is based on the Java code used to index the `DbInternal` logging tables:
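    import com.illumon.iris.db.tables.dataimport.logtailer.InternalPartitionIndexSchemaFactory;
    // Point the factory at the top of the writable schema tree.
    final InternalPartitionIndexSchemaFactory factory =
            new InternalPartitionIndexSchemaFactory("/path/to/folder/");
    // As used here, the arguments are the namespace, the base table name, and then the Key columns to index.
    factory.makeIndexSchema("DbInternal", "ProcessEventLog", "Process", "ProcessInfoId", "AuthenticatedUser", "EffectiveUser");
    factory.makeIndexSchema("MyNamespace", "MyBigTable", "SomeColumn", "AnotherColumn");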
Consuming partition index data
In outline, one may query the index table to find the set of `InternalPartition` values that may contain relevant rows. The query on the base table can then start with a `Table` object spanning only those partitions; any subsequent operations on the filtered table will be more efficient than if applied to the full table. The form of `getIntradayTable` to use is documented in the Database interface Javadoc.
Note that although the filtered table object is "live", there is not a live connection to the index query result set. New rows that are not in the pre-selected partition set will not appear in the optimized query results.
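The note above can be illustrated with a small in-memory model of partition pruning. It assumes the base table is a map from internal partition to rows and the index is a map from key to the partitions containing it. `PartitionPruningModel` and all of its methods are hypothetical and exist only for this sketch; they do not reflect how Deephaven stores or refreshes tables.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of querying a base table through a pre-selected partition set. */
public class PartitionPruningModel {
    // Base table: InternalPartition -> ProcessInfoId value of each row in that partition.
    private final Map<String, List<String>> basePartitions = new LinkedHashMap<>();
    // Index: ProcessInfoId -> internal partitions known to contain it.
    private final Map<String, Set<String>> index = new HashMap<>();

    /** Appends a base-table row and records its key in the index. */
    public void append(String internalPartition, String processInfoId) {
        basePartitions.computeIfAbsent(internalPartition, k -> new ArrayList<>()).add(processInfoId);
        index.computeIfAbsent(processInfoId, k -> new LinkedHashSet<>()).add(internalPartition);
    }

    /** Returns a snapshot of the partitions the index lists for a key right now. */
    public Set<String> partitionsFor(String processInfoId) {
        return new LinkedHashSet<>(index.getOrDefault(processInfoId, Collections.emptySet()));
    }

    /** Counts matching rows while scanning only the given partitions. */
    public long countIn(Set<String> partitions, String processInfoId) {
        long count = 0;
        for (String partition : partitions) {
            for (String id : basePartitions.getOrDefault(partition, Collections.emptyList())) {
                if (id.equals(processInfoId)) {
                    count++;
                }
            }
        }
        return count;
    }

    /** Counts matching rows across every partition (the unoptimized query). */
    public long countAll(String processInfoId) {
        return countIn(basePartitions.keySet(), processInfoId);
    }

    public static void main(String[] args) {
        PartitionPruningModel table = new PartitionPruningModel();
        table.append("p1", "x");
        table.append("p1", "y");
        table.append("p2", "y");
        Set<String> selected = table.partitionsFor("x");       // snapshot: only p1
        System.out.println(table.countIn(selected, "x"));      // prints 1
        table.append("p1", "x");                                // new row in a selected partition
        table.append("p3", "x");                                // new row in an unselected partition
        System.out.println(table.countIn(selected, "x"));      // prints 2: the p3 row is missed
        System.out.println(table.countAll("x"));               // prints 3
    }
}
```

In this model, rows that arrive in already-selected partitions are still seen, while rows in partitions that appear after the index lookup are not; re-running the index query is the only way to pick them up.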
In this Legacy example, using the index saves about 95% of the disk I/O required to find the desired information:
    processOfInterest = "8055f179-8616-432e-bace-06addec632a0"
    // Full intraday table and its index table for today's date.
    fullProcessEventLog = db.i("DbInternal", "ProcessEventLog").where("Date=currentDateNy()")
    pelIndex = db.i("DbInternal", "ProcessEventLogIndex").where("Date=currentDateNy()")
    // Internal partitions that the index lists for the process of interest.
    internalPartitions = pelIndex.where("ProcessInfoId = processOfInterest").selectDistinct("InternalPartition").getColumn("InternalPartition").getDirect()
    // Base table restricted to those internal partitions.
    relevantPartitions = db.i("DbInternal", "ProcessEventLog", (Set<String>) internalPartitions)
    println fullProcessEventLog.size()
    // On one test environment this returned 623,875 rows.
    println relevantPartitions.size()
    // The relevant partitions contained 18,555 rows.
    completeFiltered = fullProcessEventLog.where("ProcessInfoId = processOfInterest")
    relevantFiltered = relevantPartitions.where("ProcessInfoId = processOfInterest")
    println completeFiltered.size()
    // Filtering the entire table produced 541 rows.
    println relevantFiltered.size()
    // The relevant partitions produced the same 541 rows.