Set up a replayer

One of the first problems most Deephaven users need to solve is “Where is my data?”. There may already be jobs that ingest data to populate tables, or perhaps you want to experiment with data from a new source that has not yet been integrated into your system.

Additionally, you may want to replay a previous day’s data so that you can:

  • Test changes to a script against known data.
  • Troubleshoot issues that a query had on a previous day’s data.
  • Simulate data ticking into a table for the current date using previous data.

What is a Replay Query?

A Replay Query is a query that replays historical data for a specific table as intraday ticking data. It releases rows to the query at the times those rows originally appeared, based on their Timestamp column. This way, you can take an existing query that expects to operate on intraday data and run it against historical, merged data.

With some additional scaffolding, a small variation of this approach lets you replay that historical data into another table, replacing the original timestamps with current-day timestamps.

At its core, a Replay Query redirects the db.i() call to load the historical version of the table (db.t()) instead. It then applies a special where() clause that releases rows based on their configured timestamp column.
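Conceptually, the substitution looks something like the sketch below. This is purely illustrative; in a real Replay Query your script simply calls db.i() and the framework performs the redirection for you.

// Illustrative sketch only - the Replay Query type does this redirection for you.
// Your script asks for intraday data:
//     trades = db.i("Market", "Trades").where("Date=`2020-09-22`")
// Under the hood, the replayer loads the historical table instead:
trades = db.t("Market", "Trades").where("Date=`2020-09-22`")
// ...and a special, ticking where() clause then releases each row only after the
// replay clock has passed that row's Timestamp value.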

To set this up, create a new query, set its type to “Live Query Replay”, and put the code that you want to test into the Script Editor.

Caution

Take note that the “Replay Query” type is currently only configurable via the Swing UI.


The next step is to configure the replay settings. There are three parameters:

  • “Sorted Replay”
  • “Replay Time of Day”
  • “Replay Date”

Sorted Replay


The Sorted Replay checkbox guarantees that data is replayed in Timestamp order; that is, rows are released as if the table were sorted by the configured timestamp column.

Replay Time Of Day


The Replay Time Of Day configuration provides two options that control the time of day at which the replay begins:

  • Start at Current Time - The query is executed using the current time of day.
  • Fixed Start Time - The query is executed using the value entered as the current time of day.

This is useful if you need to replay data that originally streamed in a different region (Hong Kong, for example) than the time zone you are testing in. To set a specific start time, enter a time in the format HH:MM:SS (for example, 09:30:00).

Replay Date


The Replay Date option controls the date returned by DQL’s date methods, such as currentDateNy() and lastBusinessDateNy().
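For example, a script that keys its Date filter off currentDateNy() will automatically pick up the configured Replay Date without any changes:

// Because the Replay Date overrides currentDateNy(), this selects the replayed date partition.
myTable = db.i("Market", "Trades").where("Date=currentDateNy()")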

Replaying Historical Data

The simplest use case for a Replay Query is to replay a historical data set directly to a query script. If your script already uses DQL’s date methods, you will not need to do any additional work to select the appropriate date partition for replay. If it does not, you must review your query and make sure it selects the correct playback date by hand.

For example, if you are manually selecting dates, as in:

myTable = db.i("Market", "Trades").where("Date=`2020-09-22`")

Note

Reference the historical table with db.i() rather than db.t(); the replayer redirects the intraday call to the historical data.

You will need to change the selected date in the .where() clause manually. If currentDateNy() is not sufficient for your needs, a good pattern to follow in your queries is to store the date partition in a local variable and reuse it throughout the query:

CURRENT_DATE = "2020-09-22"

myTable = db.i("Market", "Trades").where("Date=`$CURRENT_DATE`")

This makes it easier to change the operational date uniformly throughout the query.
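For instance, several tables can share the same variable, so switching the replay date becomes a one-line change (the Quotes table here is hypothetical):

CURRENT_DATE = "2020-09-22"

trades = db.i("Market", "Trades").where("Date=`$CURRENT_DATE`")
quotes = db.i("Market", "Quotes").where("Date=`$CURRENT_DATE`") // hypothetical second table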

My Timestamp Column is not ‘Timestamp’

A Replay Query assumes that the replay timestamp column in every table is named “Timestamp” unless you have defined it otherwise. If your timestamp column is named something else, say “TradeTime”, you must tell the query that. This is done by adding arguments to the “Extra JVM Arguments” of the query; add one argument for each table whose timestamp column is not named “Timestamp”:

-DReplayDatabase.TimestampColumn.<Namespace>.<Table Name>=<Column Name>

For example, if the Market.Trade table used “TradeTime” as its timestamp column:

-DReplayDatabase.TimestampColumn.Market.Trade=TradeTime

Once this is configured, save your query and the data will be replayed to the query script.

Simulating Intraday Data With Replay

While it is often useful to simply replay historical data to a script, it may also be useful to use a historical replay to simulate current intraday data. This use case is very similar to the Replay Query described in the previous section, except that instead of streaming data from db.i() directly into a script, we re-log it and transform its timestamps to match today’s date and time.

This has the advantage of allowing you to run unmodified queries against simulated data, at the cost of a bit more complexity and configuration.

First, modify the schema of the table you want to replay to enable TableLogger generation, then create a schema for the simulated table to write to. Here is an example schema with this update:

<Table namespace="Market" name="Trade" storageType="NestedPartitionedOnDisk">
  <Partitions keyFormula="${autobalance_single}" />
  <Column name="Date" dataType="String" columnType="Partitioning" />
  <Column name="Timestamp" dataType="DateTime" />
  <Column name="Seqn" dataType="long" />
  <Column name="MessageType" dataType="String" />
  <Column name="Source" dataType="long" />
  <Column name="Ticker" dataType="String" columnType="Grouping" />
  <Column name="TradePrice" dataType="double" />
  <Column name="TradeSize" dataType="long" />
  <LoggerListener logFormat="1" loggerPackage="io.deephaven.example.gen" loggerClass="MarketTradeFormat1Logger" listenerPackage="io.deephaven.example.gen" listenerClass="MarketTradeFormat1Listener" rethrowLoggerExceptionsAsIOExceptions="true" tableLogger="true">
  <SystemInput name="timestamp" type="long" />
  <SystemInput name="seqn" type="long" />
  <SystemInput name="recType" type="String" />
  <SystemInput name="source" type="long" />
  <SystemInput name="ticker" type="String" />
  <SystemInput name="tradePrice" type="double" />
  <SystemInput name="tradeSize" type="long" />
  <Column name="Date" intradayType="none" intradaySetter="date" />
  <Column name="Timestamp" intradaySetter="timestamp" dbSetter="new com.illumon.iris.db.tables.utils.DBDateTime(Timestamp)" />
  <Column name="Seqn" intradaySetter="seqn" />
  <Column name="MessageType" intradaySetter="recType" />
  <Column name="Source" intradaySetter="source" />
  <Column name="Ticker" intradaySetter="ticker" />
  <Column name="TradePrice" intradaySetter="tradePrice" />
  <Column name="TradeSize" intradaySetter="tradeSize" />
 </LoggerListener>
</Table>

The addition of tableLogger="true" makes it possible to log tables directly to binary log files. You could replay data directly back to this namespace and table, but doing so is not recommended; it is best to explicitly separate simulated or replayed data from actual data. This can be done simply by adding a new schema using CopyTable with a new namespace. Here is an example:

<CopyTable namespace="MarketSim" sourceNamespace="Market" name="Trade" sourceName="Trade" />

This will add a new system table to Deephaven under the namespace “MarketSim” that you can point your queries to.
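Once the CopyTable schema is deployed, a downstream query only needs to reference the new namespace. For example, a consumer of the simulated feed might look like the following sketch, assuming the MarketSim.Trade table defined above:

// Reads the simulated intraday data exactly as an ordinary query would read the real feed.
simTrades = db.i("MarketSim", "Trade").where("Date=currentDateNy()")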

Next, configure a replay query exactly as in the previous section. Write a script that pulls the table from the original namespace and logs it, along with its updates, to the new table’s binary log files. See below for an example in Groovy:

// These imports are required for the classes you need to create a TableLogger
import com.fishlib.configuration.Configuration
import com.illumon.iris.db.util.logging.EventLoggerFactory
import io.deephaven.example.gen.MarketTradeFormat1Logger
import com.illumon.iris.db.tables.dataimport.TableLoggerBase

// It is good practice to use a variable to store the value of the Date clauses for your tables.
REPLAY_DATE=currentDateNy()

// Here we retrieve the table we are going to replay,
// but we add an updateView clause to change the value
// of the Date column to match today's date.
// This way, when the rows are logged they will have the correct date.
sourceTable = db.i("Market", "Trade").where("Date=`$REPLAY_DATE`")
    .updateView("Date=currentDateNy()")

// This is just a boilerplate closure that will provide the logger factory below with the partition
cpf = { v -> return currentDateNy() }

// Get an instance of the configuration object to pass down to the logger factory
config = Configuration.getInstance()

// Now create the TableLogger
logger = EventLoggerFactory.createIntradayLogger(config,
    "Replay", // This is the name of the logger, it can be anything
    log, // A Logger object for error reporting
    "MarketSim", // The namespace of the Table you are getting a TableLogger for
    "Trade", // The name of the table you are getting a logger for
    io.deephaven.example.gen.MarketTradeFormat1Logger.class, // The name of the logger class to create
    config.getServerTimezone().getID(), // The timezone of logging
    cpf, false)

// Finally connect the sourceTable to the logger and tell it to write all updates to the log.
logger.logTable(sourceTable, sourceTable.getIndex(), TableLoggerBase.Flags.RowByRow, true, false)

// At this point as rows tick into "Market.Trade" they will be automatically logged back to "MarketSim.Trade"

There is one final thing to account for: if your table contains any DBDateTime columns, you will probably want to convert their values to the current date while preserving the original time of day. This requires a little bit of boilerplate code, explained below:

// These imports are required for the classes you need to create a TableLogger
import com.fishlib.configuration.Configuration
import com.illumon.iris.db.util.logging.EventLoggerFactory
import io.deephaven.example.gen.MarketTradeFormat1Logger
import com.illumon.iris.db.tables.dataimport.TableLoggerBase
// DBDateTime is used below when converting date-time columns
import com.illumon.iris.db.tables.utils.DBDateTime
import java.time.*
import java.util.stream.Collectors

// It is good practice to use a variable to store the value of the Date clauses for your tables.
REPLAY_DATE=currentDateNy()
REPLAY_DATE_PARTS = REPLAY_DATE.split("-")

// Compute the nanos of Epoch at midnight for the current day
dtMidnightToday = LocalDate.now().atStartOfDay(ZoneId.of("America/New_York")).toInstant().toEpochMilli()*1000000L

// Compute the nanos of Epoch at midnight of the date you are replaying
dtMidnightSource = LocalDate.of(
        Integer.parseInt(REPLAY_DATE_PARTS[0]),
        Integer.parseInt(REPLAY_DATE_PARTS[1]),
        Integer.parseInt(REPLAY_DATE_PARTS[2])
    ).atStartOfDay(ZoneId.of("America/New_York")).toInstant().toEpochMilli()*1000000L

// Create a closure that converts a DBDateTime value from the source date to the current date, preserving the original time of day
convertDateToday = { dt -> dt == null ? null : new DBDateTime(dt.getNanos() - dtMidnightSource + dtMidnightToday) }

// Create a closure that iterates over an input table and applies an updateView clause to each DBDateTime column it finds (except the replay Timestamp column).
convertDateColumns = { t ->
    final List<String> cols = t.getDefinition().getColumnList().stream().filter({c -> !"Timestamp".equals(c.getName()) && c.getDataType() == DBDateTime.class })
           .map({ c -> c.getName() + " = convertDateToday.call("+c.getName()+")" }).collect(Collectors.toList())
    return t.updateView(cols)
}

// Here we retrieve the table we are going to replay,
// but we add an updateView clause to change the value
// of the Date column to match today's date.
// This way, when the rows are logged they will have the correct date.
// We also apply the convertDateColumns closure so that
// the result table's DBDateTime columns reflect the current day.
sourceTable = convertDateColumns(db.i("Market", "Trade").where("Date=`$REPLAY_DATE`")
    .updateView("Date=currentDateNy()"))

// This is just a boilerplate closure that will provide the logger factory below with the partition
cpf = { v -> return currentDateNy() }

// Get an instance of the configuration object to pass down to the logger factory
config = Configuration.getInstance()

// Now create the TableLogger
logger = EventLoggerFactory.createIntradayLogger(config,
    "Replay", // This is the name of the logger, it can be anything
    log, // A Logger object for error reporting
    "MarketSim", // The namespace of the Table you are getting a TableLogger for
    "Trade", // The name of the table you are getting a logger for
    io.deephaven.example.gen.MarketTradeFormat1Logger.class, // The name of the logger class to create
    config.getServerTimezone().getID(), // The timezone of logging
    cpf, false)

// Finally connect the sourceTable to the logger and tell it to write all updates to the log.
logger.logTable(sourceTable, sourceTable.getIndex(), TableLoggerBase.Flags.RowByRow, true, false)

// At this point as rows tick into "Market.Trade" they will be automatically logged back to "MarketSim.Trade"

This final example replays the table to the MarketSim namespace and converts every DBDateTime column it finds to the current date, preserving the original time of day.