How to log to a table from a Persistent Query

This guide shows how to log to a table from a persistent query. In the example below, the query logs to the MarketData.Trades table, which has the following schema:

<Table name="Trades" namespace="MarketData" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
    <Partitions keyFormula="${autobalance_by_first_grouping_column}" />

    <Column name="Date" dataType="String" columnType="Partitioning" />
    <Column name="Timestamp" dataType="DateTime" />
    <Column name="symbol" dataType="String" />
    <Column name="price" dataType="Double" />
    <Column name="size" dataType="Integer" />

    <LoggerListener logFormat="0" loggerClass="TradesLogger" loggerPackage="com.illumon.iris.intraday.gen.marketdata" rethrowLoggerExceptionsAsIOExceptions="false" tableLogger="true" generateLogCalls="true" verifyChecksum="true" listenerClass="TradesListener" listenerPackage="com.illumon.iris.intraday.gen.marketdata">

        <SystemInput name="Timestamp" type="com.illumon.iris.db.tables.utils.DBDateTime" />
        <SystemInput name="symbol" type="java.lang.String" />
        <SystemInput name="price" type="double" />
        <SystemInput name="size" type="int" />

        <Column name="Timestamp" intradayType="Long" timePrecision="Nanos" />
        <Column name="symbol" />
        <Column name="price" />
        <Column name="size" />
    </LoggerListener>
</Table>

This schema defines the generated logger class TradesLogger. It may be helpful to have the Java file available for reference. Information on generating loggers and finding the generated Java files can be found in the Streaming Data guide.

First, create the binary logger instance; its constructor takes no arguments. Then initialize the logger, which is required before any data can be logged. See Java Loggers for more information.

import com.illumon.iris.intraday.gen.marketdata.TradesLogger
import com.illumon.intradaylogger.LongLivedProcessBinaryStoreWriterFactory

// Partition values used in the binary log file name
internalPartition = "vm1"
columnPartition = currentDateNy()
filePath = "/var/log/deephaven/binlogs/MarketData.Trades.System.${internalPartition}.${columnPartition}.bin"

// Create the logger, then initialize it with a writer factory and a queue size of 10000
MarketDataTradesLogger = new TradesLogger()
MarketDataTradesLogger.init(new LongLivedProcessBinaryStoreWriterFactory(filePath, log), 10000)

The LongLivedProcessBinaryStoreWriterFactory creates filenames that automatically roll over from hour to hour and from day to day. Here, the binary logs are written to the directory /var/log/deephaven/binlogs/, and the second argument to init sets the logger's queue size to 10000. The main tailer process picks up the log files automatically.
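
The file path passed to the factory in the example follows a fixed naming pattern: namespace, table name, the literal System, the internal partition, and the column partition, joined by dots and ending in .bin. The plain-Java sketch below only illustrates that string composition. It uses no Deephaven classes, the class and method names (BinLogPaths, binLogPath) are hypothetical, and the date is an arbitrary example value passed in by hand instead of coming from currentDateNy().

```java
final class BinLogPaths {
    // Hypothetical helper: composes a path in the same shape as the
    // filePath string used in the example above.
    static String binLogPath(String dir, String namespace, String table,
                             String internalPartition, String columnPartition) {
        return String.format("%s/%s.%s.System.%s.%s.bin",
                dir, namespace, table, internalPartition, columnPartition);
    }

    public static void main(String[] args) {
        // The date is an example value, not a value taken from the guide.
        System.out.println(binLogPath(
                "/var/log/deephaven/binlogs", "MarketData", "Trades", "vm1", "2024-01-02"));
    }
}
```

Keeping the composition in one place makes it harder for the namespace or table name in the file name to drift from the schema.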

Logging data from a table

Deephaven tables can be logged directly using the TableLoggerUtil.logTable method.

public static void logTable(final TableLogger tableLogger, final Table tableToLog, final Index index,
                                final TableLoggerBase.Flags flags, final boolean logUpdates, final boolean logPrevValues,
                                final Collection<String> prevColumnsToLog) throws IOException

The arguments of logTable are:

Argument | Description
TableLogger tableLogger | The destination TableLogger.
Table tableToLog | The table to log.
Index index | The rows to log initially. Rows are identified in row key space, not position space. If you specify rows that are not part of the table, the results are undefined.
TableLoggerBase.Flags flags | Atomic: the initial state and each update are logged as a transaction, which prevents readers from consuming partial updates. RowByRow: each row is logged independently.
boolean logUpdates | If true, listen to the table and log each update.
boolean logPrevValues | Determines whether the previous values of modified rows are logged.
Collection<String> prevColumnsToLog | The columns to log for previous modifications and removals (null for all columns).

The flags argument specifies how tableToLog is logged: as one atomic unit, row by row, or within an explicitly started and ended transaction.

TableLoggerBase.Flags Type | Description
None | No transaction begin/end.
Atomic | Log the entire table as a unit.
RowByRow | Log each row independently.
Start | Begin a transaction.
End | End a previously started transaction.

Suppose the data we want to log to MarketData.Trades is inside the table TradesFinal.

import com.illumon.iris.db.tables.dataimport.TableLoggerUtil
import com.illumon.iris.db.tables.dataimport.TableLoggerBase

TradesFinal = emptyTable(10).update("Date = currentDateNy()", "Timestamp = DBDateTime.now()", "symbol = `AAPL`", "price = Math.random()", "size = 1100")

TableLoggerUtil.logTable(MarketDataTradesLogger, TradesFinal, TradesFinal.getIndex(),
        TableLoggerBase.Flags.RowByRow, false, false, null)

This tells the MarketDataTradesLogger to log the current rows of the TradesFinal table row by row. Because logUpdates is false, later updates to TradesFinal are not logged.

Logging other data

If the data is generated outside a Deephaven table, use the logger's log method to write the binary data. The parameters of the log method are determined by the SystemInput XML elements in the logger's schema. In this example, the signature is:

void log(Row.Flags flags, com.illumon.iris.db.tables.utils.DBDateTime Timestamp, java.lang.String symbol, double price, int size);

The flags argument specifies how to log the data: as a single row or as part of a transaction.

Row.Flags Type | Description
None | This row does not start or stop a transaction.
SingleRow | This row is the only row in a transaction.
StartTransaction | This row is the first row in a transaction.
EndTransaction | This row is the last row in a transaction.
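
When several rows are logged as one transaction, each row needs the flag that matches its position in the batch. The following plain-Java sketch shows one way to pick that flag. It is an illustration under stated assumptions, not the library's own code: the Flags enum is a local stand-in that mirrors the Row.Flags values listed above, and the class and method names (TransactionFlags, flagFor, flagsForBatch) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class TransactionFlags {
    // Local stand-in mirroring the Row.Flags values in the table above;
    // this is NOT the Deephaven enum.
    enum Flags { None, SingleRow, StartTransaction, EndTransaction }

    // Returns the flag for the row at position `index` in a batch of `count` rows.
    static Flags flagFor(int index, int count) {
        if (index < 0 || index >= count) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
        if (count == 1) {
            return Flags.SingleRow;          // the only row in the transaction
        }
        if (index == 0) {
            return Flags.StartTransaction;   // first row opens the transaction
        }
        if (index == count - 1) {
            return Flags.EndTransaction;     // last row closes the transaction
        }
        return Flags.None;                   // rows in between
    }

    // Returns the flags for every row of a batch, in order.
    static List<Flags> flagsForBatch(int count) {
        List<Flags> flags = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            flags.add(flagFor(i, count));
        }
        return flags;
    }

    public static void main(String[] args) {
        System.out.println(flagsForBatch(3)); // prints [StartTransaction, None, EndTransaction]
    }
}
```

In a query, the corresponding Row.Flags value would be passed as the first argument of the log method for each row in the batch.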

The following example logs single rows.

/*
* This example writes out a randomly generated row every five seconds.
*/
import com.illumon.iris.binarystore.Row
import com.illumon.iris.db.tables.utils.DBDateTime

rand = new Random()

while (true) {
    // Random price between 100 and 2100, random size between 1 and 50
    price = 100 + (rand.nextDouble() * 2_000)
    size = rand.nextInt(50) + 1

    // Each row is logged as its own transaction
    MarketDataTradesLogger.log(
        Row.Flags.SingleRow,
        DBDateTime.now(),
        "AAPL",
        price,
        size
    )

    Thread.sleep(5000)
}