Data Import Server

The Deephaven Data Import Server (DIS) is a central agent for ingesting streaming data and then serving it to queries.

The DIS is responsible for consuming data from sources and transforming it to columnar data in a Deephaven-supported format. The DIS operates in two modes:

  1. Tailers connect to a DIS and send it row-oriented streaming data, which the DIS interprets and transforms into column-oriented data. In this mode, tailers push data to one or more DIS instances. Typically, tailers send either Deephaven-format binary logs (strongly preferred) or CSV rows.
  2. A DIS subscribes to data from streaming sources such as Kafka or Solace and writes it to Deephaven column-oriented tables. In this mode, the DIS pulls data from the data source.

The columnar data format allows queries to lazily read just the data necessary for an operation and matches the chunk-based orientation of the Deephaven query engine. The DIS persists data locally (typically to a file system on fast SSDs), writing through an in-memory block-oriented cache.

The DIS may run as a standalone process or in the context of a Persistent Query (called an in-worker DIS). In most installations, the default db_dis process listens for tailer connections for tables that are not otherwise configured. In-worker DISes are used for Kafka, Solace, and fast last-by ingestion. The DIS is responsible for ingesting and serving data according to your routing configuration. DISes can be configured using the dhconfig dis command.

Data Partitioning

Each Deephaven table is made up of one or more partitions. Within a partition, there are strict ordering guarantees, but there are no guarantees across partitions. This means that the Data Import Server can process each partition independently without requiring any synchronization (or even being on the same machine or storage as other partitions). Query engine tools like the LeaderTableFilter or SyncTableFilter can be used to provide inter-partition consistency at the application level.

Publication and Checkpointing

When the DIS processes incoming data, it appends values to a buffer for each of the table's constituent columns. Two global properties control when the DIS writes buffered data to the column files and when it makes that data durable:

  • DataImportStreamProcessor.default.flushIntervalMillis (defaulting to 100 ms)
  • DataImportStreamProcessor.default.checkPointIntervalMillis (defaulting to 30 seconds)

These properties can be overridden on a per-table basis using properties of the format:

  • DataImportStreamProcessor.<namespace>.<table>.flushIntervalMillis
  • DataImportStreamProcessor.<namespace>.<table>.checkPointIntervalMillis
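The precedence described above (a per-table property, when present, overrides the global default) can be sketched as follows. This is only an illustration of the lookup order, not Deephaven's property-resolution code; the function `resolve_interval` and the `MarketData.Quotes` table are hypothetical, and the default values are the ones quoted above expressed in milliseconds.

```python
def resolve_interval(props, namespace, table, name):
    """Return the per-table value of `name` if set, else the global default.

    `props` is a plain dict standing in for a loaded properties file.
    """
    table_key = f"DataImportStreamProcessor.{namespace}.{table}.{name}"
    default_key = f"DataImportStreamProcessor.default.{name}"
    if table_key in props:
        return int(props[table_key])
    return int(props[default_key])


props = {
    # Global defaults, as documented above (30 seconds = 30000 ms).
    "DataImportStreamProcessor.default.flushIntervalMillis": "100",
    "DataImportStreamProcessor.default.checkPointIntervalMillis": "30000",
    # Hypothetical override for a hypothetical table MarketData.Quotes.
    "DataImportStreamProcessor.MarketData.Quotes.checkPointIntervalMillis": "5000",
}

# The table overrides only the checkpoint interval; the flush interval
# still comes from the global default.
quotes_checkpoint = resolve_interval(props, "MarketData", "Quotes", "checkPointIntervalMillis")
quotes_flush = resolve_interval(props, "MarketData", "Quotes", "flushIntervalMillis")
```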

When the flush interval has elapsed, each column's in-memory buffer is written to the file that backs the column and the new size is advertised to subscribers. At this point the DIS has written the data to the column files but has not forced it to disk (force is the Java equivalent of fsync). The checkpoint interval controls how often data is made durable. When checkpointing, the DIS prepares a checkpoint record that contains:

  • The current size of the table in rows.
  • The size of each column file.
  • A source file record used to resume processing the table after restart.
  • Relevant import state (e.g., used for LastBy DIS processes).

After the checkpoint record is prepared, the individual column files are made durable by forcing them to disk. At this point, the checkpoint record is written to a new temporary file, forced to disk, and then renamed over the existing checkpoint record. This process ensures that the checkpoint record is updated atomically, and all data referenced by the checkpoint record has been made durable.
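The ordering described above (force the column files, write the record to a temporary file, force it, then rename it over the old record) is a general pattern for atomic, durable updates. Below is a minimal sketch of that pattern in Python, using os.fsync in place of Java's force. The JSON record layout, the file name checkpoint.json, and the function name are assumptions made for illustration; they are not the DIS's actual on-disk format.

```python
import json
import os


def write_checkpoint(partition_dir, column_files, row_count, source_record):
    """Durably publish a checkpoint record for one partition (illustrative only)."""
    # 1. Prepare the record: table size in rows, size of each column file,
    #    and the source record needed to resume after a restart.
    record = {
        "rows": row_count,
        "column_sizes": {
            name: os.path.getsize(os.path.join(partition_dir, name))
            for name in column_files
        },
        "source": source_record,
    }

    # 2. Force every column file to disk before the record can refer to it.
    for name in column_files:
        fd = os.open(os.path.join(partition_dir, name), os.O_RDWR)
        try:
            os.fsync(fd)
        finally:
            os.close(fd)

    # 3. Write the record to a temporary file and force it to disk.
    final_path = os.path.join(partition_dir, "checkpoint.json")
    tmp_path = final_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(record, f)
        f.flush()
        os.fsync(f.fileno())

    # 4. Rename over the existing record; readers see either the old
    #    record or the new one, never a partial write.
    os.replace(tmp_path, final_path)
    return record
```

Forcing the column files before publishing the record is what guarantees that a record never references data that could be lost in a crash.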

If processing of a partition must be restarted, the DIS reads the checkpoint record and truncates the column files to the sizes it records. The source file record contains enough information to determine exactly where to resume the input stream. For example, when ingesting a Kafka feed, the source file record contains the offset in the partition being replicated to Deephaven. When ingesting binary log files, the source file record contains the file name and byte offset corresponding to the last processed row. When initializing a tailer connection, the DIS sends the tailer the file and offset from which to resume tailing. Similarly, the import state corresponds exactly to the data at the checkpointed table size.
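The recovery step can be sketched in the same spirit. The example below assumes a hypothetical JSON checkpoint file named checkpoint.json with the keys "rows", "column_sizes", and "source"; the real checkpoint record uses Deephaven's own format, so treat this purely as an illustration of truncating back to the last durable state.

```python
import json
import os


def recover_partition(partition_dir):
    """Roll column files back to the last checkpoint and return where to resume."""
    with open(os.path.join(partition_dir, "checkpoint.json")) as f:
        record = json.load(f)

    # Bytes written after the last checkpoint were flushed but never made
    # durable as part of a checkpoint, so discard them.
    for name, size in record["column_sizes"].items():
        os.truncate(os.path.join(partition_dir, name), size)

    # The caller resumes the input stream from the recorded source position
    # (for example a Kafka offset, or a log file name and byte offset).
    return record["rows"], record["source"]
```

Because every column file is cut back to the size stored in the record, the table size, the column data, and the resume position all describe the same instant.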

Querying Data

Query workers interrogate the DIS to discover data partitions. For live tables, the workers subscribe to notifications about new partitions and size changes to the available partitions. This way, a query knows at the start of its update cycle what new data must be considered for a particular table.

The actual data is not sent in the subscription; rather, queries request blocks of raw binary data (in 64 KB blocks) from the DIS. These blocks represent random-access chunks from individual column files, allowing a reasonable trade-off between (1) amortizing I/O costs over large reads and (2) deferring unnecessary I/O. If a particular query DAG hosted at a worker does not need a particular partition, column, or block, it will never read it. All of these blocks are cached in the DIS, Table Data Cache Proxy (TDCP), or workers using the same technology, with support for repeatable reads allowing for independent cache management at each node.
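To make the block-oriented access pattern concrete, here is a small sketch of a reader that maps a byte range in a column file onto 64 KB blocks and fetches only the blocks it has not already cached. The class BlockCache, the function blocks_for_range, and the fetch_block callback are hypothetical names for illustration; they do not correspond to Deephaven's cache implementation or its wire protocol.

```python
BLOCK_SIZE = 64 * 1024  # 64 KB, the block size mentioned above


def blocks_for_range(offset, length):
    """Indexes of the blocks covering the byte range [offset, offset + length)."""
    if length <= 0:
        return []
    first = offset // BLOCK_SIZE
    last = (offset + length - 1) // BLOCK_SIZE
    return list(range(first, last + 1))


class BlockCache:
    """Caches whole blocks keyed by (column, block index)."""

    def __init__(self, fetch_block):
        # fetch_block(column, block_index) -> bytes stands in for a remote request.
        self._fetch_block = fetch_block
        self._blocks = {}
        self.fetches = 0

    def read(self, column, offset, length):
        out = bytearray()
        for index in blocks_for_range(offset, length):
            key = (column, index)
            if key not in self._blocks:
                # Only blocks that are actually needed are ever requested.
                self._blocks[key] = self._fetch_block(column, index)
                self.fetches += 1
            out += self._blocks[key]
        start = offset % BLOCK_SIZE  # position of `offset` inside the first block
        return bytes(out[start:start + length])
```

A read that straddles a block boundary touches two blocks; repeating it costs no further fetches, which is the amortization the text describes.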

A Deephaven system can have either a single DIS or multiple DIS instances. Multiple DIS instances can be used to shard a workload for capacity management or failure isolation, or they can be configured as hot replicas to support failover. Most often, queries do not directly connect to the DIS to request data, but rather connect through a TDCP. This reduces the cardinality of subscriptions to the DIS from potentially thousands of queries to tens of TDCP instances that can amortize reads across many queries.

The Data Routing Configuration controls where tailers send data and where queries request data from.