Data Import Server

The Deephaven Data Import Server (DIS) is a central agent for ingesting streaming data and serving it to queries.

The DIS consumes data from sources and transforms it into columnar data in a Deephaven-supported format. The DIS operates in two modes:

  1. Tailers connect to a DIS and send row-oriented streaming data, which the DIS interprets and transforms into column-oriented data. In this mode, Tailers push data to one or more DIS instances. Typically, Tailers send either Deephaven format binary logs (strongly preferred) or CSV rows to the DIS.
  2. A DIS subscribes to data from streaming sources such as Kafka or Solace and writes it to Deephaven column-oriented tables. In this mode, the DIS pulls data from the source.

The columnar data format allows queries to lazily read just the data necessary for an operation and matches the chunk-based orientation of the Deephaven query engine. The DIS persists data locally (typically to a file system on fast SSDs), writing through an in-memory block-oriented cache.
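
To make the row-to-column transformation concrete, here is a minimal Java sketch (the TradeRow type and field names are hypothetical; this is not the DIS implementation) of appending each field of an incoming row to a separate column buffer:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical row type: one event arriving from a Tailer.
    record TradeRow(String symbol, double price, long size) {}

    /** Minimal sketch: buffer incoming rows column by column. */
    final class ColumnOrientedBuffer {
        // A real implementation would use primitive, block-oriented buffers.
        final List<String> symbols = new ArrayList<>();
        final List<Double> prices = new ArrayList<>();
        final List<Long> sizes = new ArrayList<>();

        void append(TradeRow row) {
            // Each field lands in its own column buffer, so readers that
            // need only one column never touch the others.
            symbols.add(row.symbol());
            prices.add(row.price());
            sizes.add(row.size());
        }

        long size() {
            return symbols.size(); // all column buffers have the same length
        }
    }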

The DIS may run as a standalone process or in the context of a Persistent Query (called an in-worker DIS). In most installations, the default db_dis process listens for Tailer connections for tables that are not otherwise configured. In-worker DISes are used for Kafka, Solace, and fast last-by ingestion. The DIS ingests and serves data according to your routing configuration and can be configured using the dhconfig dis command.

Data partitioning

Each Deephaven table is made up of one or more partitions. Within a partition, there are strict ordering guarantees, but there are no guarantees across partitions. This design allows the Data Import Server to process each partition independently without requiring any synchronization (or even being on the same machine or storage as other partitions). Query engine tools like the LeaderTableFilter or SyncTableFilter can be used to provide inter-partition consistency at the application level.
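
To illustrate the idea behind such filters (this is a conceptual sketch only, not the LeaderTableFilter or SyncTableFilter API), an application can hold back data from each partition until every partition has published up to a common watermark:

    import java.util.HashMap;
    import java.util.Map;

    /** Conceptual sketch: release data only up to the minimum key seen across all partitions. */
    final class PartitionSynchronizer {
        private final Map<String, Long> lastKeyByPartition = new HashMap<>();

        /** Record the latest key (e.g., a sequence number or timestamp) seen for a partition. */
        synchronized void advance(String partition, long key) {
            lastKeyByPartition.merge(partition, key, Math::max);
        }

        /** Keys at or below this value are present in every partition and safe to use together. */
        synchronized long consistentWatermark() {
            return lastKeyByPartition.values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .orElse(Long.MIN_VALUE);
        }
    }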

Publication and checkpointing

When the DIS processes incoming data, it appends values to a buffer for each of the table's constituent columns. Two global properties control when the DIS writes data to disk and when it makes that data durable:

  • DataImportStreamProcessor.default.flushIntervalMillis (defaulting to 100ms)
  • DataImportStreamProcessor.default.checkPointIntervalMillis (defaulting to 30 seconds)

These properties can be overridden on a per-table basis using properties of the format:

  • DataImportStreamProcessor.<namespace>.<table>.flushIntervalMillis
  • DataImportStreamProcessor.<namespace>.<table>.checkPointIntervalMillis
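
For example, a hypothetical MarketData.Trades table could be configured to flush every 50 milliseconds and checkpoint every 10 seconds (the namespace and table name here are purely illustrative):

    DataImportStreamProcessor.MarketData.Trades.flushIntervalMillis=50
    DataImportStreamProcessor.MarketData.Trades.checkPointIntervalMillis=10000

Shorter intervals reduce subscriber latency and the amount of data that must be replayed after a failure, at the cost of more frequent writes and forces to disk.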

When the flush interval has elapsed, the in-memory buffers are written to the files that back each column, and the new size is advertised to subscribers. The DIS writes the data to the column files but does not force it to disk (force is the Java equivalent of fsync), so a flush alone does not make the data durable. The checkpoint interval controls how often data is made durable. When checkpointing, the DIS prepares a checkpoint record that contains:

  • The current size of the table in rows.
  • The size of each column file.
  • A source file record used to resume processing the table after restart.
  • Relevant import state (e.g., the state used by LastBy DIS processes).

After the checkpoint record is prepared, the individual column files are made durable by forcing them to disk. At this point, the checkpoint record is written to a new temporary file, forced to disk, and then renamed over the existing checkpoint record. This process ensures that the checkpoint record is updated atomically, and all data referenced by the checkpoint is durable when the rename operation completes. This atomic update supports efficient recovery in the event of system failure and guarantees consistent data access for query operations.
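
This write-to-temporary, force, and rename sequence is a standard durability pattern; a minimal Java sketch of it (not the actual DIS code) might look like:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.nio.file.StandardOpenOption;

    final class AtomicCheckpointWriter {
        /** Durably replace a checkpoint record without ever exposing a partial file. */
        static void writeCheckpoint(Path checkpointFile, byte[] record) throws IOException {
            Path tmp = checkpointFile.resolveSibling(checkpointFile.getFileName() + ".tmp");
            try (FileChannel ch = FileChannel.open(tmp,
                    StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING)) {
                ch.write(ByteBuffer.wrap(record));
                ch.force(true); // force: the Java equivalent of fsync
            }
            // An atomic move ensures readers see either the old record or the
            // new one, never a mix of the two.
            Files.move(tmp, checkpointFile, StandardCopyOption.ATOMIC_MOVE);
        }
    }

The rename is the commit point: a crash before it leaves the old checkpoint intact, and a crash after it leaves the new one fully durable.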

If processing of a partition must be restarted, the DIS reads the checkpoint record and truncates the column files to the recorded sizes. The source file record contains enough information to determine exactly where to resume the input stream. For example, when ingesting a Kafka feed, the source file record contains the offset in the partition being replicated to Deephaven. When ingesting binary log files, it contains the file name and byte offset of the last processed row. When initializing a Tailer connection, the DIS sends the Tailer the file and offset from which to resume tailing. Similarly, the import state corresponds exactly to the data at the checkpointed table size.
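
A minimal sketch of the truncation step during recovery (the map of checkpointed column-file sizes is a hypothetical stand-in for the checkpoint record's contents):

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.Map;

    final class CheckpointRecovery {
        /**
         * Truncate each column file back to its checkpointed size, discarding
         * any bytes that were flushed after the last durable checkpoint.
         */
        static void truncateToCheckpoint(Map<Path, Long> checkpointedSizes) throws IOException {
            for (Map.Entry<Path, Long> entry : checkpointedSizes.entrySet()) {
                try (FileChannel ch = FileChannel.open(entry.getKey(), StandardOpenOption.WRITE)) {
                    ch.truncate(entry.getValue());
                }
            }
        }
    }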

Querying live data

Query workers interrogate the DIS to discover data partitions. For live tables, the workers subscribe to notifications about new partitions and size changes to the available partitions. This way, a query knows at the start of its update cycle what new data must be considered for a particular table.
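
Conceptually, the subscription delivers partition-level metadata rather than the data itself; a hypothetical listener interface (not the actual client API) might look like:

    /** Hypothetical sketch of the notifications a subscribing worker receives. */
    interface TableLocationListener {
        void onNewPartition(String partitionKey);
        void onSizeChanged(String partitionKey, long newSize);
    }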

The actual data is not sent in the subscription; rather, queries request blocks of raw binary (in 64KB chunks) from the DIS. These blocks represent random-access chunks from individual column files, providing two key benefits:

  1. Amortizing I/O costs over large reads.
  2. Deferring unnecessary I/O.

If a particular query DAG hosted at a worker doesn't need a specific partition, column, or block, it never reads it. All of these blocks are cached by the DIS, the Table Data Cache Proxy (TDCP), and workers using the same technology; support for repeatable reads allows each node to manage its cache independently.
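
As an illustration of block-oriented random access with caching (the cache size and eviction policy here are illustrative, not the servers' actual implementation):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.LinkedHashMap;
    import java.util.Map;

    final class BlockReader {
        static final int BLOCK_SIZE = 64 * 1024; // 64KB, matching the chunk size described above

        // Simple LRU cache of blocks, keyed by "file#blockIndex".
        private final Map<String, byte[]> cache =
                new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                        return size() > 1024; // cap the cache at 1024 blocks (64MB)
                    }
                };

        /** Read one 64KB block of a column file, serving repeated reads from the cache. */
        byte[] readBlock(Path columnFile, long blockIndex) throws IOException {
            String key = columnFile + "#" + blockIndex;
            byte[] cached = cache.get(key);
            if (cached != null) {
                return cached; // repeated reads are served without touching storage
            }
            // Blocks are only read on demand, so untouched blocks cost no I/O at all.
            ByteBuffer buf = ByteBuffer.allocate(BLOCK_SIZE);
            try (FileChannel ch = FileChannel.open(columnFile, StandardOpenOption.READ)) {
                ch.read(buf, blockIndex * BLOCK_SIZE); // random access at the block's offset
            }
            byte[] block = buf.array(); // a partial final block is zero-padded in this sketch
            cache.put(key, block);
            return block;
        }
    }

Because a block is identified purely by its file and offset, the same caching logic can run at the DIS, a TDCP, or a worker without any coordination between nodes.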

A Deephaven system can have either a single DIS or multiple DIS instances. Multiple DIS instances can be used to:

  • Shard a workload for capacity management.
  • Provide failure isolation.
  • Act as hot replicas to support failover configurations.

Most often, queries do not directly connect to the DIS to request data, but rather connect through a TDCP. This architecture reduces the number of subscriptions to the DIS from potentially thousands of queries to tens of TDCP instances that can amortize reads across many queries.

The Data Routing Configuration controls where Tailers send data and where queries request data from.

Understanding these concepts is essential for both administrators and users working with tables in the Deephaven environment.

Risks of modifying live data

Deleting data or changing table definitions while queries are using that data can cause those queries to fail.

Best practices

  1. Verify Usage Before Modification:

    • Check if any active queries are using the table before making changes.
    • Review query dependencies to understand potential impact.
  2. Scheduled Maintenance Windows:

    • Plan data structure changes during low-usage periods.
    • Notify users before making significant changes.
  3. Versioning Approach:

    • Instead of modifying existing tables, consider creating new ones with updated schemas.
    • Implement a transition plan for users to move from old to new data structures.

Recovery procedures

If errors occur after modifying live data:

  1. Restart Affected Queries:

    • The most common remedy is to restart the queries that were using the modified data.
    • This forces the query to reload the table with its new structure or content.
    • See the documentation on Managing Persistent Queries for restart procedures.
  2. Service Restart:

    • In severe cases, restarting specific services may be necessary.
    • The DIS and/or TDCP can be restarted independently. To reduce risk, first restart the TDCP. If the issue is not resolved, then restart the DIS.
    • Note that even in non-production environments, this requires coordination with system administrators.
  3. Data Reconstruction:

    • If data was accidentally deleted, it may need to be reloaded from the original source.
    • Use backup systems when available to restore lost partitions.
  4. CLI Intervention:

    • In cases of data corruption, command line tools may be needed to delete corrupted table data.
    • Use the dhctl utility for advanced table management operations.
    • This is typically a last resort and requires administrator privileges.