Logging System Tables

The system table logger is a Deephaven Enterprise utility for logging ephemeral in-worker tables to system tables. This lets you persist data from in-memory tables to disk, making it accessible to future queries and analysis. APIs are available in both Java (callable from Groovy) and Python.

For context, Deephaven Enterprise namespaces are divided into two types:

  1. System namespaces, such as the DbInternal namespace, contain system tables and are managed by administrators.
  2. User namespaces contain user tables and are managed by unprivileged users.

The system table logger logs an in-worker table to a system table in a system namespace. These logs can then be passed to a Data Import Server (DIS) using a tailer to make table data accessible to other queries in real time with db.liveTable().
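For instance, once logged rows have been picked up by a DIS, another worker can subscribe to the live system table. The sketch below assumes a Deephaven Enterprise Core+ Python worker, where `db` is the database session provided by the environment:

```python
# Sketch only: assumes a Core+ Python worker with the standard `db` session.
events = db.live_table("Example", "Data")  # refreshes as new rows are logged
latest = events.tail(5)                    # most recently logged rows
```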

Log a table to a system table

Consider the following table:

Groovy:

myTimeTable = timeTable("PT2s").update(
    "IntCol = (int) i",
    "Doubles = new double[] {i % 10 == 0 ? null : i*1.1}",
    "Strings = new String[] {i % 10 == 0 ? null : (`` + (i % 101))}"
)

Python:

from deephaven import time_table

my_time_table = time_table("PT2s").update(
    [
        "IntCol = (int) i",
        "Doubles = new double[] {i % 10 == 0 ? null : i*1.1}",
        "Strings = new String[] {i % 10 == 0 ? null : (`` + (i % 101))}",
    ]
)

The table has the following schema:

| Column name | Data type |
| ----------- | --------- |
| IntCol      | int       |
| Doubles     | double[]  |
| Strings     | String[]  |

When logging any table to a system table, you need to specify codecs for columns with array data types. For this example, codecs are required for the Doubles and Strings columns. The same is true for custom complex data types in columns. For more on complex data types, see Complex data types.

There are two ways to log a table to a system table. Each is covered in a subsection below.

Log a table once

You can log a snapshot of a table to a system table with a single operation. The following code block logs the table to a system table called Example.Data:

Groovy:

import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.enterprise.codec.DoubleArrayCodec
import io.deephaven.enterprise.codec.StringArrayCodec

opts = SystemTableLogger.newOptionsBuilder()
            .currentDateColumnPartition(true)
            .putColumnCodecs("Doubles", new DoubleArrayCodec())
            .putColumnCodecs("Strings", new StringArrayCodec())
            .build()

// Log myTimeTable to the Example.Data system table once
SystemTableLogger.logTable(db, "Example", "Data", myTimeTable, opts)

Python:

from deephaven_enterprise import system_table_logger as stl

# Log my_time_table to the Example.Data system table once
stl.log_table(
    namespace="Example",
    table_name="Data",
    table=my_time_table,
    columnPartition=None,  # Use current date as partition
    codecs={
        "Doubles": stl.double_array_codec(),
        "Strings": stl.string_array_codec(),
    },
)

This logs the table to the system table Example.Data as it exists at the moment the logging operation is called. The table becomes available like any other live system table. Future updates must be written to the system table with subsequent calls.

Log to a table incrementally

If instead you wish to capture changes as your table updates, you must log the table incrementally.

Logging a table incrementally returns a reference that must be held in memory for as long as you wish to log the table. Once you are finished logging, you can close the reference. The following example incrementally logs the table to Example.Data and closes the reference after some time passes:

Groovy:

import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.enterprise.codec.DoubleArrayCodec
import io.deephaven.enterprise.codec.StringArrayCodec

opts = SystemTableLogger.newOptionsBuilder()
            .currentDateColumnPartition(true)
            .putColumnCodecs("Doubles", new DoubleArrayCodec())
            .putColumnCodecs("Strings", new StringArrayCodec())
            .build()

// Log myTimeTable to the Example.Data system table incrementally; call lh.close() when finished
lh = SystemTableLogger.logTableIncremental(db, "Example", "Data", myTimeTable, opts)

// After some time passes...
lh.close()

Python:

from deephaven_enterprise import system_table_logger as stl

# Log my_time_table to the Example.Data system table incrementally; call lh.close() when finished
lh = stl.log_table_incremental(
    namespace="Example",
    table_name="Data",
    table=my_time_table,
    columnPartition=None,  # Use current date as partition
    codecs={
        "Doubles": stl.double_array_codec(),
        "Strings": stl.string_array_codec(),
    },
)

# After some time passes...
lh.close()

This logs the table to the system table Example.Data, as well as all subsequent row additions so long as the reference (lh) is kept open. The table becomes available like any other live system table.

Caution

Rows cannot be removed or modified in the logged table during incremental logging.

Configuration

Both logging operations support advanced logging configurations. In Groovy, this is done with the SystemTableLogger.Options builder. In Python, these configuration parameters are passed in as named arguments. The following list provides some common configuration parameters used when logging to system tables:

  • Column Partitioning: You must specify what column partition to write to. This can be a fixed column partition or the current date at the time the row was written.
  • Custom Codecs: Define encoding for complex data types. For more details, refer to the next section.
  • Log Directory: Define the storage location for binary logs.
  • Log Aggregator Service (LAS): By default, the SystemTableLogger uses the Log Aggregator Service (LAS). This sends table data over the network to a separate server that writes binary logs to disk. Alternatively, logs can be written directly to .bin files on the query server when LAS is not used.

Because no code generation or listener versioning is performed, you must write columns in the format the listener expects. Logging tables for a non-default ZoneId (timezone) may only be done via direct binary logging and cannot be done via LAS.
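As an illustration of these options, the configuration sketch below logs to a fixed column partition and writes binary logs directly to disk instead of using LAS. The `use_las` and `log_dir` parameter names are assumptions not shown in the examples above; verify them against your installation's system_table_logger reference before relying on them:

```python
# Configuration sketch only: `use_las` and `log_dir` are assumed parameter
# names; the directory path below is hypothetical.
from deephaven_enterprise import system_table_logger as stl

stl.log_table(
    namespace="Example",
    table_name="Data",
    table=my_time_table,
    columnPartition="2024-01-01",  # fixed partition instead of the current date
    codecs={
        "Doubles": stl.double_array_codec(),
        "Strings": stl.string_array_codec(),
    },
    use_las=False,                     # bypass the Log Aggregator Service
    log_dir="/db/TempFiles/binlogs",   # write .bin files directly to this directory
)
```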

Complex data types

The SystemTableLogger supports logging complex types like arrays and custom objects using codecs. Deephaven provides several built-in codecs:

| Data type | Codec class |
| --------- | ----------- |
| byte[]    | io.deephaven.enterprise.codec.ByteArrayCodec   |
| char[]    | io.deephaven.enterprise.codec.CharArrayCodec   |
| short[]   | io.deephaven.enterprise.codec.ShortArrayCodec  |
| int[]     | io.deephaven.enterprise.codec.IntArrayCodec    |
| long[]    | io.deephaven.enterprise.codec.LongArrayCodec   |
| float[]   | io.deephaven.enterprise.codec.FloatArrayCodec  |
| double[]  | io.deephaven.enterprise.codec.DoubleArrayCodec |
| String[]  | io.deephaven.enterprise.codec.StringArrayCodec |

You can implement your own codec for custom data types. To log custom data types, create a codec by extending the io.deephaven.util.codec.ObjectCodec class and add the compiled JAR to both the Core+ and Enterprise classpaths.
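To illustrate what a codec does, the pure-Python sketch below mimics the encode/decode round trip a double-array codec performs. This is a conceptual model only; real codecs are Java classes extending io.deephaven.util.codec.ObjectCodec, and the wire format shown here is an assumption for illustration:

```python
import struct

def encode_doubles(values):
    """Serialize a list of floats as a 4-byte count followed by big-endian doubles."""
    return struct.pack(f">i{len(values)}d", len(values), *values)

def decode_doubles(buf):
    """Inverse of encode_doubles: read the count, then that many doubles."""
    (n,) = struct.unpack_from(">i", buf, 0)
    return list(struct.unpack_from(f">{n}d", buf, 4))

original = [1.1, 2.2, 3.3]
assert decode_doubles(encode_doubles(original)) == original
```

A real codec pairs the same two responsibilities: `encode` turns the column value into a byte blob for the intraday Blob column, and `decode` reconstructs the object when the table is read.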

To use codecs, your table's schema must declare the codec for each encoded column.

Consider the following XML table schema for a system table called Example.Data. It contains an integer column, two array columns, and a custom object column, com.mycompany.Custom, which uses the codec com.mycompany.CustomCodec:

<Table name="Data" namespace="Example" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
  <Partitions keyFormula="${autobalance_by_first_grouping_column}" />

  <Column name="Date" dataType="String" columnType="Partitioning" />
  <Column name="IntCol" dataType="int"/>
  <Column name="Doubles" dataType="double[]" objectCodec="io.deephaven.enterprise.codec.DoubleArrayCodec"/>
  <Column name="Strings" dataType="String[]" objectCodec="io.deephaven.enterprise.codec.StringArrayCodec"/>
  <Column name="CustomObjects" dataType="com.mycompany.Custom" objectCodec="com.mycompany.CustomCodec"/>

  <Listener logFormat="1" listenerPackage="com.mycompany" listenerClass="ExampleDataFormat1Listener">

    <ListenerImports>
      import io.deephaven.enterprise.codec.DoubleArrayCodec;
      import io.deephaven.enterprise.codec.StringArrayCodec;
      import com.mycompany.CustomCodec;
    </ListenerImports>

    <Column name="Date" intradayType="none" />
    <Column name="IntCol"/>
    <Column name="Doubles" intradayType="Blob" dbSetter="DoubleArrayCodec.decodeStatic(Doubles)"/>
    <Column name="Strings" intradayType="Blob" dbSetter="StringArrayCodec.decodeStatic(Strings)"/>
    <Column name="CustomObjects" intradayType="Blob" dbSetter="CustomCodec.decodeStatic(CustomObjects)" />

  </Listener>

</Table>

Note

Core+ workers do not yet support row-by-row logging. Existing binary loggers cannot be executed in the context of a Core+ worker because they reference classes that are shadowed (renamed). If row-level logging is required, use BinaryStoreWriterV2 directly.