How to log to a table from a Persistent Query
This guide shows you how to log to a table from a persistent query. In the example below, the query logs to the `MarketData.Trades` table. The schema is:
```xml
<Table name="Trades" namespace="MarketData" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
    <Partitions keyFormula="${autobalance_by_first_grouping_column}" />
    <Column name="Date" dataType="String" columnType="Partitioning" />
    <Column name="Timestamp" dataType="DateTime" />
    <Column name="symbol" dataType="String" />
    <Column name="price" dataType="Double" />
    <Column name="size" dataType="Integer" />
    <LoggerListener logFormat="0" loggerClass="TradesLogger" loggerPackage="com.illumon.iris.intraday.gen.marketdata" rethrowLoggerExceptionsAsIOExceptions="false" tableLogger="true" generateLogCalls="true" verifyChecksum="true" listenerClass="TradesListener" listenerPackage="com.illumon.iris.intraday.gen.marketdata">
        <SystemInput name="Timestamp" type="com.illumon.iris.db.tables.utils.DBDateTime" />
        <SystemInput name="symbol" type="java.lang.String" />
        <SystemInput name="price" type="double" />
        <SystemInput name="size" type="int" />
        <Column name="Timestamp" intradayType="Long" timePrecision="Nanos" />
        <Column name="symbol" />
        <Column name="price" />
        <Column name="size" />
    </LoggerListener>
</Table>
```
This schema defines the generated logger class `TradesLogger`. It may be helpful to have the generated Java file available for reference. Information on generating loggers and finding the generated Java files can be found in the Streaming Data guide.
First, create the binary logger instance; its constructor takes no arguments. Then initialize the logger. The logger must be initialized before it can log data. See Java Loggers for more information.
```groovy
import com.illumon.iris.intraday.gen.marketdata.TradesLogger
import com.illumon.intradaylogger.LongLivedProcessBinaryStoreWriterFactory

internalPartition = "vm1"
columnPartition = currentDateNy()
filePath = "/var/log/deephaven/binlogs/MarketData.Trades.System.${internalPartition}.${columnPartition}.bin"

MarketDataTradesLogger = new TradesLogger()
MarketDataTradesLogger.init(new LongLivedProcessBinaryStoreWriterFactory(filePath, log), 10000)
```
The `LongLivedProcessBinaryStoreWriterFactory` creates filenames that automatically roll over from hour to hour and from day to day. Here, the binary logs are written to the directory `/var/log/deephaven/binlogs/`. The logger's queue size is 10000. Log files are automatically picked up by the main tailer process.
Logging data from a table
Deephaven tables can be logged directly using the `TableLoggerUtil.logTable` method:
```java
public static void logTable(final TableLogger tableLogger, final Table tableToLog, final Index index,
        final TableLoggerBase.Flags flags, final boolean logUpdates, final boolean logPrevValues,
        final Collection<String> prevColumnsToLog) throws IOException
```
The arguments of `logTable` are:
| Argument | Description |
|---|---|
| `TableLogger tableLogger` | The destination `TableLogger`. |
| `Table tableToLog` | The table to log. |
| `Index index` | The rows that should initially be logged. The rows are specified in row key space, not position space. If you specify rows that are not part of the table, the results are undefined. |
| `TableLoggerBase.Flags flags` | `Atomic`: log the initial state and each update as a transaction, which prevents readers from consuming partial updates. `RowByRow`: log each row independently. |
| `boolean logUpdates` | If `true`, listen to the table and log each update. |
| `boolean logPrevValues` | If `true`, log previous values for modified rows. |
| `Collection<String> prevColumnsToLog` | The columns to log for previous modifications and removals (`null` for all columns). |
The `flags` argument specifies how to log the `tableToLog`: in one atomic operation, row by row, or as a transaction:
| `TableLoggerBase.Flags` Type | Description |
|---|---|
| `None` | No transaction begin/end. |
| `Atomic` | Log the entire table as a unit. |
| `RowByRow` | Log each row independently. |
| `Start` | Begin a transaction. |
| `End` | End a previously started transaction. |
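The `Start` and `End` flags allow several `logTable` calls to be grouped into one transaction, so readers never consume a partial result. The following is a minimal sketch, not from the original guide: it assumes `firstBatch` and `secondBatch` are hypothetical tables matching the `MarketData.Trades` schema, and that `MarketDataTradesLogger` has been initialized as shown earlier.

```groovy
import com.illumon.iris.db.tables.dataimport.TableLoggerUtil
import com.illumon.iris.db.tables.dataimport.TableLoggerBase

// Begin the transaction with the first batch of rows...
TableLoggerUtil.logTable(MarketDataTradesLogger, firstBatch, firstBatch.getIndex(),
    TableLoggerBase.Flags.Start, false, false, null)

// ...and close it with the second batch. Readers see both batches at once,
// only after the End call completes.
TableLoggerUtil.logTable(MarketDataTradesLogger, secondBatch, secondBatch.getIndex(),
    TableLoggerBase.Flags.End, false, false, null)
```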
Suppose the data we want to log to `MarketData.Trades` is in the table `TradesFinal`.
```groovy
import com.illumon.iris.db.tables.dataimport.TableLoggerUtil
import com.illumon.iris.db.tables.dataimport.TableLoggerBase

TradesFinal = emptyTable(10).update("Date = currentDateNy()", "Timestamp = DBDateTime.now()", "symbol = `AAPL`", "price = Math.random()", "size = 1100")

TableLoggerUtil.logTable(MarketDataTradesLogger, TradesFinal, TradesFinal.getIndex(),
    TableLoggerBase.Flags.RowByRow, false, false, null)
```
This tells the `MarketDataTradesLogger` to log the `TradesFinal` table row by row, ignoring any updates to the `TradesFinal` table.
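To also capture subsequent changes to a ticking table, pass `true` for `logUpdates`. This is a hedged sketch, assuming a hypothetical ticking table `liveTrades` whose columns match the `MarketData.Trades` schema; it is not part of the original example.

```groovy
import com.illumon.iris.db.tables.dataimport.TableLoggerUtil
import com.illumon.iris.db.tables.dataimport.TableLoggerBase

// Log the current contents, then listen to the table and log each update
// as a transaction so readers never consume a partial update. Passing true
// for logPrevValues and null for prevColumnsToLog logs previous values for
// all columns on modifications and removals.
TableLoggerUtil.logTable(MarketDataTradesLogger, liveTrades, liveTrades.getIndex(),
    TableLoggerBase.Flags.Atomic, true, true, null)
```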
Logging other data
If data is being generated outside a Deephaven table, the logger's `log` method can be used to write out the binary data. The parameters of the `log` method are determined by the `SystemInput` XML elements of the logger's schema; in this example:
```java
void log(Row.Flags flags, com.illumon.iris.db.tables.utils.DBDateTime Timestamp, java.lang.String symbol, double price, int size);
```
The `flags` argument specifies how to log the data: as a single row, or as part of a transaction.
| `Row.Flags` Type | Description |
|---|---|
| `None` | This row does not start or stop a transaction. |
| `SingleRow` | This row is the only row in a transaction. |
| `StartTransaction` | This row is the first row in a transaction. |
| `EndTransaction` | This row is the last row in a transaction. |
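Several related rows can be written as one transaction so that consumers see them together. The following sketch uses the logger initialized earlier; the symbols, prices, and sizes are made-up illustrative values.

```groovy
import com.illumon.iris.binarystore.Row

// None of these rows is visible to readers until the EndTransaction row is logged.
MarketDataTradesLogger.log(Row.Flags.StartTransaction, DBDateTime.now(), "AAPL", 150.25d, 100)
MarketDataTradesLogger.log(Row.Flags.None, DBDateTime.now(), "MSFT", 310.10d, 200)
MarketDataTradesLogger.log(Row.Flags.EndTransaction, DBDateTime.now(), "GOOG", 135.50d, 300)
```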
The following example logs single rows.
```groovy
/*
 * This example writes out a randomly generated row every five seconds.
 */
import com.illumon.iris.binarystore.Row

rand = new Random()

while (true) {
    price = 100 + (rand.nextDouble() * 2_000)
    size = rand.nextInt(50) + 1
    MarketDataTradesLogger.log(
        Row.Flags.SingleRow,
        DBDateTime.now(),
        "AAPL",
        price,
        size
    )
    Thread.sleep(5000)
}
```