How often is data forced to disk?

What is the write size of each write from the Data Import Server? How often is data forced to disk? Can these be reconfigured in properties files?

To answer this question, let's look at when and how Deephaven interacts with the OS and the filesystem. The Data Import Server (DIS) writes data to files through buffers in the operating system, and any data written to a file but not yet on disk is periodically forced to durable storage. Three internal routines handle this work, running concurrently with one another in the same thread pool.

One routine handles the writing of data to our internal buffers, and writes those buffers to the corresponding files when they are full. It also computes metadata changes as a side effect of listening to data.

Another routine (flushing) periodically halts the writing process, writes any buffers whose contents have been modified to the corresponding files, and advises subscribed readers of the new available size. After doing so, if it's time for a new checkpoint, this same routine prepares a metadata snapshot.

The final routine takes the last prepared metadata snapshot, forces (via fsync) all written data to disk, and writes the metadata snapshot to a checkpoint record. The fsync() call happens concurrently with writing and/or flushing.

Returning to the original question, individual writes are variable-sized, with a maximum of 64KB. The minimum write size is 1 byte, which might occur when, for example, a slow-growing table adds one row to a column of booleans or bytes.
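To make the write-size behavior concrete, here is a minimal Python sketch (not Deephaven's actual implementation) of fixed-size block buffering: appends accumulate in a 64KB buffer, full blocks are written out as they fill, and a final flush can emit a write as small as one byte.

```python
import io

BLOCK_SIZE = 64 * 1024  # 64KB, matching the default DataBufferPool.bufferSizeInBytes

class BlockBufferedWriter:
    """Accumulates appended bytes in a fixed-size buffer and writes a full
    block to the underlying file whenever the buffer fills. Most writes are
    therefore exactly BLOCK_SIZE; the remainder written on flush can be as
    small as 1 byte."""

    def __init__(self, raw, block_size=BLOCK_SIZE):
        self.raw = raw
        self.block_size = block_size
        self.buf = bytearray()
        self.write_sizes = []  # observed write-size distribution

    def append(self, data: bytes):
        self.buf.extend(data)
        while len(self.buf) >= self.block_size:
            block = bytes(self.buf[:self.block_size])
            self.raw.write(block)
            self.write_sizes.append(len(block))
            del self.buf[:self.block_size]

    def flush(self):
        # Write out any partial block, however small.
        if self.buf:
            self.raw.write(bytes(self.buf))
            self.write_sizes.append(len(self.buf))
            self.buf.clear()

raw = io.BytesIO()
w = BlockBufferedWriter(raw)
w.append(b"x" * (64 * 1024 + 1))  # fills one full block, leaving 1 byte buffered
w.flush()                          # emits the 1-byte tail
# w.write_sizes == [65536, 1]
```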

The DIS writes data to the various column files, writing whenever a 64KB block is filled for a given file, or when flushing. The FileHandle.writeSizeBytes stat provides information about the distribution of write sizes.

Flushing happens partition-wise, every 100ms by default. You can think of it as a latency budget - we ensure that we buffer no more than that interval's worth of data before writing it out and publishing a new size (in rows) for readers.
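The interaction between the writing and flushing routines can be sketched as follows. This is a simplified Python illustration (the class and field names are hypothetical, not Deephaven APIs): the flusher briefly takes the same lock the writer uses, writes out dirty buffers, and publishes the new row count that readers may observe.

```python
import threading

class PartitionFlusher:
    """Simplified sketch of the flush cycle: writes accumulate as pending
    rows; a periodic flush pauses the writer, persists dirty buffers, and
    publishes the new size to readers. flush_interval_millis is the latency
    budget - buffered data waits at most that long before becoming visible."""

    def __init__(self, flush_interval_millis=100):
        self.flush_interval_millis = flush_interval_millis
        self.lock = threading.Lock()  # shared with the writing routine
        self.pending_rows = 0         # written to buffers, not yet visible
        self.published_rows = 0       # size (in rows) visible to readers

    def write_rows(self, n):
        with self.lock:
            self.pending_rows += n

    def flush_once(self):
        with self.lock:  # the writer is halted while we hold the lock
            # (writing dirty buffers to their files is omitted here)
            self.published_rows += self.pending_rows
            self.pending_rows = 0

f = PartitionFlusher()
f.write_rows(10)
f.write_rows(5)
assert f.published_rows == 0  # buffered data is not yet visible to readers
f.flush_once()
assert f.published_rows == 15
```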

Data is forced to disk only by the checkpointing thread. Checkpointing happens partition-wise, every 30s by default. At that time, Deephaven captures the latest consistent snapshot of the partition's metadata, performs an fsync of all of the partition's files, writes that snapshot to a new file, and moves (renames) the new file over the previous checkpoint file (table.size).
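The fsync-then-rename sequence is a standard durability pattern, sketched below in Python under my own function and file names (the snapshot format and paths here are illustrative, not Deephaven's): force the data files to disk first, then persist the snapshot via a temp file and an atomic rename, so readers never observe a partially written checkpoint.

```python
import json
import os
import tempfile

def checkpoint(data_paths, snapshot, checkpoint_path):
    """Illustrative flush-all / record-snapshot / fsync-all / persist-snapshot
    sequence: fsync the data files, then atomically replace the previous
    checkpoint record."""
    # fsync-all: force previously written file data to durable storage.
    for path in data_paths:
        fd = os.open(path, os.O_RDONLY)
        try:
            os.fsync(fd)
        finally:
            os.close(fd)

    # persist-snapshot: write to a temp file in the same directory, fsync it,
    # then rename over the old checkpoint. os.replace is atomic on POSIX, so
    # readers see either the old checkpoint or the new one, never a mix.
    dirname = os.path.dirname(checkpoint_path) or "."
    fd, tmp = tempfile.mkstemp(dir=dirname)
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(snapshot, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, checkpoint_path)
    except Exception:
        os.unlink(tmp)
        raise
```

For example, `checkpoint(["col.dat"], {"rows": 42}, "table.size")` would fsync `col.dat` and then atomically install a new `table.size` record.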

Block size, while configurable, is "global" - it is used throughout the system, and its value must be the same for all processes:

  • DataBufferPool.bufferSizeInBytes - The default value is 65536. We do not recommend changing the default setting.

The properties for the flush and checkpoint intervals are:

  • DataImportStreamProcessor.default.flushIntervalMillis - The default value is 100. This property is effectively the minimum resolution for queries to see new data. The default (100ms) both ensures reasonably-low data latency and allows for some batching of writes. Increasing the value will increase the expected latency with which queries observe that new data is available. This is one of several "latency budget" parameters throughout the data pipeline that impact data latency as queries perceive it.
  • DataImportStreamProcessor.default.checkpointIntervalMillis - The default value is 30000. This property dictates the frequency of fsyncs. Writes as part of a normal flush are written without any kind of sync flags, but we use a pattern of (flush-all, record-snapshot, fsync-all, persist-snapshot) when checkpointing.

These can be overridden globally using those names, or per table with the following properties:

  • DataImportStreamProcessor.<namespace>.<table name>.flushIntervalMillis
  • DataImportStreamProcessor.<namespace>.<table name>.checkpointIntervalMillis
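For example, a properties file combining a global override with a per-table override might look like the following (the MarketData namespace and Trades table are hypothetical names used for illustration):

```
# Global overrides (apply to all tables)
DataImportStreamProcessor.default.flushIntervalMillis=250
DataImportStreamProcessor.default.checkpointIntervalMillis=60000

# Per-table override for a hypothetical MarketData/Trades table
DataImportStreamProcessor.MarketData.Trades.flushIntervalMillis=50
```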

Unrelated to the DIS, offline imports and merges write in 256KB blocks, and always write in that increment until flushed at the end of a phase. This block size only affects writing throughput, and need not be globally the same.