We built Deephaven to be an incredible tool for working with tabular data -- full stop. To us, tables are dynamic, powerful constructs, but we certainly care about static, batch ones too. In this piece, we explore some of the underlying technical and architectural decisions that, taken together, deliver Deephaven’s value proposition.
Table Update Model
Deephaven’s technology focuses on real-time, incrementally-updating data as a paramount goal and core competency. To this end, we have designed and implemented a directed acyclic graph (DAG) based model for communicating and reacting to incrementally-changing tabular data. DAGs form a logical path of dependent vertices and edges, such that following the direction of the connections will never form a closed loop. In Deephaven, the DAG’s vertices model tables or similar data artifacts, while edges model listeners and subscriptions. This DAG can be extended across multiple threads or over the network across multiple processes. It ensures data remains internally consistent across multiple input sources and their derived tables via a logical clock to mark phase and step changes. You can read more about our approach here.
Unified Table API Design
We know that many use cases require analyzing and commingling static and dynamic data. We don’t think users should have to use different tools or APIs for these two types of sources, for two reasons:
- Learning curve: Learning two APIs may take (at least) twice as long as one. Further, there is often nagging cognitive dissonance in trying to remember how the same operation is done in each.
- Ease of data integration: Merging or joining data accessible via different systems often requires using a third tool for data federation or implementing a custom tool. This is never seamless and sometimes requires intermediation.
At Deephaven, we have designed and implemented a unified table API that offers the same functionality and semantics for both static and dynamic data sources, albeit with additional correctness considerations in the dynamic case. Deephaven tables that incorporate real-time data update at a configurable frequency (usually 10-100 milliseconds), always presenting a correct, consistent view of data to downstream consumers in the update graph, as well as to external listeners and data extraction tools, whether in-process or remote.
Lambda architecture without compromises
Deephaven has long embraced the idea of lambda architecture in its products. Explained simply, this refers to data system deployments that layer batch data, real-time data, and a serving layer in order to allow different classes of data to be handled optimally while still producing integrated results. This class of architecture allows data system engineers to control the trade-offs they make between latency, throughput, resource utilization, and fault tolerance.
One classic use case can be found in capital markets’ trading systems, with previous days’ historical data that can be treated as static managed separately from intraday data that is still growing and evolving. It’s often necessary to bootstrap models over historical datasets and then extrapolate with intraday data as it arrives. Another applicable example can be found in infrastructure monitoring systems that aggregate data hierarchically across many nodes; older data may be batch-processed and aggregated at coarse granularity, while fresh data may be in its most raw form for minimum latency.
In this regard, our enterprise product institutionalizes this model for our customers, providing a complete suite of tools for publishing persistent, append-only, immutable data streams as tables and distributing this data widely to many query engines hosted within remote applications, along with tooling for data lifecycle management and data-driven application orchestration. Deephaven Enterprise explicitly allows for historical data to be post-processed, stored, and served in an appropriate form to maximize throughput for common queries, while allowing minimum latency for raw, real-time data, and integrates these within its own distributed data service layer.
Our community software empowers users with a comprehensive, extensible query engine that can interact with and commingle data accessed or ingested directly from disparate sources, including custom applications, Apache Parquet, Apache Kafka, Apache Orc, CSV, JSON, CDC, and in the future, Arrow, ODBC, and other contemplated integrations. Our table update model and unified table API design allow for seamless integration of data with different characteristics, thus allowing batch data and real-time data to coexist behind the same interface. Layering our gRPC API on top completes the picture, linking Deephaven query engines with one another or with client applications to construct data-driven applications.
This combination of capabilities allows Deephaven to serve as the only necessary tool for all layers in a lambda architecture deployment, solving many of the challenges inherent in what is normally considered a complex design. This, in turn allows for a better overall experience for end-users, as the data manipulation and access APIs are consistent and realize the full capabilities of all layers.
Tables designed for sharing and updating
Deephaven tables are implemented using data structures that lend themselves to efficient sharing and incremental updating. At its most basic, each table consistents of two components:
- A RowSet that enumerates the possibly-sparse sequence of row keys (non-negative 64-bit integers) in the table.
- A map associating names with ColumnSources that allow data retrieval for individual row keys, either element-by-element or in larger batches.
A ColumnSource represents a column of (possibly dynamically updating) data that may be shared by multiple tables.
A RowSet selects elements of that ColumnSource and might represent all the data in the ColumnSource or just some subset of it. A redirecting RowSet is a RowSet that is derived from another RowSet and which remaps its keys. It can be used, for example, to effectively reorder the rows in a table.
where operations) creates a new RowSet that is a subset of an existing RowSet; sorting creates a redirecting RowSet.
A table may share its RowSet with any other table in its update graph that contains the same row keys. A single parent table is responsible for maintaining the RowSet on behalf of itself and any descendants that inherited its RowSet. Table operations that inherit RowSets include column projection and derivation operations like
view, as well as some join operations with respect to the left input table; e.g.,
exact join, and
A table may share any of its ColumnSources with any other table in its update graph that uses the same row key space or whose row key space can be redirected onto the same row key space. A single parent table is responsible for updating the contents of a given ColumnSource for itself and any descendants that inherited that ColumnSource. Operations that allow this sharing without redirection include
where, as well as any column transformation that passes through columns unchanged or simply renamed, and some join operations with respect to the left-hand table’s ColumnSources. Operations that allow ColumnSource sharing with redirection include
sort, as well as most joins with respect to the right-hand table’s ColumnSources.
By itself, this sharing capability represents an important optimization that avoids some data processing or copying work. When considered in an updating query engine, it should be clear that this avoidance extends to each update cycle, paying dividends for the lifetime of a query.
Furthermore, the possible sparsity of the RowSet’s row key space allows for greatly reduced data movement within the RowSet itself and the ColumnSources it addresses. This is essential for the performance of Deephaven’s incremental sort operation, as well as in many cases when source tables publish changes that are more complex than simple append-only growth; e.g., multiple independently-growing partitions, or tabular representations of key-value store state.
“You don’t have to be an engineer to be a racing driver, but you do have to have Mechanical Sympathy.”
– Jackie Stewart, racing driver
This section heading and quote might be better explained by one of our favorite blogs. As systems programmers, we know that it’s best to use our tools (JVM, host OS, underlying I/O subsystems, CPU, etc) in the way that they were designed to be used most effectively. We’ve optimized our query engine with this principle in mind, although this remains and always will remain a work in progress.
Deephaven’s approach to mechanical sympathy can be summarized with a few key observations:
- Aggregations and operations are often best served at scale through vertical structures. Deephaven’s column orientation provides flexibility and performance.
- Data movement can have high fixed costs. It’s often best to amortize those costs over many rows/cells/bytes in order to achieve higher throughput. There are limiting factors, however; fetching more data than will be used has its own costs (“read amplification”), and working with overly large blocks of data can cause poor locality of reference and cache performance. Deephaven uses block-oriented data reads and caches, with block sizes chosen to strike a balance between these competing factors.
- “Predictable” code (meaning, not likely to mislead the branch predictor) is easier for the CPU to optimize. Deephaven’s engine code is often structured with relatively few branches, especially the inner code within a loop. Our generated code typically prunes unreachable cases.
- JIT compilation in modern JVMs can work wonders for performance if you let it. We try to help it along by:
- Avoiding virtual method invocations that can’t trivially be inlined, especially in inner loops.
- Operating on dense regions of memory in order to allow for vectorization when the CPU offers such APIs. Note that we intend to pursue the explicit vectorization support being added to the JVM (https://openjdk.java.net/jeps/338, https://openjdk.java.net/jeps/414) when our development roadmap permits.
- Garbage collections can result in serious performance degradation by consuming CPU resources that might otherwise be available for application throughput and by triggering unpredictable slowdowns or pauses at performance-critical times. Deephaven uses preallocation and object pooling to lower overall young generation usage. To the extent possible, and particularly within our inner loops, we try to use reusable objects rather than allocating many temporaries.
- Concurrency can be a two-edged sword. Poorly designed concurrent systems can be slower than equivalent single-threaded implementations due to resource contention, false sharing, thrashing, etc. Deephaven tries to avoid shared state wherever possible, and focuses on providing users with the tools to parallelize their workloads along natural partitioning boundaries.
The Deephaven query engine moves data around using a data structure called a Chunk. This subsystem is a key aspect of the way we achieve mechanical sympathy in our implementation.
By working with chunks of data rather than single cells, we allow the engine to amortize data movement costs at every applicable level of the stack. For example, ColumnSources are ChunkSources, allowing bulk
fillChunk data transfers. These data transfers may in turn be implemented by wrapping or copying arrays, by reading the appropriate region of a file, or by evaluating a formula once for each result element.
By structuring our engine operations as chunk-oriented kernels, we allow the JVM’s JIT compiler to vectorize computations where possible.
Chunks are strongly-typed by the storage they represent; as such there is an implementation for each Java primitive type and an additional one for Java Objects, and a separate WritableChunk hierarchy for chunks that may be mutated. They are parameterized by a generic Attribute argument that allows compile-time type safety according to the kind of data stored in a chunk; e.g., values, ordered row keys, hash codes, etc, without runtime cost.
The Chunk type hierarchy is designed such that cellular data access is never performed using virtual method calls; cell accessors like
add are always
final and should be easy to inline. Only bulky methods like
copy tend to be truly virtual, with multiple implementations.
Chunks are pooled in order to allow re-use as temporary data buffers without significant impact on garbage generation, thus reducing garbage collection frequency and duration.
Chunks are currently implemented as regions of native Java arrays, with implementations for each primitive type as well as an additional one for Object references. We’ve long contemplated switching to use Apache Arrow ValueVectors (called Arrays in the C++ implementation), but haven’t yet invested sufficient development resources to build a proof of concept.
Columnar Hash Tables
Building further on our chunk-oriented engine design, Deephaven join and aggregation operations frequently rely on a columnar hash table implementation that allows chunks of single or multi-element keys to be hashed and inserted or probed in bulk operations where data patterns permit. Each component of this operational building block attempts to enable vectorization and avoid virtual method lookups by relying on chunk-oriented kernels, whether for hashing data, sorting data, looking for runs of identical values, and so on.
Another area of the Deephaven query engine that merits specific attention is our RowSet implementation. In practice, RowSets switch between a number of implementation options depending on size and sparsity. Currently, there are three in use:
- Single Range: Exactly what it sounds like, ideal when all row keys in a RowSet are contiguous.
- Sorted Ranges: Like single range, but more than one, and in sorted order. Ideal for tables with a small number of contiguous ranges, no matter how widely separated.
- Regular Space Partitioning: Our take on the popular Roaring Bitmaps (RB) data structure, but with substantial optimizations to allow for widely distributed row key ranges. We use a library of containers initially derived from the Java RB implementation, with a few special case containers added to allow for a smaller memory footprint in important edge cases. These containers are organized in an array as in RB, but ranges of containers that would be entirely full are stored as a single entry in a parallel array.
In all cases, the goal is to allow for efficient traversal or set operations with minimum storage cost and high throughput. This is crucial, as every table operation within Deephaven relies in part on the performance characteristics of these data structures.
Formula and condition evaluation
Deephaven table operations often support complex, user-defined expressions for creating columns (“formulas”) or filtering (“conditions”). We’ve taken several key design decisions that either enable interesting use cases or allow for significant optimization.
- Simple pre-compiled Java class instances.
- New Java classes that are subsequently compiled, loaded, and instantiated.
- Numba-compiled machine code.
Given these options, simple expressions can be explicitly optimized and avoid any compilation overhead. Complex expressions, on the other hand, allow for a wide degree of latitude in method calls, conditionality, and positional column access.
Part of this process replaces column name references with parameters that can then be supplied from data loaded efficiently in chunks. It also allows for columns to be accessed as logical arrays in row position space.
Moreover, the expression parser also allows for named parameter substitution, which is valuable in that it allows users to write more readable queries, but also allows for complex, compiled expressions to be reused with different inputs and not recompiled.
Integrating user methods
One critical aspect of complex formula or condition evaluation is the ability to apply any user function that can be callable via Java, either directly or via JNI. This allows for users to “extend” the query language with their own functionality by integrating external models and algorithms into their queries. This, in turn, enables all manner of exciting integrations with data science toolkits, whether open source or proprietary.
There is a lot more we can do in this area, especially considering some of the tools that have matured in recent years. We’re especially interested in exploring alternative bytecode-generation strategies and integrating GraalVM as a compilation option.
Bridging Java and Python: JPY
Although the core of Deephaven is implemented in Java, we consider Python to be the most important language in use for writing Deephaven queries. We have invested significantly in a fork of the open source JPY project, and are currently working with the project’s contributors to evolve capabilities. Written in C with JNI and Python C APIs, JPY is a performant, low level library.
To maximize the familiarity of the Deephaven data science and app-dev experience, we have developed a transparent, pure Python API that relieves the user from the details of calling JPY. During the design and implementation of this Pythonic API, special care was taken to minimize the number of crossings between JNI and Python runtime.
gRPC APIs for polyglot interoperability
Deephaven’s core API is implemented using polyglot technologies that allow for compatible client (or server!) implementations in almost any language. It is composed of several complementary modules, but its Arrow Flight service and Table service are foremost. These offer high-performance data transport -- specifically organized to include real-time and updating data -- and a table manipulation API that mirrors the Deephaven engine’s internal compute paradigm. You can read more about our API itself here.
Distributing DAGs and global consistency
At Deephaven, we believe that our approach to propagating static and updating tabular data will revolutionize distributed data systems development. It represents a powerful new model.
As touched upon briefly earlier in this piece, the Deephaven query engine propagates updates concurrently via a DAG, relying on a logical clock to mark phase and step changes for internal consistency. While this sort of coordination is suitable within a single process, the overhead increases exponentially when extending such a DAG across multiple processes.
Based on this observation, we’ve implemented a design for multi-process data-driven applications that relies on consistent table replication using initial snapshots followed by subsequent deltas. This allows nodes to operate with their logical clocks mutually decoupled, allowing truly parallel update propagation. This also allows for bidirectional data flows, with nodes that publish a given table able to act as consumers for other tables.
This approach intentionally trades away “global consistency” for increased throughput and scalability. In practice, we think that such a global view is either illusory or better implemented via end-to-end sequence numbers that allow for data correlation within the query engine. By illusory we mean to observe that input sources often publish in a mutually-asynchronous manner, thus constraining the possibilities for true consistency to something narrower; e.g., “mutual consistency based on the inputs observed at a given point in time.” For data sources that do contain correlatable sequence numbers, Deephaven offers tools for synchronizing table views to reconstruct a truly consistent state.
Interacting with data without compromises
Deephaven at its core is designed to handle big ticking data. The web-client-ui front-end is no exception. We want to be able to interact with as large a ticking data set as possible without compromising the user experience. This includes displaying a grid full of all the data, and plotting the data.
Displaying a grid
In the web browser, we have some unique challenges when trying to display a large ticking set of data. When researching existing grid data solutions, we discovered that all of them had compromises when working with large data sets: the experience would either be completely different than a small data set (requiring paging buttons or some other mechanism to switch between small data sets), performance would be poor to the point of unusable, or they simply would not work at all. We wanted the same experience for data sets of any size, so we ended up developing our own front-end grid component that performs the same for all data sets.
When plotting large data sets, the biggest challenge is allowing interactivity with the data. A user could zoom out and view the entire data set from a high-level. With data sets with millions of points, it’s extremely inefficient to send the entire data set from server to client; even more so when ticking data is involved. With static data you can process the plot server side and simply send an image representation of the full plot, but that becomes inefficient when data is updating frequently, and also compromises the interactivity. We developed a downsampling algorithm that accurately represents the data with the current viewport/zoom-level set, efficiently updates as new data comes in, and allows the user to continue interacting with the plot naturally.
The whole is greater than….
Deephaven is the result of evolution. Better stated, Deephaven’s architecture is the effect of requests and feedback from sophisticated users coupled with the R&D efforts of a dedicated development team over many years. Compelling problem sets deserve prolonged attention, and servicing real-time app-dev and data science use cases certainly qualifies.
Deephaven Community Core is specifically designed, delivered, and packaged to be modular. This is both consistent with modern best practices and strategies to maximize its software’s extensibility and value to the community it serves. In exploring the components of its value proposition, we hope you conclude that taken together, Deephaven offers a compelling stack for your data-driven use case. Sometimes range matters. And range is a Deephaven superpower.