Configuring import-driven lastBy queries
Deephaven Import-driven lastBy allows a separate Data Import Server (DIS) process to maintain one or more in-memory lastBy
tables for intraday data, and export them for use by other queries.
Note
This article describes configuring a lastBy ingester in a Legacy worker. Legacy workers can only export data to other Legacy workers. If you use a Core+ lastBy ingester, which distributes the lastBy result over Barrage, then both Legacy and Core+ workers can make use of the lastBy view.
This configuration requires a DIS to be running in a Deephaven worker, rather than as a background process as is the case for the initial/default Data Import Server.
Import-driven lastBy DIS configurations can be tested and developed in Deephaven queries, but are more commonly run in production within persistent queries. Each Import-driven lastBy DIS process needs a pair of TCP ports on the Deephaven server for receiving tailed data and (optionally) publishing tables.
Import-driven lastBy (or "fast lastBy") differs from the .lastBy() method that can be used in queries in two principal ways:
- It works on raw row data as it arrives from the Tailer.
- It persists lastBy state information to the table's checkpoint record on disk.
Initial calculation of lastBy state for existing data is similar for the two approaches. However, this is a minor consideration since Import-driven lastBy will typically be running before any intraday data arrives.
Import-driven lastBy tables allow users to compute the aggregation during the data ingestion process. Rather than move all the data to the worker that is consuming the lastBy to do the computation, we move only the lastBy rows that have recently ticked. For queries that only need a "last-by" view of some data, Import-driven lastBy tables can be a substantial performance enhancer, greatly reducing data movement and redundant computation, and allowing faster initialization and update processing. Furthermore, if the DIS is restarted mid-day, an Import-driven lastBy will restart from the lastBy state maintained by the import process. If a query that uses the .lastBy()
method is restarted, it must rescan the entire table to calculate lastBy values.
Except for some particular schema attributes, most of the setup for the Import-driven lastBy Data Import Server is similar to the setup for a regular DIS / Tailer pair.
Setup
Storage Directory
The Import-driven lastBy DIS can be configured to serve both lastBy and regular intraday queries. Therefore, it can serve as the sole storage location for the intraday data.
The intraday data will be stored under a db root specified by the storage attribute or specified in the query script. This location may be the same as the system db root (only if data routing filters are configured so that no other process will read or write the table locations), or an independent location. Deephaven recommends the following location for the lastBy storage directory:
/db/dataImportServers/[DIS_Name]/
The lastBy storage directory must exist before the Import-driven lastBy DIS query can be started, and before changing the data routing configuration to reference it. When the script is running in a regular query or console, the dbquery user on the server will need to have read and write access to this path. When the script is run as a persistent query on a merge server (preferred for production use), the dbmerge
account will need read and write privileges.
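As a quick sanity check before starting the DIS script, a console script can confirm that the storage path exists and is writable by the account running the worker. This is an illustrative sketch using standard Java NIO; the path uses the recommended location with a hypothetical DIS name:

```groovy
import java.nio.file.Files
import java.nio.file.Paths

// Hypothetical DIS name; substitute the name from your routing configuration.
storagePath = Paths.get("/db/dataImportServers/Simple_LastBy")

// The DIS will not start correctly if this path is missing, or is not
// readable and writable by the account running the worker
// (dbquery for consoles and regular queries, dbmerge for merge servers).
if (!Files.isDirectory(storagePath) || !Files.isWritable(storagePath)) {
    throw new IllegalStateException("lastBy storage directory is not ready: " + storagePath)
}
```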
Schema Changes
For a table to be usable with Import-driven lastBy, the table schema will need to be updated with definition information to describe how lastBy data should be calculated and updated.
An example of a schema with an Import-driven lastBy definition element (highlighted below) follows:
<Table name="QuoteStock" namespace="DXFeed" storageType="NestedPartitionedOnDisk"
loggerPackage="com.illumon.iris.dxquotestock.gen"
listenerPackage="com.illumon.iris.dxfeed" >
<Partitions keyFormula="__WRITABLE_PARTITIONS__[abs((__NS_TN__ + Sym).hashCode() + __DAY_OF_YEAR__) % __NUM_WRITABLE_PARTITIONS__]"/>
<LoggerImports>
import static com.illumon.iris.dxfeed.DXStockExchangeMap.getExchange;
</LoggerImports>
<SystemInput name="quote" type="com.dxfeed.event.market.Quote"/>
<SystemInput name="Timestamp" type="long"/>
<ImportState stateUpdateCall="newRow(Sym, BidExchange)"
importStateType="com.illumon.iris.db.tables.dataimport.importstate.lastby.LastByTableImportState"/>
<Column name="Date" dataType="String" columnType="Partitioning" intradayType="none" />
<Column name="Timestamp" dataType="DateTime" columnType="Normal"/>
<Column name="Sym" dataType="String" columnType="Grouping" intradaySetter="quote.getEventSymbol()" />
<Column name="BidTimestamp" dataType="DateTime" columnType="Normal" intradaySetter="quote.getBidTime()" />
<Column name="Bid" dataType="double" columnType="Normal" intradaySetter="quote.getBidPrice()" />
<Column name="BidSize" dataType="long" columnType="Normal" intradaySetter="quote.getBidSize()" />
<Column name="BidExchange" dataType="String" columnType="Normal" intradaySetter="getExchange(quote.getBidExchangeCode())" />
<Column name="AskTimestamp" dataType="DateTime" columnType="Normal" intradaySetter="quote.getAskTime()" />
<Column name="Ask" dataType="double" columnType="Normal" intradaySetter="quote.getAskPrice()" />
<Column name="AskSize" dataType="long" columnType="Normal" intradaySetter="quote.getAskSize()" />
<Column name="AskExchange" dataType="String" columnType="Normal" intradaySetter="getExchange(quote.getAskExchangeCode())" />
</Table>
Note
For tables with one or more Listener or LoggerListener blocks, the ImportState block must be placed inside the relevant Listener or LoggerListener section.
The arguments to newRow are the names of the columns to be used as lastBy keys. This is similar to the syntax of using lastBy() in a query. In the above example, an Import-driven lastBy DIS receiving data for this table will maintain a lastBy table with the last entries for each combination of Sym and BidExchange.
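For instance, to keep only the most recent quote per symbol regardless of exchange, the ImportState element would declare a single key column. This is a hypothetical variation on the schema above:

```xml
<ImportState stateUpdateCall="newRow(Sym)"
             importStateType="com.illumon.iris.db.tables.dataimport.importstate.lastby.LastByTableImportState"/>
```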
Configure data routing for the new in-worker Data Import Server
Create a new data import server, claiming the tables that will be handled by this DIS, and using a dynamic endpoint. See instructions in Add a Data Import Server.
Starting and Using the Import-driven lastBy DIS
Note
The remainder of this document describes configuring a lastBy DIS using a Legacy worker. You may instead configure lastBy Ingesters in a Core+ Worker.
The following code starts a new Import-driven lastBy DIS process and assigns it to the dis variable:
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.illumon.iris.db.tables.dataimport.importstate.lastby.LastByTableMapFactory
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import java.nio.file.Paths
import com.illumon.util.jettyhelper.JettyServerHelper
import com.illumon.iris.console.utils.LiveTableMapSelector
import com.illumon.iris.db.v2.routing.DataRoutingService
import com.illumon.iris.db.v2.routing.DataRoutingServiceFactory
import com.illumon.iris.db.schema.SchemaServiceFactory
routingService = DataRoutingServiceFactory.getDefault()
// if the dis configuration specifies "private" storage, then you must provide a storage location, otherwise set this to null
storage = "/db/dataImportServers/Simple_LastBy"
disName = "Simple_LastBy"
dis = DataImportServer.getDataImportServer(null, disName, Configuration.getInstance(), SchemaServiceFactory.getDefault(), routingService, storage)
dis.startInWorker()
The disName argument (or the argument to routingService.getDataImportServiceConfig, if you fetch the configuration yourself) must match the section created under dataImportServers in the data routing configuration. If you use private storage, you must provide the storage pathname; when fetching the configuration directly, you must instead call routingService.getDataImportServiceConfigWithStorage and pass the pathname for the storage. It is an error to call getDataImportServiceConfigWithStorage with a data import server that defines the storage root.
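As a sketch of that alternative, a private-storage configuration could be fetched by name. The argument order shown here is an assumption, not confirmed against the API; check it against your installed Deephaven version:

```groovy
// Hypothetical sketch: fetch the DIS configuration by name, supplying the
// storage path because the routing configuration declares "private" storage
// for this DIS. The argument order is assumed.
disConfig = routingService.getDataImportServiceConfigWithStorage(disName, storage)
```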
Once the Import-driven lastBy DIS has been started, a LastByTableMapFactory can be created for it. Then one or more TableMaps can be created from the factory instance. A merge() operation can be applied to a TableMap to coalesce it into a regular table usable in Deephaven queries:
tmf = LastByTableMapFactory.forDataImportServer(dis)
OrderEventMap = tmf.createTableMap("Order", "Event", currentDate(TZ_NY))
OrderEventLastByWidget = new LiveTableMapSelector(OrderEventMap, true)
OrderEventLastByTableSorted = OrderEventMap.merge()
Example
tmf = LastByTableMapFactory.forDataImportServer(dis)
DXQuoteLastByTableMap = tmf.createTableMap("DXFeed", "QuoteStock", currentDate(TZ_NY))
DXQuoteLastByTable = DXQuoteLastByTableMap.merge()
In the above code excerpt, DXQuoteLastByTable is a Deephaven table comparable to what would be created by:
DXQuoteLastByTable = db.i("DXFeed", "QuoteStock")
.where("Date=currentDateNy()")
.lastBy("Sym","BidExchange")
Notes
Memory requirements for the DIS worker process are proportional to the number of columns in the table(s) being processed and to the number of unique lastBy key values. A reasonable starting point is twice the size of a snapshot of a conventionally calculated, fully populated lastBy table for the same data, plus some overhead for the DIS process itself and its buffers.
Stopping or restarting a Tailer that is streaming data to the Import-driven lastBy DIS will cause the DIS process to log a warning about its lost connection. Once the Tailer resumes processing, the DIS will also resume updating lastBy information and publishing results to table subscribers.
Stopping or restarting the worker running the lastBy query will cause any consumers of this data to log warnings about lost connections. Once the query resumes, these processes should continue.