Integrating Deephaven with Custom Data Sources
Deephaven includes a proprietary columnar store for persistent historical and intraday data. However, customers may integrate their own data stores into the Deephaven system to make use of our efficient engine, analytics, and data visualization. In this document, we describe three integrations with alternative data sources:
- In-memory static sources (as implemented by TableTools.readCsv and readJdbc)
- In-memory dynamic tables (e.g., a multicast distribution system)
- Lazily loaded on-disk tables (e.g., our Apache Parquet integration)
Each Deephaven table is made up of a data structure called an Index and a set of named ColumnSources (represented as a Java map from String to ColumnSource). The column sources can be thought of as parallel dictionaries mapping a long row key to that row's value for a given column. The Index is an ordered set of longs representing the valid address space (keys) that may be read from the column sources.
To construct a table suitable for use by Deephaven engine operations, you must construct a com.illumon.iris.db.v2.QueryTable by passing in an Index and a Map<String, ColumnSource>. We recommend using a LinkedHashMap to preserve column order.
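To make the relationship concrete, the following sketch sums a hypothetical double-typed Price column by walking the Index and reading each row key from the corresponding ColumnSource (it assumes a QueryTable named table, and that Index exposes a primitive iterator as shown):

    import com.illumon.iris.db.v2.sources.ColumnSource;
    import com.illumon.iris.db.v2.utils.Index;

    // The Index supplies the valid row keys; the ColumnSource maps each key
    // to that row's value for the Price column.
    final ColumnSource priceSource = table.getColumnSource("Price");
    double total = 0;
    for (final Index.Iterator it = table.getIndex().iterator(); it.hasNext(); ) {
        total += priceSource.getDouble(it.nextLong());
    }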
Static In-Memory Tables
For a simple static source, the most convenient way to create the table is with a flat address space. A flat Index, containing keys 0 through size - 1, is created by calling com.illumon.iris.db.v2.utils.Index.Factory#getFlatIndex with the desired table size. After reading your data into primitive or typed arrays, the simple flat column sources can be created with the com.illumon.iris.db.v2.sources.ArrayBackedColumnSource#getMemoryColumnSource(java.lang.Object) function, which determines the type of your result column based on the input array type.
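Putting the pieces together, a complete static table can be built like this (a sketch; the arrays are hypothetical stand-ins for data read from your source):

    import com.illumon.iris.db.v2.QueryTable;
    import com.illumon.iris.db.v2.sources.ArrayBackedColumnSource;
    import com.illumon.iris.db.v2.sources.ColumnSource;
    import com.illumon.iris.db.v2.utils.Index;

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Data already read from the external source into parallel arrays.
    final String[] syms = {"AAPL", "GOOG", "SPY"};
    final double[] prices = {200.1, 1185.5, 294.2};

    // getMemoryColumnSource determines the column type from the array type.
    final Map<String, ColumnSource> columns = new LinkedHashMap<>();
    columns.put("Sym", ArrayBackedColumnSource.getMemoryColumnSource(syms));
    columns.put("Price", ArrayBackedColumnSource.getMemoryColumnSource(prices));

    // A flat Index addressing row keys 0 through size - 1.
    final QueryTable result = new QueryTable(Index.FACTORY.getFlatIndex(syms.length), columns);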
Dynamic In-Memory Tables
Deephaven customers have also integrated more complex ticking sources into in-memory tables. In one example, a customer connected their internal last-reliable multicast distribution system directly to the Deephaven remote query processor. This gives them access to a high-volume feed of the latest prices, positions, and other key data without the latency or cost of writing it to disk.
These tables are created using the same ArrayBackedColumnSources used for static tables, but instead of writing the data once and leaving it unchanged, the system registers a LiveTable that polls for newly updated entities on each Deephaven update cycle. The system then sets values in the ArrayBackedColumnSource and produces an update message indicating which Index keys have been added, deleted, or modified within the table. The message is sent to downstream query operations using the DynamicTable notifyListeners call. Using the changed keys, downstream operations can incrementally recompute their results.
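A minimal sketch of this pattern follows. It assumes the table and a writable priceSource built as in the static example above, and a hypothetical pollFeed() method standing in for the multicast feed (returning a map from row key to the latest price for this cycle):

    import com.illumon.iris.db.tables.live.LiveTable;
    import com.illumon.iris.db.tables.live.LiveTableMonitor;
    import com.illumon.iris.db.v2.utils.Index;

    import java.util.Map;

    // Polls the (hypothetical) feed once per Deephaven update cycle.
    final LiveTable poller = new LiveTable() {
        @Override
        public void refresh() {
            final Index added = Index.FACTORY.getEmptyIndex();
            final Index modified = Index.FACTORY.getEmptyIndex();
            for (final Map.Entry<Long, Double> entry : pollFeed().entrySet()) {
                final long key = entry.getKey();
                priceSource.ensureCapacity(key + 1);
                // set() updates the current value; the previous value is
                // tracked once startTrackingPrevValues has been called.
                priceSource.set(key, (double) entry.getValue());
                if (table.getIndex().find(key) >= 0) {
                    modified.insert(key);
                } else {
                    added.insert(key);
                }
            }
            table.getIndex().insert(added);
            // Tell downstream operations which Index keys changed this cycle.
            table.notifyListeners(added, Index.FACTORY.getEmptyIndex(), modified);
        }
    };
    LiveTableMonitor.DEFAULT.addTable(poller);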
Deephaven ColumnSources are designed to track both current and previous values. The previous value of a key is defined as its value at the start of the current update cycle. After the startTrackingPrevValues method is called, the ArrayBackedColumnSources automatically track previous values whenever a value is changed. For example, a sumBy can be incrementally updated by reading the previous value, subtracting it from the summation, and then reading the current (new) value and adding it to the summation.
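The core of such an incremental update looks like the following sketch. It assumes a double-typed priceSource on which startTrackingPrevValues has been called, and receives the added, removed, and modified Index sets from a table's update notification (the listener wiring is omitted):

    import com.illumon.iris.db.v2.sources.ColumnSource;
    import com.illumon.iris.db.v2.utils.Index;

    class IncrementalSum {
        private final ColumnSource<Double> priceSource; // startTrackingPrevValues() already called
        private double sum;

        IncrementalSum(final ColumnSource<Double> priceSource) {
            this.priceSource = priceSource;
        }

        // Invoked with the Index sets from an update notification.
        void onUpdate(final Index added, final Index removed, final Index modified) {
            for (final Index.Iterator it = modified.iterator(); it.hasNext(); ) {
                final long key = it.nextLong();
                sum -= priceSource.getPrevDouble(key); // value at the start of the cycle
                sum += priceSource.getDouble(key);     // current (new) value
            }
            for (final Index.Iterator it = added.iterator(); it.hasNext(); ) {
                sum += priceSource.getDouble(it.nextLong());
            }
            for (final Index.Iterator it = removed.iterator(); it.hasNext(); ) {
                sum -= priceSource.getPrevDouble(it.nextLong());
            }
        }
    }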
Deephaven tables are either “coalesced” or “uncoalesced”. An uncoalesced table references data but may not be used for most query engine operations. To prepare a table for use with the query engine, it must be coalesced (turned into a QueryTable with an Index and ColumnSources). When using the Deephaven APIs, a table requested from disk starts out uncoalesced: until you apply a partitioning filter (e.g., “Date=lastBusinessDateNy()”), its Index has not yet been determined. This allows the Table to be filtered to partitions without the need to determine the complete Index for the table before it is created. After the partitioning filter is applied, the table is coalesced into a QueryTable and is ready for use by most engine operations. If an engine operation is applied before the table is coalesced, it is coalesced automatically.
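For example, in a console session where db is the predefined database object (the namespace and table name here are hypothetical):

    // Uncoalesced: references the on-disk table, but the Index is not yet determined.
    Table quotes = db.t("MarketUs", "EquityQuote");
    // Applying the partitioning filter coalesces the result into a QueryTable
    // containing only the last business date's partitions.
    Table today = quotes.where("Date=lastBusinessDateNy()");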
Our customer likewise creates their multicast tables in an uncoalesced state. Only after the table is filtered to the relevant securities are the multicast subscriptions requested and a QueryTable created.
On Disk Tables
Using in-memory tables greatly limits the size of the data that can be processed with Deephaven. All columns and all rows must be read into memory before any query operations are available. The preferred way of working with large data sets is to allow Deephaven to load them lazily, meaning that data is only loaded when it is accessed. This can be accomplished by writing implementations of the ColumnSource interface that do not read their data when the Table is created, but rather on demand when the engine requires access to the data through either the get() or fillChunk() calls.
The get() family of calls provides access to single cells of data and is implemented for each primitive type (getByte, getShort, getInt, getLong, getChar, getFloat, getDouble) to avoid boxing. The fillChunk call allows the ColumnSource to fetch many cells of data in bulk more efficiently; the ColumnSource’s default implementation delegates to the appropriate primitive get. On-disk data in Deephaven is typically immutable, so the previous-value calls can simply be delegated to the get calls. To produce an efficient implementation, data should be cached so that it need not be read from disk on each access.
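A skeletal lazily loading source might look like the following sketch. It assumes the ImmutableColumnSourceGetDefaults helper interface to delegate boxed and previous-value calls to getDouble; DiskReader is a hypothetical stand-in for your storage format, and a real implementation would also override fillChunk and cache blocks of data:

    import com.illumon.iris.db.v2.sources.AbstractColumnSource;
    import com.illumon.iris.db.v2.sources.ImmutableColumnSourceGetDefaults;

    // A double column that reads from disk only when the engine asks for a cell.
    public class LazyDiskDoubleSource extends AbstractColumnSource<Double>
            implements ImmutableColumnSourceGetDefaults.ForDouble {

        /** Hypothetical handle to the on-disk column data. */
        public interface DiskReader {
            double readDouble(long rowKey);
        }

        private final DiskReader reader;

        public LazyDiskDoubleSource(final DiskReader reader) {
            super(double.class);
            this.reader = reader;
        }

        @Override
        public double getDouble(final long key) {
            // Read on demand; a production implementation would cache the
            // surrounding block so repeated access need not hit the disk.
            return reader.readDouble(key);
        }

        @Override
        public boolean isImmutable() {
            return true; // immutable data: previous values equal current values
        }
    }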
Deephaven ColumnSources may provide support for grouping information. Grouping information is analogous to an index in a traditional database; in-memory groups are represented by a Map from each grouping key to a Deephaven Index of row keys. In our proprietary format, all members of a group are stored contiguously on disk, with an ancillary file containing the start of each group. When the getGroupToRange method is called, the ancillary data is read to produce the appropriate map. If your underlying data exposes grouping information, the query engine can use it for operations such as filtering or bucketing (as in a join) rather than reading each individual data value.
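As a sketch, an integration whose groups are contiguous ranges could build its grouping map like this (the symbols and row ranges are hypothetical):

    import com.illumon.iris.db.v2.utils.Index;

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Grouping metadata: each symbol maps to the Index of rows that contain it.
    // Because groups are stored contiguously, each Index is a single range.
    final Map<String, Index> groupToRange = new LinkedHashMap<>();
    groupToRange.put("AAPL", Index.FACTORY.getIndexByRange(0, 9_999));
    groupToRange.put("GOOG", Index.FACTORY.getIndexByRange(10_000, 24_999));
    // A ColumnSource would return this map from getGroupToRange(), letting a
    // filter such as where("Sym=`AAPL`") use the row ranges instead of
    // reading every value in the column.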
Deephaven v1.20190816 integrates support for reading historical data from Apache Parquet files. To ensure performant access, our implementation allows discovery of partitions before coalescing the table, uses a buffer cache managed by SoftReferences so that data need not be re-read after it is first accessed, and supports reading grouping information from disk.