lastBy Ingesters in a Core+ Worker
Deephaven Core+ workers can be used to ingest Deephaven binary logs and export a lastBy view over Barrage. Using a Core+ worker for a lastBy ingester with Barrage allows both Core+ and Legacy workers to subscribe to the resulting lastBy view. Legacy lastBy Ingesters export tables using a Java-serialization based protocol, which cannot be used by a Core+ worker. Core+ Kafka ingesters may also generate lastBy views from a Kafka stream, which is not possible with the Legacy Kafka ingester.
This guide describes how to configure a lastBy Data Import Server (DIS) that receives Deephaven binary logs from the tailer.
Add an ImportState to the Schema
To configure a lastBy ingester, first include an ImportState in a Listener block (or, equivalently, a LoggerListener block) of the schema. For a Core+ worker, the type of the import state must be io.deephaven.enterprise.lastbystate.CoreLastByTableImportState. In addition to specifying the class of the import state, you must provide a stateUpdateCall, which is invoked on each row received by the Data Import Server. The state update call is invoked on the import state class and should specify the columns to use for your lastBy key. For example, the following import state generates a lastBy view keyed on the symbol column.
<ImportState stateUpdateCall="newRow(symbol)" importStateType="io.deephaven.enterprise.lastbystate.CoreLastByTableImportState"/>
To key the view on both symbol and exchange, pass both columns as arguments to the state update call:
<ImportState stateUpdateCall="newRow(symbol, exchange)" importStateType="io.deephaven.enterprise.lastbystate.CoreLastByTableImportState"/>
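To make the effect of the key columns concrete, the following is a minimal plain-Python model of a lastBy view. It is not the Deephaven API; the last_by helper and the sample rows (including their exchange values) are hypothetical and exist only to show how the choice of key columns changes which rows survive.

```python
# Plain-Python model of lastBy semantics; this is NOT the Deephaven API.
def last_by(rows, key_columns):
    """Return the most recent row for each distinct key, in first-seen key order."""
    latest = {}
    for row in rows:  # rows are assumed to arrive in ingestion order
        key = tuple(row[c] for c in key_columns)
        latest[key] = row  # a newer row replaces the older one for the same key
    return list(latest.values())

# Hypothetical sample rows, for illustration only.
rows = [
    {"symbol": "AAPL", "exchange": "NYSE", "tradePrice": 100.0},
    {"symbol": "AAPL", "exchange": "ARCA", "tradePrice": 100.5},
    {"symbol": "MSFT", "exchange": "NYSE", "tradePrice": 50.0},
    {"symbol": "AAPL", "exchange": "NYSE", "tradePrice": 101.0},
]

# Keyed on symbol alone: one row per symbol.
by_symbol = last_by(rows, ["symbol"])

# Keyed on symbol and exchange: one row per (symbol, exchange) pair.
by_symbol_exchange = last_by(rows, ["symbol", "exchange"])
```

With a single key column the model keeps two rows (one per symbol); with the composite key it keeps three, because AAPL is tracked separately on each exchange.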
Complete XML Schema Example
<Table name="EqTrade" namespace="Market" storageType="NestedPartitionedOnDisk">
<Partitions keyFormula="__PARTITION_AUTOBALANCE_SINGLE__" />
<Column name="Date" dataType="java.lang.String" columnType="Partitioning" />
<Column name="Timestamp" dataType="com.illumon.iris.db.tables.utils.DBDateTime" columnType="Normal" />
<Column name="FilePos" dataType="long" columnType="Normal" />
<Column name="exchSeq" dataType="long" columnType="Normal" />
<Column name="symbol" dataType="java.lang.String" columnType="Grouping" />
<Column name="tradePartId" dataType="java.lang.String" columnType="Normal" />
<Column name="totalVolume" dataType="long" columnType="Normal" />
<Column name="tradeVolume" dataType="long" columnType="Normal" />
<Column name="tradeQualifier0" dataType="short" columnType="Normal" />
<Column name="tradeQualifier1" dataType="short" columnType="Normal" />
<Column name="tradeQualifier2" dataType="short" columnType="Normal" />
<Column name="tradeQualifier3" dataType="short" columnType="Normal" />
<Column name="tradeId" dataType="long" columnType="Normal" />
<Column name="tradePrice" dataType="double" columnType="Normal" />
<Column name="tradeTick" dataType="int" columnType="Normal" />
<Column name="tradeDate" dataType="com.illumon.iris.db.tables.utils.DBDateTime" columnType="Normal" />
<Column name="highPrice" dataType="double" columnType="Normal" />
<Column name="lowPrice" dataType="double" columnType="Normal" />
<Column name="openPrice" dataType="double" columnType="Normal" />
<Column name="todayClosePrice" dataType="double" columnType="Normal" />
<Column name="vwap" dataType="double" columnType="Normal" />
<Column name="vendorSeq" dataType="long" columnType="Normal" />
<Column name="msgFlags" dataType="java.lang.String" columnType="Normal" />
<Column name="receiveTime" dataType="com.illumon.iris.db.tables.utils.DBDateTime" columnType="Normal" />
<Column name="exchangeTime" dataType="com.illumon.iris.db.tables.utils.DBDateTime" columnType="Normal" />
<Column name="transmitTime" dataType="com.illumon.iris.db.tables.utils.DBDateTime" columnType="Normal" />
<LoggerListener logFormat="1" loggerPackage="com.illumon.datagen" loggerClass="EqTradeFormat1Logger" listenerPackage="com.illumon.datagen" listenerClass="EqTradeFormat1Listener">
<SystemInput name="row" type="com.illumon.datagen.EqTrade" />
<SystemInput name="timestamp" type="long" />
<SystemInput name="filePos" type="long" />
<ImportState stateUpdateCall="newRow(symbol)" importStateType="io.deephaven.enterprise.lastbystate.CoreLastByTableImportState"/>
<Column name="Date" intradayType="none" />
<Column name="Timestamp" dbSetter="new com.illumon.iris.db.tables.utils.DBDateTime(Timestamp)" intradaySetter="timestamp" />
<Column name="FilePos" intradaySetter="filePos" />
<Column name="exchSeq" intradaySetter="row.exchSeq" />
<Column name="symbol" intradaySetter="row.symbol" />
<Column name="tradePartId" intradaySetter="row.tradePartId" />
<Column name="totalVolume" intradaySetter="row.totalVolume" />
<Column name="tradeVolume" intradaySetter="row.tradeVolume" />
<Column name="tradeQualifier0" intradaySetter="row.tradeQualifier0" />
<Column name="tradeQualifier1" intradaySetter="row.tradeQualifier1" />
<Column name="tradeQualifier2" intradaySetter="row.tradeQualifier2" />
<Column name="tradeQualifier3" intradaySetter="row.tradeQualifier3" />
<Column name="tradeId" intradaySetter="row.tradeId" />
<Column name="tradePrice" intradaySetter="row.tradePrice" />
<Column name="tradeTick" intradaySetter="row.tradeTick" />
<Column name="tradeDate" dbSetter="new com.illumon.iris.db.tables.utils.DBDateTime(tradeDate)" intradaySetter="row.tradeDate" />
<Column name="highPrice" intradaySetter="row.highPrice" />
<Column name="lowPrice" intradaySetter="row.lowPrice" />
<Column name="openPrice" intradaySetter="row.openPrice" />
<Column name="todayClosePrice" intradaySetter="row.todayClosePrice" />
<Column name="vwap" intradaySetter="row.vwap" />
<Column name="vendorSeq" intradaySetter="row.vendorSeq" />
<Column name="msgFlags" intradaySetter="row.msgFlags" />
<Column name="receiveTime" dbSetter="new com.illumon.iris.db.tables.utils.DBDateTime(receiveTime)" intradaySetter="row.receiveTime" />
<Column name="exchangeTime" dbSetter="new com.illumon.iris.db.tables.utils.DBDateTime(exchangeTime)" intradaySetter="row.exchangeTime" />
<Column name="transmitTime" dbSetter="new com.illumon.iris.db.tables.utils.DBDateTime(transmitTime)" intradaySetter="row.transmitTime" />
</LoggerListener>
</Table>
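Before deploying a schema like the one above, it can help to sanity-check the ImportState element. The sketch below uses only the Python standard library and a trimmed-down copy of the schema; check_import_state is a hypothetical helper, not a Deephaven tool, and it assumes that the state update call has the newRow(...) form shown in this guide and that every key column should be declared as a table column.

```python
import xml.etree.ElementTree as ET

# Trimmed-down schema for illustration; a real schema declares every column.
SCHEMA = """
<Table name="EqTrade" namespace="Market" storageType="NestedPartitionedOnDisk">
  <Column name="Date" dataType="java.lang.String" columnType="Partitioning" />
  <Column name="symbol" dataType="java.lang.String" columnType="Grouping" />
  <LoggerListener logFormat="1">
    <ImportState stateUpdateCall="newRow(symbol)" importStateType="io.deephaven.enterprise.lastbystate.CoreLastByTableImportState"/>
    <Column name="symbol" intradaySetter="row.symbol" />
  </LoggerListener>
</Table>
"""

CORE_STATE = "io.deephaven.enterprise.lastbystate.CoreLastByTableImportState"

def check_import_state(schema_xml):
    """Return the lastBy key columns, raising ValueError if the schema looks misconfigured."""
    table = ET.fromstring(schema_xml)
    # Only direct children of <Table> are column declarations.
    declared = {c.get("name") for c in table.findall("Column")}
    state = table.find(".//ImportState")
    if state is None:
        raise ValueError("no ImportState element found")
    if state.get("importStateType") != CORE_STATE:
        raise ValueError("import state is not the Core+ lastBy type")
    call = state.get("stateUpdateCall", "")
    # Assumption: the call has the newRow(col1, col2, ...) form used in this guide.
    if not (call.startswith("newRow(") and call.endswith(")")):
        raise ValueError(f"unexpected stateUpdateCall: {call!r}")
    keys = [k.strip() for k in call[len("newRow("):-1].split(",") if k.strip()]
    missing = [k for k in keys if k not in declared]
    if missing:
        raise ValueError(f"key columns not declared in the table: {missing}")
    return keys

key_columns = check_import_state(SCHEMA)
```

A check of this kind catches a key column that is named in the state update call but never declared in the table, which is an easy mistake to make when extending a single-column key.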
Create a Storage Directory
A lastBy DIS serves both lastBy and regular intraday queries. Therefore, it can serve as the sole storage location for the intraday data.
The intraday data is stored under a directory that must be exclusively owned by the DIS. Deephaven recommends the following location for the lastBy storage directory:
/db/dataImportServers/[DIS_Name]/
The lastBy storage directory must exist before the DIS persistent query can be started. When the script is running in a regular query or console, the dbquery user on the server needs read and write access to this path. When the script is run as a persistent query on a merge server (preferred for production use), the dbmerge account needs read and write privileges.
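As a rough illustration of the requirement above, the following standard-library sketch creates a storage directory and confirms that the current user can read and write it. The prepare_dis_storage helper is hypothetical. It does not change ownership, so on a real server it would need to run as (or be followed by a chown to) the dbquery or dbmerge account; the demonstration uses a temporary root so that it runs anywhere.

```python
import os
import tempfile
from pathlib import Path

def prepare_dis_storage(root, dis_name):
    """Create <root>/<dis_name> if needed and confirm the current user can read and write it."""
    path = Path(root) / dis_name
    path.mkdir(parents=True, exist_ok=True)
    # Execute permission on a directory is what allows entering and listing it.
    if not os.access(path, os.R_OK | os.W_OK | os.X_OK):
        raise PermissionError(f"{path} is not readable and writable by this user")
    return path

# Demonstrated against a temporary root; on a server the root would be
# /db/dataImportServers, giving /db/dataImportServers/lastByCore.
with tempfile.TemporaryDirectory() as tmp_root:
    storage = prepare_dis_storage(tmp_root, "lastByCore")
    created = storage.is_dir()
```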
Configure Routing
The lastBy DIS must be added to the data routing configuration. See Add a Data Import Server for instructions.
The following example adds a lastBy DIS named "lastByCore" with a claim for the EqTrade table in the Market namespace. The lastBy DIS uses dynamic endpoints, so port values are not supplied.
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add --name lastByCore --claim Market.EqTrade
---
lastByCore:
  endpoint:
    serviceRegistry: registry
  userIntradayDirectoryName: Users
  throttleKbps: -1
  claims:
  - {namespace: Market, tableName: EqTrade}
  storage: private
Write an Ingestion Query
After configuring a schema, storage, and routing, the next step is to create an in-worker DIS for the desired table. The following script creates a DIS with the name "lastByCore". The routing YAML file uses "private" storage, so the second parameter to the getDisByNameWithStorage method specifies the path of a directory for the exclusive use of this in-worker DIS. The DIS automatically starts and listens on the Table Data and Tailer ports configured in routing. The above example uses a dynamic port registered with the service registry.
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
dis = DataImportServerTools.getDisByNameWithStorage("lastByCore", "/db/dataImportServers/lastbyCorePlus")
In addition to starting the DIS in the worker, we must create the lastBy view using io.deephaven.enterprise.lastbystate.LastByPartitionedTableFactory (this must be done in the same worker in which the DIS is running). There are three steps:
- Create a LastByPartitionedTableFactory for the DIS.
- Create a PartitionedTable for the namespace, table name, and column partition of interest. Each internal partition is represented as a constituent of the partitioned table.
- Merge the partitioned table into a single view that contains the last row for a given key in each internal partition.
import io.deephaven.enterprise.lastbystate.LastByPartitionedTableFactory
lbf = LastByPartitionedTableFactory.forDataImportServer(dis)
partitionedTable = lbf.createPartitionedTable("Market", "EqTrade", today())
lastTradeBySymbol = partitionedTable.merge()
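The shape of the merged result can be modeled in plain Python. The sketch below is not the Deephaven API: the last_by helper, the internal partition names, and the sample rows are all hypothetical. It assumes, following the description of the merge step above, that the lastBy is computed separately within each internal partition and that merging concatenates the constituents.

```python
# Plain-Python model of a partitioned lastBy followed by a merge; NOT the Deephaven API.
def last_by(rows, key_column):
    """Return the most recent row per key within one internal partition."""
    latest = {}
    for row in rows:  # rows are assumed to be in ingestion order
        latest[row[key_column]] = row
    return list(latest.values())

# Hypothetical internal partitions (for example, one per logging host).
internal_partitions = {
    "host-a": [
        {"symbol": "AAPL", "tradePrice": 100.0},
        {"symbol": "AAPL", "tradePrice": 101.0},
    ],
    "host-b": [
        {"symbol": "AAPL", "tradePrice": 100.7},
        {"symbol": "MSFT", "tradePrice": 50.0},
    ],
}

# One constituent per internal partition, each holding that partition's last rows.
constituents = {name: last_by(rows, "symbol") for name, rows in internal_partitions.items()}

# Merging concatenates the constituents into a single view.
merged = [row for rows in constituents.values() for row in rows]
```

Under these assumptions, a key that is logged to more than one internal partition (AAPL here) appears once per partition in the merged view, so a consumer that needs exactly one row per key would have to aggregate again downstream.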
After the DIS is started, other queries can subscribe to the lastTradeBySymbol variable using Barrage. As with other tables exported from a persistent query, you must grant view access to the DIS query and optionally apply ACLs. Note that the ACLs for the source table are not automatically applied to the exported result; the query writer must apply any desired ACLs.
Retrieve the table from another query
The full history of the ingested data is retrieved as with any other table, in both Legacy and Core+ workers.
trades = db.liveTable("Market", "EqTrade").where("Date=today()")
trades = db.live_table("Market", "EqTrade").where(["Date=today()"])
The lastBy table can be retrieved using Barrage:
import io.deephaven.uri.ResolveTools
tradesLastBy = ResolveTools.resolve("pq://LastByQuery/scope/lastTradeBySymbol")
from deephaven_enterprise import uri
trades_last_by = uri.resolve("pq://LastByQuery/scope/lastTradeBySymbol")