Log Aggregator Service
The Log Aggregator Service (LAS) is part of Deephaven's streaming data ingestion pipeline, working alongside components like loggers, tailers, and the Data Import Server (DIS).
The LAS is a Deephaven process that combines binary log entries from several processes and writes them into combined binary log files. This reduces the total number of log files and internal partition values, and enables order and grouping guarantees on data from multiple sources. For example, all worker processes on a query server typically write process event log entries to the local LAS, and the data is combined into a single internal partition for the query server. The LAS combines input for each data stream (defined by the 5-tuple namespace, table name, table type, column partition value, and internal partition value), and writes binary rows to a single output stream. The output stream is a series of binary log files with timestamps that roll over periodically.
Loggers may write directly to log files or via the LAS.
By default, all nodes in a Deephaven cluster run a LAS. One of these LAS instances will be designated as the destination for central user table updates.
Data routing
When a logger uses the log aggregator, it determines the appropriate LAS instance and address via the data routing service, as configured in the log aggregator servers section. This is generally a shared LAS for centrally managed user data, and an instance on the local system for other data.
Client configuration
The use of the LAS is controlled by the useLogAggregatorService
property. By default, this property is set to false
except for query workers.
When useLogAggregatorService
is set to true
, audit event logs and process event logs will be written through the LAS. This setting must be true
if multiple instances of the same class will run on a server, and they don't define their own internal partitions.
The configuration can be defined at different levels using property prefixes, as described in Metrics and monitoring.
Server configuration
A LAS process loads the data routing configuration matching its LogAggregatorService.routingConfigurationName
or property.name
property.
Properties
The following properties configure the behavior of the LAS and must be configured in the properties map of the Log Aggregator Server configuration.
binaryLogTimeZone
: Defines a time zone ID to be used to create binary log filenames.
The LAS allocates an array of message pools with a range of sizes for processing incoming data. The following properties govern the amount of memory that is pre-allocated (see Message pool below):
messagePool.minBufferSize
: Default = 1024 bytesmessagePool.maxBufferSize
: Default = propertyBinaryStoreMaxEntrySize
(1Mb)messagePool.bucketSize
: Default = 1000
The LAS combines input for all data streams with the same defining 5-tuple, and writes binary rows to a queue that drains to the output file stream. The following properties influence the behavior of the write queue for a single stream:
BinaryLogQueueSink.queueSize
: Maximum number of rows staged for writing. This limits how far incoming clients can get ahead of file output. The default is 1000 rows.BinaryLogQueueSink.idleShutdownTimeoutMs
: The amount of time until the queue-draining thread shuts down after receiving no input. The default is 5 minutes.BinaryLogQueueSink.pollTimeoutMs
: The frequency at which the queue-draining thread checks the idle time. The default is 1000 ms (1s).
Message pool
The LAS allocates an array of message pools with a range of sizes for processing incoming data.
Each message pool allocates messagePool.bucketSize
buffers. The pools contain buffers with sizes between messagePool.minBufferSize
and messagePool.maxBufferSize
, doubling in size from the minBufferSize
to maxBufferSize
. That is, the message pool at index i
will have buffers of messagePool.bucketSize * 2^i
. This message pool is lenient, meaning that if an additional buffer is needed, it will be temporarily allocated. If a buffer larger than the maximum pool size is needed, it will be temporarily allocated.
To analyze the output queue's performance, you first need to enable stats and collect data in the ProcessMetrics table. The relevant metric names are:
VariableMessagePool-logaggregator.OutstandingBuffers
: The number of message buffers currently in all output queues.VariableMessagePool-logaggregator.AllocatedBuffers
: The number of pool buffers allocated.VariableMessagePool-logaggregator.OverflowBuffers
: The number of buffers allocated after an initial pool was exhausted.VariableMessagePool-logaggregator.OversizedBuffers
: The number of buffers allocated that were larger than the maximum pool size.
If the OverflowBuffers
count is high, you might need to increase messagePool.bucketSize
. If the OversizedBuffers
count is high, you might need to increase messagePool.maxBufferSize
.
Some key points about the LAS:
-
Only one LAS will be selected for a given table location.
-
For user tables, data from all sources must use the same LAS instance to maintain ordering guarantees across all sources. In the default configuration, all user data is directed to the
rta
log aggregator instance. -
The LAS should be restarted weekly to ensure cached resources are released.