Streaming binary logs

This section of the crash course introduces Deephaven's binary logs (often referred to as binlogs), which are integral to handling intraday data streams within the Deephaven ecosystem.

Binlogs are specialized files designed to manage real-time or near-real-time data, particularly ticking data — information that updates continuously throughout the day. These logs provide an efficient mechanism for ingesting and processing high-frequency data.

Written in a proprietary, row-oriented binary format, binlogs are optimized for Deephaven’s performance, enabling rapid data ingestion and processing. This makes them ideal for scenarios that require fast, large-volume data throughput.

Here’s how the data ingestion process works:

  1. An application receives data and writes it to binlogs.
  2. A process called the tailer reads these files and sends the data to the Data Import Server (DIS), which stores it persistently within the Deephaven installation.
  3. From there, the data is available to the system's users through the IDE and through clients such as the Python client.

How intraday data is partitioned

Intraday data is systematically partitioned to optimize storage and retrieval efficiency. Deephaven's partitioning strategy involves three key components:

  • Column partition
    • One column is chosen to partition the data on disk, determining the highest level of data segmentation. This is known as the column partition.
    • Typically, the column partition is a Date column, as dates naturally distinguish intraday data from historical data.
  • Internal partition
    • The internal partition is simply used to separate data sources and is not crucial to this discussion; the example layout below shows where it appears on disk.
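
Together, these two levels determine where a table's intraday data lands on disk. As a rough illustration (the exact paths depend on your installation's configuration), intraday data is stored under a directory of the form:

/db/Intraday/<namespace>/<table name>/<internal partition>/<column partition>/<table name>/

where the column partition value is typically a date string such as 2020-01-31.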

Overview

This guide provides an end-to-end binary-logging example using a simple Java application. The basic process consists of these steps:

  1. Define and deploy the schemas.
  2. Generate the Deephaven logger classes from these schemas.
  3. Create the logger application to log some data for these tables.
  4. Compile the logger application.
  5. Run the logger application.
  6. Query the tables to see the logged data.
  7. Merge the data into historical.
  8. Validate the data and remove it from intraday.

The steps in this guide are performed on a Deephaven server. If your installation has multiple nodes, use the infrastructure node. The infrastructure node is the one that runs a configuration server.

Note

To follow along, you'll need an account with access to the Deephaven installation:

ssh -i <path-to-your-ssh-key> <username>@<your-deephaven-server-url>

The steps below require access to a Deephaven administrative user.

Define and deploy the schemas

When binary logs are used for data ingestion, the first step is to define a schema. This example provides two simple schemas.

The date-based schema uses dynamic date partitioning to determine the column partition value for each logged row.

Date-based schema
<Table namespace="ExampleNamespace" name="LoggingDate" storageType="NestedPartitionedOnDisk">
    <Partitions keyFormula="${autobalance_single}"/>
    <Column name="Date" dataType="String" columnType="Partitioning"/>
    <Column name="Timestamp" dataType="DateTime"/>
    <Column name="SomeData" dataType="String"/>

    <LoggerListener logFormat="1"
                    loggerPackage="io.deephaven.crashcourse.dynamic.gen" loggerClass="LoggingDateFormat1Logger"
                    listenerPackage="io.deephaven.crashcourse.dynamic.gen" listenerClass="LoggingDateFormat1Listener"
                    rethrowLoggerExceptionsAsIOExceptions="false">
        <SystemInput name="timestamp" type="long"/>
        <SystemInput name="someData" type="String"/>

        <Column name="Date"/>
        <Column name="Timestamp" intradaySetter="timestamp" datePartitionInput="millis"/>
        <Column name="SomeData" intradaySetter="someData"/>
    </LoggerListener>

    <Validator>
        <assertSize min="1" max="999999999" />
    </Validator>

</Table>
  • namespace="ExampleNamespace" and name="LoggingDate" define the table's namespace and table name.
  • <Column name="Date" dataType="String" columnType="Partitioning"/> sets the column called Date as the partitioning column. This is the column used to determine the column partition value for each row.
  • The generated logger's package is defined by the loggerPackage="io.deephaven.crashcourse.dynamic.gen" definition, and the class name by the loggerClass="LoggingDateFormat1Logger" definition. A sketch of the generated class's shape follows this list.
  • The Validator section is explained in the later validation section.
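
The <SystemInput> elements determine the parameters of the generated logger's log method, and each <Column> entry maps those inputs to table columns through its intradaySetter. The outline below is a simplified, hypothetical sketch of the generated class's shape; the real implementation is produced by generate_loggers and handles row buffering and the binary log format.

// Hypothetical outline only - generate_loggers produces the real implementation.
public class LoggingDateFormat1Logger {
    // One parameter per <SystemInput>, in schema order.
    public void log(final long timestamp, final String someData) {
        // Date      <- column partition value derived from timestamp (datePartitionInput="millis")
        // Timestamp <- timestamp (intradaySetter="timestamp")
        // SomeData  <- someData (intradaySetter="someData")
    }
}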

The function-based schema uses a function to determine the column partition value for each logged row.

Function-based schema
<Table namespace="ExampleNamespace" name="LoggingFunction" storageType="NestedPartitionedOnDisk">
    <Partitions keyFormula="${autobalance_single}"/>
    <Column name="MyPartition" dataType="String" columnType="Partitioning"/>
    <Column name="Timestamp" dataType="DateTime"/>
    <Column name="SomeData" dataType="String"/>

    <LoggerListener logFormat="1"
                    loggerPackage="io.deephaven.crashcourse.dynamic.gen" loggerClass="LoggingFunctionFormat1Logger"
                    listenerPackage="io.deephaven.crashcourse.dynamic.gen" listenerClass="LoggingFunctionFormat1Listener"
                    rethrowLoggerExceptionsAsIOExceptions="false">
        <SystemInput name="timestamp" type="long"/>
        <SystemInput name="someData" type="String"/>

        <Column name="MyPartition"/>
        <Column name="Timestamp" intradaySetter="timestamp"/>
        <Column name="SomeData" intradaySetter="someData" functionPartitionInput="true"/>
    </LoggerListener>
</Table>
  • namespace="ExampleNamespace" and name="LoggingFunction" define the table's namespace and table name.
  • <Column name="MyPartition" dataType="String" columnType="Partitioning"/> sets the column called MyPartition as the partitioning column. This is the column used to determine the column partition value for each row. This is an advanced use case, as this table isn't partitioned by a Date - there's not even a column that reflects a date. Instead, the application supplies a partition function, as the sketch after this list shows.
  • The generated logger's package is defined by the loggerPackage="io.deephaven.crashcourse.dynamic.gen" definition, and the class name by the loggerClass="LoggingFunctionFormat1Logger" definition.
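
Because the SomeData column is marked with functionPartitionInput="true", the value logged for that column is passed to a partition function that the application supplies when it constructs the logger; the function's return value becomes the row's MyPartition value. A minimal sketch of such a function (the same logic used by the full example later in this guide):

// Derive the column partition value from the logged SomeData value:
// the first 10 characters, or "Unknown" for a null or empty string.
final java.util.function.Function<String, String> partitionFunction = value ->
        (value == null || value.isEmpty())
                ? "Unknown"
                : value.substring(0, Math.min(10, value.length()));

// For example: "TrialPart1 with data" -> "TrialPart1", "Hi" -> "Hi", null -> "Unknown"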

First, create schema files on disk.

Note

While there are tools that support schema discovery from existing data sources, they are beyond the scope of this example.

The example below creates simple schemas and a temporary directory to hold the schema files.

mkdir -p /tmp/schema/ExampleNamespace

Copy the date-based schema to the server or paste it into a file there.

vi /tmp/schema/ExampleNamespace/ExampleNamespace.LoggingDate.schema

Copy the function-based schema to the server or paste it into a file there.

vi /tmp/schema/ExampleNamespace/ExampleNamespace.LoggingFunction.schema

Deploy the schemas, making them available to the Deephaven system. This requires access to an operating system account with Deephaven administrative privileges, by default irisadmin.

sudo -u irisadmin /usr/illumon/latest/bin/dhconfig schema import --force --directory /tmp/schema/ExampleNamespace

Generate the logger classes

Next, use the Deephaven generate_loggers script to generate the Deephaven logger classes from the schemas. The logger classes are used by the application to log data to binary log files, while the listener classes will be generated as needed (not with this command) by the Deephaven Data Import Server to load the data.

First, create a local directory for use by the Deephaven code-generation scripts.

export WORKSPACE=${HOME}/workspace
mkdir -p ${WORKSPACE}

Then, call the script that generates and compiles the Deephaven logger classes.

/usr/illumon/latest/bin/generate_loggers \
  -d /usr/illumon/latest/ \
  -f iris-common.prop \
  -j -Dlogroot=${WORKSPACE}/logs \
  packages=io.deephaven.crashcourse.dynamic.gen \
  outputJar=exampleGeneratedLoggers.jar

The command includes several parameters.

  • -d /usr/illumon/latest/ tells the scripts where to find the Deephaven installation.
  • -f iris-common.prop indicates which property file to use (this is the default file).
  • -j -Dlogroot=${WORKSPACE}/logs specifies a directory in which to write logs.
  • packages=io.deephaven.crashcourse.dynamic.gen restricts the logger class generation to just the package named in the schema, ensuring it doesn't generate other loggers. Use unique names for different applications to provide an easy way to only regenerate what's needed.
  • outputJar=exampleGeneratedLoggers.jar ensures that the generated JAR file has a unique name to distinguish it from other generated JAR files.

The generated classes are visible in the generated JAR file.

jar tf ${WORKSPACE}/exampleGeneratedLoggers.jar

Copy the generated JAR file to a location where it is available to the Deephaven application. This requires access to an operating system account with Deephaven administrative privileges such as irisadmin.

cp ${WORKSPACE}/exampleGeneratedLoggers.jar /tmp
sudo -u irisadmin cp /tmp/exampleGeneratedLoggers.jar /etc/sysconfig/illumon.d/java_lib

For application development, the JAR can be copied to a development environment such as an IDE (the application also needs access to other Deephaven JARs). The generation script also creates $WORKSPACE/IllumonCustomerGeneratedCodeSources.jar with the generated source code.

Create the logger application

Create a Java file with the application. This example simply creates the file on the Deephaven server, but a real application would likely be developed in an IDE using the classes generated earlier.

LoggingExample.java
package io.deephaven.crashcourse.dynamic;

import com.fishlib.configuration.Configuration;
import com.fishlib.io.log.LogLevel;
import com.fishlib.io.logger.Logger;
import com.fishlib.io.logger.StreamLoggerImpl;
import com.illumon.iris.db.util.logging.EventLoggerFactory;
import io.deephaven.crashcourse.dynamic.gen.LoggingDateFormat1Logger;
import io.deephaven.crashcourse.dynamic.gen.LoggingFunctionFormat1Logger;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.function.Function;

/**
 * A simple class to illustrate dynamic logging from pre-defined schemas.
 */
public class LoggingExample {

  /** The logger that illustrates dynamic logging using dynamic date-partition calculation */
  private final LoggingDateFormat1Logger dateLogger;

  /** The logger that illustrates dynamic logging using a function */
  private final LoggingFunctionFormat1Logger functionLogger;

  /** The logger that illustrates a fixed partition */
  private final LoggingDateFormat1Logger fixedPartitionLogger;

  private LoggingExample() {
    // A Deephaven Logger instance
    final Logger log = new StreamLoggerImpl(System.out, LogLevel.INFO);

    // A Configuration instance is required by the factories that create the binary loggers
    final Configuration configuration = Configuration.getInstance();

    // Timezone for the date-determination logger
    final String timeZone = "America/New_York";

    // This logger will automatically determine the column partition value based on the logged timestamp,
    // using the specified time zone.
    dateLogger = EventLoggerFactory.createIntradayLogger(configuration,
            "LoggingExample",
            log,
            LoggingDateFormat1Logger.class,
            timeZone);

    // This logger requires a lambda to generate the column partition value. Because of the schema definition,
    // the function will be passed the value of the row's SomeData column.
    functionLogger = EventLoggerFactory.createIntradayLogger(configuration,
            "LoggingExample",
            log,
            LoggingFunctionFormat1Logger.class,
            null,
            (Function<String, String>) (value) -> {
              final int dataLen = value != null ? value.length() : 0;
              if (dataLen == 0){
                return "Unknown";
              }
              return value.substring(0, Math.min(10, dataLen));
            });

    // This logger will always log to the fixed column partition value "2020-01-31"
    fixedPartitionLogger = EventLoggerFactory.createIntradayLogger(configuration,
            "LoggingExample",
            log,
            LoggingDateFormat1Logger.class,
            null,
            "2020-01-31");
  }

  private void logDateEntry(final long timestamp, final String dataToLog) throws IOException {
    dateLogger.log(timestamp, dataToLog);
  }

  private void logFunctionEntry(final long timestamp, final String dataToLog) throws IOException {
    functionLogger.log(timestamp, dataToLog);
  }

  private void logFixedPartitionEntry(final long timestamp, final String dataToLog) throws IOException {
    fixedPartitionLogger.log(timestamp, dataToLog);
  }

  private void run() throws IOException {
    // Using the date-partitioning logger, log a few entries into two different partitions (dates)
    // to illustrate the idea of a date rollover
    final ZonedDateTime today = ZonedDateTime.now();
    final ZonedDateTime yesterday = today.minusDays(1);
    logDateEntry(yesterday.toEpochSecond() * 1_000, "Some data row 1");
    logDateEntry(yesterday.toEpochSecond() * 1_000 + 100, "Some data row 2");
    logDateEntry(today.toEpochSecond() * 1_000, "Some data row 3");
    logDateEntry(today.toEpochSecond() * 1_000 + 100, "Some data row 4");

    // The function-partition logger will use the first 10 characters of the logged data to determine the column partition value
    logFunctionEntry(System.currentTimeMillis(), "TrialPart1");
    logFunctionEntry(System.currentTimeMillis() + 1000, "TrialPart1 with data");
    logFunctionEntry(System.currentTimeMillis() + 2000, "TrialPart2 with more data");

    // Log a couple of entries which go in a fixed partition
    logFixedPartitionEntry(System.currentTimeMillis(), "FixedEntry1");
    logFixedPartitionEntry(System.currentTimeMillis(), "FixedEntry2");

    // Ensure everything is flushed before shutting down
    dateLogger.shutdown();
    functionLogger.shutdown();
    fixedPartitionLogger.shutdown();
  }

  public static void main(final String... args) throws IOException {
    final LoggingExample loggingExample = new LoggingExample();
    loggingExample.run();
  }
}

The EventLoggerFactory class creates the intraday logger instances. This factory returns a fully initialized logger ready to write data. Based on properties, it can be set up to either write binary log files or send data to the Log Aggregator Service. This example writes log files directly.

  • One dynamic logger is created for each table defined with the schemas, to illustrate using data to determine the column partition value.
  • When creating the dateLogger (for rows for the LoggingDate table with column partition values based on the timestamp field), a timestamp is taken from the current date and time. Offsets are used to generate timestamps, and records are written for yesterday and today to simulate a date rollover indicated within the data's Timestamp field. In an application receiving data from a remote source, the timestamps are likely contained within the data.
  • When creating the functionLogger (for the LoggingFunction table), a lambda is passed in as the final parameter of the createIntradayLogger call. This example uses the first 10 characters of the SomeData column to determine the column partition value. Since a String can be null or empty, it uses a default value to ensure that an exception is not raised in this case.
  • A third logger, fixedPartitionLogger, is created to write to the same LoggingDate table as the date logger, but always to the single column partition value "2020-01-31".

Note

This example uses hard-coded values, but a real application would generate them from its data source.
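
For illustration, an application consuming an external feed might forward each incoming message to the generated logger from its message handler. The sketch below is hypothetical; FeedMessage and onMessage are illustrative stand-ins, not Deephaven classes.

// Illustrative only: a minimal stand-in for a real feed's message type.
class FeedMessage {
    final long timestampMillis;
    final String payload;

    FeedMessage(final long timestampMillis, final String payload) {
        this.timestampMillis = timestampMillis;
        this.payload = payload;
    }
}

// A hypothetical handler that logs one binary-log row per received message.
void onMessage(final FeedMessage msg) throws IOException {
    dateLogger.log(msg.timestampMillis, msg.payload);
}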

Save the LoggingExample class in java/io/deephaven/crashcourse/dynamic/LoggingExample.java:

mkdir -p java/io/deephaven/crashcourse/dynamic
vi java/io/deephaven/crashcourse/dynamic/LoggingExample.java

You may need to type the following command before pasting to avoid formatting issues:

:set paste

Compile the logger application

Compile the Java code, creating a JAR file that can be run.

Note

A real application would be developed in an IDE and use something like Gradle to manage dependencies, but for this example, simply compiling the Java is sufficient.

Create the compilation script, which compiles the Java file and puts the resulting compiled code into loggingExample.jar. Copy compile.bash to the server or paste it into a file there.

compile.bash
#!/bin/bash

mkdir -p classfiles

# Set up the Deephaven environment required to compile
export DEVROOT=/usr/illumon/latest
proc="iris_exec"
source /etc/sysconfig/illumon
source /usr/illumon/latest/bin/launch_functions
setup_run_environment
export CLASSPATH

# Compile the Java file into a classfile
javac -parameters -d ./classfiles java/io/deephaven/crashcourse/dynamic/LoggingExample.java

# Create the jar from the compiled classfile
jar cvf loggingExample.jar -C ./classfiles io/deephaven/crashcourse/dynamic/LoggingExample.class

vi compile.bash

Then run it.

bash compile.bash

Copy the application JAR file to a location where it is available to the Deephaven application. This requires access to an operating system account with Deephaven administrative privileges such as irisadmin.

cp loggingExample.jar /tmp
sudo -u irisadmin cp /tmp/loggingExample.jar /etc/sysconfig/illumon.d/java_lib

Run the logger application

Since the application uses the default location to write the binary log files, access to an operating system account with Deephaven administrative privileges, such as irisadmin, is required.

Use the iris_exec script to run the program.

  • It will set up the classpath so that the application can access any required Deephaven code.
  • The first parameter io.deephaven.crashcourse.dynamic.LoggingExample specifies the class that is run.
  • The extra parameter -DLoggingExample.useDynamicPartitions=true tells the application to use dynamic (row-based) partitioning. The -j flag tells the script that it is a Java parameter to be passed through to the JVM.

sudo -u irisadmin /usr/illumon/latest/bin/iris_exec \
  io.deephaven.crashcourse.dynamic.LoggingExample \
  -j -DLoggingExample.useDynamicPartitions=true

You can see the generated binary log files for the two tables.

ls -ltr /var/log/deephaven/binlogs/ExampleNamespace*.bin.*

The location into which these logs are written can be changed by updating the log-related properties as defined in the Deephaven Operations Guide. Applications can also run under non-Deephaven accounts, with additional tailer configuration so the tailer can find their log files.

If you want to start over, the following commands stop Deephaven, delete the logged data, and restart the system.

Warning

The commands below perform a complete restart of the Deephaven application. They should not be used on a production system.

/usr/illumon/latest/bin/dh_monit down --block
sudo -u dbmerge rm -r /db/Intraday/ExampleNamespace
sudo -u irisadmin rm -r /var/log/deephaven/binlogs/ExampleNamespace*.bin.*

/usr/illumon/latest/bin/dh_monit up --block

Query the data

The data logged by the application should now be available in Deephaven intraday. In the web IDE, create a new Core+ Code Studio.

code-studio-1.png

The following statements query the two partitions of data we created with the date-based logger and the partition created by the fixed-partition logger.

Groovy
dateTableYesterday = db.liveTable("ExampleNamespace", "LoggingDate").where("Date=pastDate(1).toString()")
dateTableToday = db.liveTable("ExampleNamespace", "LoggingDate").where("Date=today()")
dateTableFixed = db.liveTable("ExampleNamespace", "LoggingDate").where("Date=`2020-01-31`")

Python
dateTableYesterday = db.live_table("ExampleNamespace", "LoggingDate").where(
    "Date=pastDate(1).toString()"
)
dateTableToday = db.live_table("ExampleNamespace", "LoggingDate").where("Date=today()")
dateTableFixed = db.live_table("ExampleNamespace", "LoggingDate").where(
    "Date=`2020-01-31`"
)

The following statements query the two partitions of data created by the function-based logger. These partitions were dynamically determined by the first ten characters of the logged data, so those values are used in the .where clause.

Groovy
functionTablePartition1 = db.liveTable("ExampleNamespace", "LoggingFunction").where("MyPartition=`TrialPart1`")
functionTablePartition2 = db.liveTable("ExampleNamespace", "LoggingFunction").where("MyPartition=`TrialPart2`")

Python
functionTablePartition1 = db.live_table("ExampleNamespace", "LoggingFunction").where(
    "MyPartition=`TrialPart1`"
)
functionTablePartition2 = db.live_table("ExampleNamespace", "LoggingFunction").where(
    "MyPartition=`TrialPart2`"
)

At this point, you've successfully created an application that writes Deephaven binary logs, ingested them into the system, and queried the results. The rest of this guide provides optional follow-on data workflow steps.

Merge the data

Data from binary logs is always ingested as intraday data. Intraday storage isn't intended for long-term use, so each day's data is generally merged into historical data. A typical workflow performs this merge every day, often through a Data Merge Persistent Query, after the day's data is complete - either overnight or after the close of the business day.

Set up historical data storage

Before merging intraday data to historical, the disk system must be set up. Consult your administrator for permissions to do this. Here is an example bash script to create historical partitions for ExampleNamespace. This is a one-time task and further details are found in the NFS partition methodologies appendix of the architecture guide.

sudo su dbmerge

namespace="ExampleNamespace"

cd /db/Systems
mkdir ${namespace}
chmod 755 ${namespace}
cd ${namespace}

mkdir Partitions
mkdir WritablePartitions
chown dbmerge:dbmergegrp Partitions
chown dbmerge:dbmergegrp WritablePartitions
chmod 755 Partitions
chmod 755 WritablePartitions

mkdir Partitions/0
chmod 755 Partitions/0
ln -s /db/Systems/${namespace}/Partitions/0  /db/Systems/${namespace}/WritablePartitions/0

exit

Create a Data Merge Persistent Query

To create a Data Merge Persistent Query, you'll need to be logged in to the Deephaven web interface as a data administrator. This example only shows the steps for the LoggingDate table.

The Query Monitor is used to create a Data Merge Persistent Query. Navigate to the Query Monitor tab and click the New button:

  • On the Settings tab, give the Persistent Query a reasonable name.
  • Choose the Data Merge type.
  • Select AutoMerge for the DB server. This tells the server to choose any available merge server (there is probably only one).
  • Assign it 1 GB of memory.

merge-1-config.png

For data that is to be retained, a merge query should be set up to run every night after intraday data ingestion has completed. For this example, we'll assume the merge should be performed after midnight, merging yesterday's data. Select the Scheduling tab and update the scheduling appropriately - this example runs at 2 AM New York time every night.

  • The Daily Schedule Type should already be selected.
  • Ensure that all the days are selected under Daily Options.
  • Change the Start Time to 02:00:00.
  • Set the Max Execution Time to 5 minutes. If the query takes longer than 5 minutes it will fail, but this example should take a few seconds.

merge-2-scheduling.png

On the Merge Settings tab, choose the namespace and table name.

  • The Partition Value Formula specifies the partition to be merged, and uses the Legacy engine's format. Yesterday can be written as com.illumon.util.calendar.Calendars.calendar("USNYSE").previousDay(1).
  • The Sort Column Formula ensures that the merged data is sorted by the values in the specified columns - specifying Timestamp here sorts the data in time order.

merge-3-panel.png

Click Save.

Since the Persistent Query won't run until the early morning, we will start it manually now to see its effects. Select your query name in the Query Monitor and click the Start button.

After it completes, yesterday's data can now be viewed as historical data:

Groovy
dateTableHistorical = db.historicalTable("ExampleNamespace", "LoggingDate").where("Date=pastDate(1).toString()")

Python
dateTableHistorical = db.historical_table("ExampleNamespace", "LoggingDate").where(
    "Date=pastDate(1).toString()"
)

Validate and delete intraday

At this point, yesterday's data exists in both intraday and historical storage. A typical workflow then validates that the data has been written to historical and deletes it from intraday, often through a Data Validation Persistent Query that runs immediately after the Data Merge Persistent Query. You'll need to be logged in to the Deephaven web interface as a data administrator to perform these tasks.

Click the New button.

  • On the Settings tab, give the Persistent Query a reasonable name.
  • Choose the Data Validation type.
  • Select AutoMerge for the DB server. This tells the server to choose any available merge server (there is probably only one).
  • Assign it 1 GB of memory.

validate-1-config1.png

On the Scheduling tab, configure the query so that it automatically starts after the merge query completes.

  • Choose the Dependent Schedule Type.
  • Select the dependency as the merge query created earlier.
  • Set the Max Execution Time to 5 minutes.

validate-2-scheduling.png

Update the Validate Settings tab.

  • The Namespace and Table fields configure the table being validated; set them to ExampleNamespace and LoggingDate.
  • The Partition Value Formula specifies the partition to be validated, and uses the Legacy engine's format. Yesterday can be written as com.illumon.util.calendar.Calendars.calendar("USNYSE").previousDay(1).
  • For the Validator Classes, click the Use Schema Validation button. This tells the validator to perform validation based on the <Validator> section in the schema we defined earlier:
    • <assertSize min="1" max="999999999" /> indicates that the historical table should contain at least one row and no more than 999999999. Many different data validation options are available.
  • Set Test Type to Full (Historical) so that the validation runs against the merged (historical) data.
  • Select the Delete intraday data checkbox. This causes a successful validation to delete the intraday data.

validate-3-panel.png

To simulate a real environment, delete the associated binary logs so they don't get re-ingested later:

sudo -u irisadmin rm -r /var/log/deephaven/binlogs/ExampleNamespace.LoggingDate*.bin.*

In normal operations, this will run every time the merge query completes successfully, but it can be started manually by selecting it in the Query Monitor and clicking the Start button.

After it runs, yesterday's data is no longer in the intraday table, only in historical. Since data is cached, you'll need to restart your Code Studio (you may see an exception in any Code Studio that is viewing the intraday data as it is deleted).

Groovy
dateTableYesterday = db.liveTable("ExampleNamespace", "LoggingDate").where("Date=pastDate(1).toString()")
dateTableHistorical = db.historicalTable("ExampleNamespace", "LoggingDate").where("Date=pastDate(1).toString()")

Python
dateTableYesterday = db.live_table("ExampleNamespace", "LoggingDate").where(
    "Date=pastDate(1).toString()"
)
dateTableHistorical = db.historical_table("ExampleNamespace", "LoggingDate").where(
    "Date=pastDate(1).toString()"
)