How to log to a table from a Persistent Query
This guide shows how to log to a table from a Persistent Query. The examples below log to the MarketData.Trades table, which has the following schema:
<Table name="Trades" namespace="MarketData" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
    <Partitions keyFormula="${autobalance_by_first_grouping_column}" />
    <Column name="Date" dataType="String" columnType="Partitioning" />
    <Column name="Timestamp" dataType="DateTime" />
    <Column name="Symbol" dataType="String" />
    <Column name="Price" dataType="Double" />
    <Column name="Size" dataType="Integer" />
    <!-- Note that there is no requirement for a generated Logger -->
    <Listener listenerClass="TradesListener" listenerPackage="io.deephaven.intraday.gen.marketdata">
        <SystemInput name="Timestamp" type="java.time.Instant" />
        <SystemInput name="Symbol" type="java.lang.String" />
        <SystemInput name="Price" type="double" />
        <SystemInput name="Size" type="int" />
        <Column name="Timestamp" intradayType="Long" timePrecision="Nanos" />
        <Column name="Symbol" />
        <Column name="Price" />
        <Column name="Size" />
    </Listener>
</Table>
A Deephaven Table with an equivalent TableDefinition can be appended to a System table using the SystemTableLogger. Rows can be appended automatically from an updating Table with SystemTableLogger.logTableIncremental(...) or on demand with SystemTableLogger.logTable(...); both are shown below. Both logging operations support advanced logging configurations. In Groovy, these are set with the SystemTableLogger.Options builder. In Python, they are passed as named arguments.
Note
Unlike the Legacy Logger, the SystemTableLogger provides no direct Logger.log(Row.Flags, ...) method for logging individual values to a row. To simulate this, create a temporary Table with the appropriate values in a single row and log it with SystemTableLogger.logTable(...).
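The single-row workaround described in the note could look roughly like the Python sketch below. It is an illustration under stated assumptions, not a prescribed pattern: `row_formulas` and `log_single_row` are hypothetical helper names, the internal partition value "single" is an arbitrary choice, and the symbol is assumed to be a plain ticker without backticks. The `empty_table(...).update(...)` and `stl.log_table(...)` calls follow the same form as the static-table example later in this guide.

```python
def row_formulas(symbol, price, size):
    """Build update formulas for one row matching the MarketData.Trades columns.

    Hypothetical helper: it only assembles query-language strings.
    """
    if "`" in symbol:
        # Backticks delimit string literals in the formula, so reject them here.
        raise ValueError("symbol must not contain a backtick")
    return [
        "Timestamp = now()",
        f"Symbol = `{symbol}`",
        f"Price = {float(price)!r}",
        f"Size = (int) {int(size)}",
    ]


def log_single_row(symbol, price, size):
    """Log one row to MarketData.Trades via a temporary single-row table."""
    # Deferred imports: these modules are only available inside a Deephaven worker.
    from deephaven import empty_table
    from deephaven_enterprise import system_table_logger as stl

    one_row = empty_table(1).update(row_formulas(symbol, price, size))
    stl.log_table(
        namespace="MarketData",
        table_name="Trades",
        table=one_row,
        columnPartition=None,  # Use current date as partition
        internalPartition="single",  # arbitrary value chosen for this sketch
    )
```

Calling `log_single_row("HDSN", 12.5, 100)` from a query would then append one row. Because each call creates and logs a whole table, this approach is better suited to occasional events than to high-frequency logging.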
Logging data from a static table
A Table can be written using SystemTableLogger.logTable(...). This may be called as many times as desired. Be aware that each call to SystemTableLogger.logTable(...) logs the entire Table that is passed in, so take care to avoid duplication.
import io.deephaven.enterprise.database.SystemTableLogger
// A static table with 10 rows
TradesStatic = emptyTable(10).update(
    "Timestamp = now()",
    "Symbol = `HDSN`",
    "Price = 10.0 * Math.random()",
    "Size = (int) Math.round(1000.0 * Math.random())"
)
// Use today's date for the Column Partition, and use the value "static" for the Internal Partition
opts = SystemTableLogger.newOptionsBuilder()
    .currentDateColumnPartition(true)
    .internalPartition("static")
    .build()
// Log `TradesStatic` to the `MarketData.Trades` system table once. This adds the 10 rows from `TradesStatic`
SystemTableLogger.logTable(db, "MarketData", "Trades", TradesStatic, opts)
from deephaven import empty_table
from deephaven_enterprise import system_table_logger as stl
# A static table with 10 rows
my_static_table = empty_table(10).update(
    [
        "Timestamp = now()",
        "Symbol = `HDSN`",
        "Price = 10.0 * Math.random()",
        "Size = (int) Math.round(1000.0 * Math.random())",
    ]
)
# Using today's date for the Column Partition, and the value "static" for the Internal Partition,
# log `my_static_table` to the `MarketData.Trades` system table once. This adds the 10 rows from `my_static_table`
stl.log_table(
    namespace="MarketData",
    table_name="Trades",
    table=my_static_table,
    columnPartition=None,  # Use current date as partition
    internalPartition="static",
)
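Since every call logs the whole table, one way to guard against accidental duplicates is to remember which batches have already been written. The sketch below is a hypothetical helper, not part of the Deephaven API: the class name `LogOnce`, the `batch_key` argument, and the choice to pass the logging function in as a callable are all assumptions made for illustration. The record of logged batches lives in memory only, so it does not survive a query restart.

```python
class LogOnce:
    """Skip repeat logging calls for a batch that was already logged.

    Hypothetical helper: `log_fn` is any callable that accepts `table=` plus
    keyword arguments, for example `stl.log_table` inside a Deephaven worker.
    """

    def __init__(self, log_fn):
        self._log_fn = log_fn
        self._logged = set()  # batch keys that have already been written

    def log(self, batch_key, table, **log_kwargs):
        """Log `table` unless `batch_key` was logged before; return True if logged."""
        if batch_key in self._logged:
            return False
        self._log_fn(table=table, **log_kwargs)
        # Record the key only after the call returns, so a failed call can be retried.
        self._logged.add(batch_key)
        return True
```

In a query this might be used as `guard = LogOnce(stl.log_table)` followed by `guard.log("static-batch-1", my_static_table, namespace="MarketData", table_name="Trades", columnPartition=None, internalPartition="static")`, where the batch key is whatever identifier distinguishes one load from the next.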
Logging data from an updating table
A Table can be written on a live basis using SystemTableLogger.logTableIncremental(...). The source Table for this call must be append-only: rows cannot be removed or modified. As with the static writer, each call to SystemTableLogger.logTableIncremental(...) logs the entire Table, so take care to avoid duplication if there are multiple calls.
import io.deephaven.enterprise.database.SystemTableLogger
// A table which has a new row added every 2 seconds
TradesTicking = timeTable("PT2s").update(
    "Timestamp = now()",
    "Symbol = `HDSN`",
    "Price = 10.0 * Math.random()",
    "Size = (int) Math.round(1000.0 * Math.random())"
)
// Use today's date for the Column Partition, and use the value "dynamic" for the Internal Partition
opts = SystemTableLogger.newOptionsBuilder()
    .currentDateColumnPartition(true)
    .internalPartition("dynamic")
    .build()
// Log `TradesTicking` to the `MarketData.Trades` system table incrementally. A new row is logged every 2 seconds
lh = SystemTableLogger.logTableIncremental(db, "MarketData", "Trades", TradesTicking, opts)
// When finished logging, call `lh.close()`. After this call, any updates to `TradesTicking` will not be logged to the
// `MarketData.Trades` table
//lh.close()
from deephaven import time_table
from deephaven_enterprise import system_table_logger as stl
# A table which has a new row added every 2 seconds
my_time_table = time_table("PT2s").update(
    [
        "Timestamp = now()",
        "Symbol = `HDSN`",
        "Price = 10.0 * Math.random()",
        "Size = (int) Math.round(1000.0 * Math.random())",
    ]
)
# Using today's date for the Column Partition, and the value "dynamic" for the Internal Partition,
# log `my_time_table` to the `MarketData.Trades` system table incrementally. A new row is logged every 2 seconds
lh = stl.log_table_incremental(
    namespace="MarketData",
    table_name="Trades",
    table=my_time_table,
    columnPartition=None,  # Use current date as partition
    internalPartition="dynamic",
)
# When finished logging, call `lh.close()`. After this call, any updates to `my_time_table` will not be logged to the
# `MarketData.Trades` table
# lh.close()
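When logging should stop after a bounded piece of work rather than run for the life of the query, the handle can be closed reliably with the standard library's `contextlib.closing`. This is a minimal sketch that assumes only what the examples above show, namely that the returned handle has a `close()` method. The function name `run_with_logging` and its two callable parameters are hypothetical.

```python
from contextlib import closing


def run_with_logging(start_logging, work):
    """Start incremental logging, run `work(handle)`, and always close the handle.

    `start_logging` is a zero-argument callable that returns the logging handle,
    for example a lambda wrapping `stl.log_table_incremental(...)`.
    """
    # closing() calls handle.close() on exit, even if work() raises.
    with closing(start_logging()) as handle:
        return work(handle)
```

For example, `start_logging` could be `lambda: stl.log_table_incremental(namespace="MarketData", table_name="Trades", table=my_time_table, columnPartition=None, internalPartition="dynamic")`. Once the handle is closed, later updates to the source table are no longer logged.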