Configuring import-driven lastBy queries

Deephaven's Import-driven lastBy feature lets a separate Data Import Server (DIS) process maintain one or more in-memory lastBy tables for intraday data and export them for use by other queries.

This configuration requires a DIS to be running within a Deephaven worker, rather than as a background process as is the case for the initial/default Data Import Server.

Import-driven lastBy DIS configurations can be tested and developed in Deephaven queries, but will more commonly be run in production within a persistent query. Each Import-driven lastBy DIS process will need a pair of TCP ports on the Deephaven server for receiving tailed data and (optionally) publishing tables.

Import-driven lastBy (or "fast lastBy") differs from the .lastBy() method that can be used in queries in two principal ways:

  1. It works on raw row data as it arrives from the Tailer.
  2. It persists lastBy state information to the table's checkpoint record on disk.

Initial calculation of lastBy state for existing data is similar for the two approaches. However, this is not a significant consideration since Import-driven lastBy will typically be running before any intraday data arrives.

Import-driven lastBy tables compute the aggregation during data ingestion. Rather than moving all of the data to the worker that consumes the lastBy and computing it there, only the lastBy rows that have recently ticked are moved. For queries that need only a "last-by" view of some data, Import-driven lastBy tables can substantially improve performance: they greatly reduce data movement and redundant computation, and allow faster initialization and update processing. Furthermore, if the DIS is restarted mid-day, an Import-driven lastBy restarts from the lastBy state maintained by the import process, whereas a query that uses the .lastBy() method must rescan the entire table to recalculate lastBy values when it is restarted.

With the exception of some particular schema attributes, most of the setup for the Import-driven lastBy Data Import Server is similar to the setup for a regular DIS / Tailer pair.

The Import-driven lastBy feature should use YAML data routing configuration.

Setup

Storage Directory

The Import-driven lastBy DIS can be configured to serve both lastBy and regular intraday queries. Therefore, it can serve as the sole storage location for the intraday data.

The intraday data will be stored under a db root specified by the storage attribute. This can be an independent location, or it can be the same as the system db root, but only if data routing filters are configured so that no other process reads or writes the table locations. We recommend the following location for the lastBy storage directory:

/db/dataImportServers/[DIS_Name]/

The lastBy storage directory needs to exist before the Import-driven lastBy DIS query can be started. When the script is running in a regular query or console, the dbquery user on the server will need to have read and write access to this path. When the script is run as a persistent query against a merge server (preferred for production use), the dbmerge account will need read and write privileges.
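
The directory requirement above can be checked before the DIS query is started. The following is a minimal sketch in plain Java, not part of the Deephaven API: the class name StorageDirCheck and the default path are hypothetical, and the check only reflects the permissions of whichever account runs it, so it would need to be run as dbquery or dbmerge to be meaningful.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical pre-flight check; not a Deephaven utility. */
final class StorageDirCheck {
    /** Returns null when the directory is usable, otherwise a short description of the problem. */
    static String problemWith(Path dir) {
        if (!Files.isDirectory(dir)) {
            return "missing or not a directory: " + dir;
        }
        if (!Files.isReadable(dir)) {
            return "not readable by the current user: " + dir;
        }
        if (!Files.isWritable(dir)) {
            return "not writable by the current user: " + dir;
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical default; pass the real lastBy storage directory as the first argument.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/db/dataImportServers/Simple_LastBy");
        String problem = problemWith(dir);
        System.out.println(problem == null ? "ok: " + dir : problem);
    }
}
```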

Schema Changes

For a table to be usable with Import-driven lastBy, the table schema will need to be updated with definition information to describe how lastBy data should be calculated and updated.

The following example schema includes an Import-driven lastBy definition (the ImportState element):

<Table name="QuoteStock" namespace="DXFeed" storageType="NestedPartitionedOnDisk"
loggerPackage="com.illumon.iris.dxquotestock.gen"
listenerPackage="com.illumon.iris.dxfeed" >
<Partitions keyFormula="__WRITABLE_PARTITIONS__[abs((__NS_TN__ + Sym).hashCode() + __DAY_OF_YEAR__) % __NUM_WRITABLE_PARTITIONS__]"/>

<LoggerImports>
import static com.illumon.iris.dxfeed.DXStockExchangeMap.getExchange;
</LoggerImports>

<SystemInput name="quote" type="com.dxfeed.event.market.Quote"/>
<SystemInput name="Timestamp" type="long"/>

<ImportState stateUpdateCall="newRow(Sym, BidExchange)"
importStateType="com.illumon.iris.db.tables.dataimport.importstate.lastby.LastByTableImportState"/>

<Column name="Date" dataType="String" columnType="Partitioning" intradayType="none" />
<Column name="Timestamp" dataType="DateTime" columnType="Normal"/>
<Column name="Sym" dataType="String" columnType="Grouping"    intradaySetter="quote.getEventSymbol()" />
<Column name="BidTimestamp" dataType="DateTime" columnType="Normal" intradaySetter="quote.getBidTime()" />
<Column name="Bid" dataType="double" columnType="Normal"  intradaySetter="quote.getBidPrice()" />
<Column name="BidSize" dataType="long" columnType="Normal" intradaySetter="quote.getBidSize()" />
<Column name="BidExchange" dataType="String" columnType="Normal" intradaySetter="getExchange(quote.getBidExchangeCode())" />
<Column name="AskTimestamp" dataType="DateTime" columnType="Normal" intradaySetter="quote.getAskTime()" />
<Column name="Ask" dataType="double" columnType="Normal"  intradaySetter="quote.getAskPrice()" />
<Column name="AskSize" dataType="long" columnType="Normal" intradaySetter="quote.getAskSize()" />
<Column name="AskExchange" dataType="String" columnType="Normal" intradaySetter="getExchange(quote.getAskExchangeCode())" />
</Table>

Note

For tables that have one or more Listener or LoggerListener blocks, the ImportState block must be placed inside the relevant Listener or LoggerListener sections.

The arguments to newRow are the names of columns to be used for the lastBy. This is similar to the syntax of using lastBy() in a query. In the above example, an Import-driven lastBy DIS receiving data for this table will maintain a lastBy table with the last entries for each combination of Sym and BidExchange.
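
To make the keying concrete, here is a conceptual sketch of the state a lastBy on Sym and BidExchange has to keep: one entry per distinct key combination, overwritten by each newer row. It is plain Java written for illustration and is not the LastByTableImportState implementation; the LastBySketch class, the Quote type, and the sample values are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Conceptual illustration only; not the Deephaven import state implementation. */
final class LastBySketch {
    /** Hypothetical row type carrying a small subset of the QuoteStock columns. */
    static final class Quote {
        final String sym;
        final String bidExchange;
        final double bid;

        Quote(String sym, String bidExchange, double bid) {
            this.sym = sym;
            this.bidExchange = bidExchange;
            this.bid = bid;
        }
    }

    // One entry per distinct (Sym, BidExchange) combination; the value is the latest row seen.
    private final Map<List<String>, Quote> lastByKey = new LinkedHashMap<>();

    /** The newest row replaces any earlier row that has the same key columns. */
    void newRow(Quote row) {
        lastByKey.put(List.of(row.sym, row.bidExchange), row);
    }

    Quote last(String sym, String bidExchange) {
        return lastByKey.get(List.of(sym, bidExchange));
    }

    int size() {
        return lastByKey.size();
    }

    public static void main(String[] args) {
        LastBySketch state = new LastBySketch();
        state.newRow(new Quote("AAPL", "Q", 189.10));
        state.newRow(new Quote("AAPL", "N", 189.05));
        state.newRow(new Quote("AAPL", "Q", 189.20)); // replaces the first AAPL/Q row
        System.out.println(state.size());                // prints 2
        System.out.println(state.last("AAPL", "Q").bid); // prints 189.2
    }
}
```

The size of this map grows with the number of distinct key combinations, not with the number of rows received.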

Data Routing Configuration

The system needs to be configured so that data for the lastBy tables is sent to and read from the appropriate processes and locations.

Configure a storage location

Add the location where the lastBy DIS process will store its intraday data. In this section, "Simple_LastBy" is an arbitrary identifier that is referenced later in the configuration file. The dbRoot directory name "DIS_Simple_LastBy" is also arbitrary and does not need to match the identifier.

storage:
  - name: default
    dbRoot: /db
  - name: Simple_LastBy
    dbRoot: /path/to/DIS_Simple_LastBy

Configure the new in-worker Data Import Server

Create a new entry in the "dataImportServers" section. Create filters so that only the desired table or tables are accepted. You must assign appropriate values for host, tailerPort, storage, definitionsStorage, and tableDataPort.

dataImportServers:
   ...
   Simple_LastBy:
     host: *ddl_query1
     tailerPort: 22222
     # handle Orders tables only
     filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `Order`"}
     webServerParameters:
       enabled: false
     storage: Simple_LastBy
     definitionsStorage: default
     tableDataPort: 22223

Adjust existing data import server(s)

Assuming you want the data handled by the in-worker DIS to be handled exclusively by that DIS instance, adjust the filters on any other DIS instances to exclude the table(s).

dataImportServers:
  ...
  db_dis:
    ...
    # don't process Orders
    filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `Order`"}

Change data routing configuration(s)

The tableDataServices section defines how data is routed. One or more entries in this section will need to be adjusted so that query and merge workers will source data appropriately. In this example, it is the table data cache proxy service that serves live data from the data import servers.

The configuration below points the Order namespace to the lastBy DIS and excludes it from the default DIS.

db_tdcp:
   host: localhost
   port: *default-tableDataCacheProxyPort
   sources:
   - name: db_dis
     # exclude namespace Order
     filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `Order`", whereLocationKey: "ColumnPartition >= currentDateNy()"}
   - name: db_ltds
     filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `Order`", whereLocationKey: "ColumnPartition < currentDateNy()"}
   - name: db_rta
     # all user data
     filters: {namespaceSet: User}
   - name: Simple_LastBy
     # include only namespace Order
     filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `Order`"}

Tailer configuration

The data import servers to which data is sent are determined by the Data Routing Service. For a given table, data will be sent to each DIS for which the filters match. With the configuration changes above, all data for namespace Order will be tailed only to the lastBy DIS. No further configuration is needed for the tailers.
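
The matching rule can be pictured with a small stand-alone sketch. This is plain Java for illustration only: the real matching is performed by the Data Routing Service from the YAML filters, and the RoutingSketch class and its predicates are hypothetical stand-ins for the two whereTableKey filters in the example configuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Conceptual illustration only; not the Data Routing Service. */
final class RoutingSketch {
    // Hypothetical stand-ins for the example filters; each receives (namespaceSet, namespace).
    static final Map<String, BiPredicate<String, String>> DIS_FILTERS = new LinkedHashMap<>();
    static {
        DIS_FILTERS.put("db_dis", (set, ns) -> set.equals("System") && !ns.equals("Order"));
        DIS_FILTERS.put("Simple_LastBy", (set, ns) -> set.equals("System") && ns.equals("Order"));
    }

    /** Returns the name of every DIS whose filter accepts the given table key. */
    static List<String> destinationsFor(String namespaceSet, String namespace) {
        List<String> matches = new ArrayList<>();
        DIS_FILTERS.forEach((name, filter) -> {
            if (filter.test(namespaceSet, namespace)) {
                matches.add(name);
            }
        });
        return matches;
    }

    public static void main(String[] args) {
        System.out.println(destinationsFor("System", "Order"));  // prints [Simple_LastBy]
        System.out.println(destinationsFor("System", "DXFeed")); // prints [db_dis]
    }
}
```

Because the two filters are mutually exclusive, each System table has exactly one destination. If the filters overlapped, the same table would be tailed to both servers.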

Starting and Using the Import-driven lastBy DIS

The following code starts a new Import-driven lastBy DIS process and assigns it to the dis variable:

import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer;
import com.illumon.iris.db.tables.dataimport.importstate.lastby.LastByTableMapFactory;
import com.fishlib.configuration.Configuration;
import com.fishlib.util.process.ProcessEnvironment;
import java.nio.file.Paths;
import com.illumon.util.jettyhelper.JettyServerHelper;
import com.illumon.iris.console.utils.LiveTableMapSelector;
import com.illumon.iris.db.v2.configuration.DataRoutingService;
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory;
import com.illumon.iris.db.schema.SchemaServiceFactory;

routingService = DataRoutingServiceFactory.getDefault();

disConfig = routingService.getDataImportServiceConfig("Simple_LastBy");
dis = DataImportServer.getDataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance(), SchemaServiceFactory.getDefault());
dis.startInWorker();

The argument to routingService.getDataImportServiceConfig must match the name of the entry created under dataImportServers above ("Simple_LastBy" in this example).

Once the Import-driven lastBy DIS has been started, a LastByTableMapFactory can be created for it. Then one or more TableMaps can be created from the factory instance. A merge() operation can be applied to a TableMap to coalesce it into a regular table usable in Deephaven queries:

tmf=LastByTableMapFactory.forDataImportServer(dis);

OrderEventMap = tmf.createTableMap("Order","Event",currentDate(TZ_NY));
OrderEventLastByWidget = new LiveTableMapSelector(OrderEventMap, true);
OrderEventLastByTable = OrderEventMap.merge();

Example

tmf=LastByTableMapFactory.forDataImportServer(dis);
DXQuoteLastByTableMap = tmf.createTableMap("DXFeed", "QuoteStock", currentDate(TZ_NY));
DXQuoteLastByTable = DXQuoteLastByTableMap.merge();

In the above code excerpt, DXQuoteLastByTable is a Deephaven table comparable to what would be created by:

DXQuoteLastByTable=db.i("DXFeed","QuoteStock")
    .where("Date=currentDateNy()")
    .lastBy("Sym","BidExchange")

Notes

Memory requirements for the DIS worker process are proportional to the number of columns in the table(s) being processed, and also to the number of unique lastBy key values. It is recommended to start with double the size of a snapshot of a conventionally calculated, fully populated, lastBy table for the same data, plus some amount of overhead for the DIS process itself and its buffers.
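
As a worked example of that starting point, the sketch below doubles a snapshot size and adds an overhead allowance. The 3 GiB snapshot and 1 GiB overhead figures are hypothetical placeholders, not measurements or recommendations, and HeapEstimateSketch is not a Deephaven class.

```java
/** Illustrative arithmetic only; the input sizes are hypothetical. */
final class HeapEstimateSketch {
    static final long GIB = 1L << 30;

    /** Starting heap suggestion in bytes: twice the lastBy snapshot size plus DIS overhead. */
    static long suggestedHeapBytes(long lastBySnapshotBytes, long disOverheadBytes) {
        return Math.addExact(Math.multiplyExact(2L, lastBySnapshotBytes), disOverheadBytes);
    }

    public static void main(String[] args) {
        long snapshot = 3 * GIB; // hypothetical size of a fully populated lastBy snapshot
        long overhead = 1 * GIB; // hypothetical allowance for the DIS process and its buffers
        System.out.println(suggestedHeapBytes(snapshot, overhead) / GIB + " GiB"); // prints 7 GiB
    }
}
```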

Stopping or restarting a Tailer that is streaming data to the Import-driven lastBy DIS will cause the DIS process to log a warning about its lost connection. Once the Tailer resumes processing, the DIS will also resume updating lastBy information and publishing results to table subscribers.

Stopping or restarting the worker running the lastBy query will cause any consumers of this data to log warnings about lost connections. Once the query resumes, these processes should continue.