Create a replay query

This guide shows you how to create a Replay Query, a Persistent Query (PQ) that replays historical table data as ticking intraday data. Among other things, this is useful for:

  • Testing changes to a script against known data.
  • Troubleshooting issues that a query had on a previous day’s data.
  • Stress testing a query by replaying a large data set at faster speed.
  • Simulating data ticking into a table.

The value of the Deephaven system clock, DateTimeUtils.currentClock, is set to a simulated clock created from the query's replay settings. Calls to today, pastBusinessDate, etc. return values based on the configured replay time, date, and speed. The timeTable and WindowCheck functions also use the simulated clock.
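
For illustration, a minimal Python sketch of this behavior (assuming the query's Replay Date is set to 2020-09-22):

from deephaven import time_table
from deephaven.time import dh_today

# Under replay, dh_today() returns the configured Replay Date rather than the wall-clock date.
print(dh_today())  # 2020-09-22
# time_table ticks according to the simulated clock, honoring the configured replay speed.
ticking = time_table("PT5S")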

Unless configured otherwise, Replay Queries redirect the db.liveTable command to load the historical version of the table (db.historicalTable) instead. They also apply a special where clause on the table's timestamp column that releases rows based on the configured clock. This allows you to take an existing query that uses intraday data and execute it on historical data instead.
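
Conceptually, that means an unmodified intraday call like the one below is served from historical storage, with rows released as the simulated clock passes their timestamps (a sketch of the effective behavior, not the literal implementation):

# Inside a Replay Query, this reads the historical Market/Trade table;
# rows become visible only as the simulated clock passes their Timestamp values.
trades = db.live_table("Market", "Trade")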

To create a Replay query, click the +New button in the Query Monitor and select the type Live Query Replay (ReplayScript).

Live Query Replay (ReplayScript) selected in the Query Monitor.

Replay Settings

Configure the query in the Replay Settings tab.

The Replay Settings tab.

There are four parameters:

Replay Time

The Replay Time configuration provides two options that control the time of day at which the script begins replaying.

  • Use query start time - The query executes using the current time of day.
  • Fixed start time - The query executes using this value as the current time of day. The format is HH:MM:SS.

This is useful if you need to replay data that was initially streamed in a different time zone than the one you are testing in.

Replay Date

The Replay Date option controls the date that is returned by date methods, such as today() and minusBusinessDays(today(), 1). The default is the last business day of the default calendar.

Replay Speed

The Replay Speed option controls the rate at which data is replayed. This is useful if you want to replay data at a higher speed to simulate a greater data load. A replay speed of 0.5 replays data at half speed, while a replay speed of 2 replays data at double speed; at double speed, an event recorded 10 minutes after the replay start time is released 5 minutes into the replay.

Sorted Replay

The Sorted Replay checkbox guarantees that data is replayed in timestamp order - that is, in the order of the table sorted by the configured timestamp column.

Additional JVM Arguments

Timestamp column

A Replay Query assumes that the replay timestamp column in all tables is Timestamp unless configured otherwise. If your table has a timestamp column that is not named Timestamp, add the following argument to the Extra JVM Arguments field:

-DReplayDatabase.TimestampColumn.<Namespace>.<Table Name>=<Column Name>

For example, if the Market.Trade table's replay timestamp column were named TradeTime (an illustrative name):

-DReplayDatabase.TimestampColumn.Market.Trade=TradeTime

Note

If a table has only one column of type Instant, the Replay Query will automatically use it as the replay timestamp.

Replay intraday data

If you want to replay an intraday table instead of a historical table, add the following argument to the Extra JVM Arguments field:

-DReplayDatabase.UseIntraday=true

This is configurable on a per-namespace and per-table basis as well:

-DReplayDatabase.UseIntraday.<Namespace>=true
-DReplayDatabase.UseIntraday.<Namespace>.<Table Name>=true

Script

Replaying historical data

The simplest use case for a Replay Query is to replay a historical data set in an existing query script. If your script already uses the Deephaven built-in date methods, you do not need to do any additional work to select the appropriate date partition for replay. If not, you must ensure the script selects the correct replay date by hand.

For example, if you have any hardcoded dates:

myTable = db.live_table("Market", "Trade").where("Date=`2020-09-22`")

You will need to change the .where() clause manually. It is generally good practice to store the date in a local variable and reuse it throughout your query, so it is easy to change:

CURRENT_DATE = "2020-09-22"
...
myTable = db.live_table("Market", "Trade").where("Date=CURRENT_DATE")

Save your query, and the historical data will be replayed through the query script.
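
Alternatively, the built-in date methods pick up the simulated replay date automatically, so no manual edits are needed; a sketch:

from deephaven.time import dh_today

# Inside a Replay Query, dh_today() reflects the configured Replay Date,
# so the correct historical partition is selected without hand-edited dates.
CURRENT_DATE = dh_today()
myTable = db.live_table("Market", "Trade").where("Date=`%s`" % CURRENT_DATE)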

Simulating intraday data with replay

While it is often useful to simply replay historical data to a script, it may also be helpful to use a historical replay to simulate current intraday data. This example shows how to use the SystemTableLogger to re-log table data, transforming timestamps to match today’s date and time. This lets you run unmodified queries against simulated data, at the cost of a little more complexity and configuration.

Say you want to simulate data from the following table:

<Table namespace="Market" name="Trade" storageType="NestedPartitionedOnDisk">
    <Partitions keyFormula="${autobalance_single}" />
    <Column name="Date" dataType="String" columnType="Partitioning" />
    <Column name="Timestamp" dataType="DateTime" />
    <Column name="symbol" dataType="String" />
    <Column name="price" dataType="Double" />
    <Column name="size" dataType="Integer" />

    <Listener logFormat="1" listenerClass="TradeListener" listenerPackage="io.deephaven.example.gen">
        <Column name="Timestamp" timePrecision="Nanos"/>
        <Column name="symbol" />
        <Column name="price" />
        <Column name="size" />
    </Listener>
</Table>

You could replay data directly back to this table; however, it is recommended that you explicitly separate simulated or replayed data from actual data. First, create a schema for the simulated table to write to. This can be done by creating a CopyTable schema with a new namespace. Here is an example:

<CopyTable namespace="MarketSim" sourceNamespace="Market" name="Trade" sourceName="Trade" />

Deploy the schema using dhconfig schemas. This will add a new system table to Deephaven under the namespace “MarketSim”.
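
For example, an invocation along these lines (the schema file path is illustrative):

dhconfig schemas import --file /tmp/MarketSim.Trade.schema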

Next, create a Replay Query with a script that pulls the table from the original namespace and logs it and its updates back to a binary log file. For example:

import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.time.DateTimeUtils

// It is good practice to use a variable to store the value of the Date clauses for your tables.
REPLAY_DATE = DateTimeUtils.today()
SRC_NAMESPACE = "Market"
DEST_NAMESPACE = "MarketSim"
TABLE_NAME = "Trade"

// Here, we retrieve the table we are going to replay
sourceTable = db.liveTable(SRC_NAMESPACE, TABLE_NAME)
        .where("Date=`$REPLAY_DATE`")
        .dropColumns("Date")

opts = SystemTableLogger.newOptionsBuilder()
        .currentDateColumnPartition(true)
        .build()

// When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness. Call `close()` to stop logging and release resources.
lh = SystemTableLogger.logTableIncremental(db, DEST_NAMESPACE, TABLE_NAME, sourceTable, opts)

// At this point as rows tick into "Market.Trade" they will be automatically logged back to "MarketSim.Trade"

The equivalent Python script:

from deephaven_enterprise import system_table_logger as stl
from deephaven import time

# It is good practice to use a variable to store the value of the Date clauses for your tables.

REPLAY_DATE = time.dh_today()
SRC_NAMESPACE = "Market"
DEST_NAMESPACE = "MarketSim"
TABLE_NAME = "Trade"

# Here, we retrieve the table we are going to replay

sourceTable = (
    db.live_table(SRC_NAMESPACE, TABLE_NAME)
    .where("Date=`%s`" % REPLAY_DATE)
    .drop_columns("Date")
)

# When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness. Call `close()` to stop logging and release resources.

lh = stl.log_table_incremental(
    namespace=DEST_NAMESPACE,
    table_name=TABLE_NAME,
    table=sourceTable,
    columnPartition=None,
)

# At this point as rows tick into "Market.Trade" they are automatically logged back to "MarketSim.Trade"
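
When the replay session is complete, close the returned handle to stop logging and release its resources:

# Stop incremental logging once the replay is finished.
lh.close()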

There is one final thing to account for: if your table contains any Instant columns, you will probably want to convert their values to the current date while preserving the original time of day. This example replays the table to the MarketSim namespace and also converts every Instant column it finds to the current date at the same time of day:

import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.time.DateTimeUtils

import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId

// It is good practice to use a variable to store the value of the Date clauses for your tables.
REPLAY_DATE = DateTimeUtils.today()
REPLAY_DATE_PARTS = REPLAY_DATE.split("-")

SRC_NAMESPACE = "Market"
DEST_NAMESPACE = "MarketSim"
TABLE_NAME = "Trade"

// Compute the millis of Epoch at midnight for the current day
dtMidnightToday = LocalDate.now().atStartOfDay(ZoneId.of("America/New_York")).toInstant().toEpochMilli()

// Compute the millis of Epoch at midnight of the date you are replaying
dtMidnightSource = LocalDate.of(Integer.parseInt(REPLAY_DATE_PARTS[0]),
        Integer.parseInt(REPLAY_DATE_PARTS[1]),
        Integer.parseInt(REPLAY_DATE_PARTS[2]))
        .atStartOfDay(ZoneId.of("America/New_York")).toInstant().toEpochMilli()

// Here we retrieve the table we are going to replay
sourceTable = db.liveTable(SRC_NAMESPACE, TABLE_NAME)
        .where("Date=`$REPLAY_DATE`")
        .dropColumns("Date")

// Create a closure that converts an Instant value from the source date to the current date, maintaining the time of day
convertDateToday = { dt -> dt == null ? null : Instant.ofEpochMilli(dt.toEpochMilli() - dtMidnightSource + dtMidnightToday) }

// convert the Instant columns to today's date
convertDateColumns = { Table t ->
    final String[] cols = t.getDefinition().getColumns().stream().filter({c -> "Timestamp" != c.getName() && c.getDataType() == Instant.class })
            .map({ c -> "${c.getName()} = (Instant) convertDateToday.call(${c.getName()})".toString() }).toArray(String[]::new)
    return t.updateView(cols)
}

sourceTable = convertDateColumns(sourceTable)

opts = SystemTableLogger.newOptionsBuilder()
        .currentDateColumnPartition(true)
        .build()

// When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness. Call `close()` to stop logging and release resources.
lh = SystemTableLogger.logTableIncremental(db, DEST_NAMESPACE, TABLE_NAME, sourceTable, opts)

// At this point, as rows tick into "Market.Trade" they are automatically logged back to "MarketSim.Trade"

The equivalent Python script:

from deephaven_enterprise import system_table_logger as stl
from deephaven import time as dh_time
from deephaven.table import Table
from datetime import datetime, time

# It is good practice to use a variable to store the value of the Date clauses for your tables.
REPLAY_DATE = "2024-08-09"
REPLAY_DATE_TIME = datetime.strptime(REPLAY_DATE, "%Y-%m-%d")
# Get the current date
CURRENT_DATE = datetime.now().date()

SRC_NAMESPACE = "Market"
DEST_NAMESPACE = "MarketSim"
TABLE_NAME = "Trade"

# Compute the millis of Epoch at midnight for the current day
midnight_time = int(datetime.combine(CURRENT_DATE, time(0, 0)).timestamp() * 1000)

# Compute the millis of Epoch at midnight of the date you are replaying
midnight_source = int(datetime.combine(REPLAY_DATE_TIME, time(0, 0)).timestamp() * 1000)

# Here we retrieve the table we are going to replay
sourceTable = (
    db.live_table(SRC_NAMESPACE, TABLE_NAME)
    .where("Date=`%s`" % REPLAY_DATE)
    .drop_columns("Date")
)

# Create a function that converts an Instant value from the source date to the current date, maintaining the time of day
def convertDateToday(dt):
    if dt:
        return dh_time.to_j_instant((dt.toEpochMilli() - midnight_source + midnight_time) * 1000000)
    return None

# convert the Instant columns to today's date
def convertDateColumns(t: Table):
    cols = t.j_table.getDefinition().getColumns()
    updatesList = []
    for i in range(0, cols.size()):
        col = cols.get(i)
        if col.getName() != "Timestamp" and col.getDataType().getName() == 'java.time.Instant':
            updatesList = updatesList + ["%s = (Instant) convertDateToday(%s)" % (col.getName(), col.getName())]
    return t.update_view(updatesList)

sourceTable = convertDateColumns(sourceTable)

# When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness. Call `close()` to stop logging and release resources.
lh = stl.log_table_incremental(namespace=DEST_NAMESPACE, table_name=TABLE_NAME, table=sourceTable, columnPartition=None)

# At this point as rows tick into "Market.Trade" they are automatically logged back to "MarketSim.Trade"
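
Once this query is running, downstream queries can consume the simulated table exactly as they would the real intraday table, for example:

# A downstream query sees MarketSim/Trade ticking like ordinary intraday data.
simTrades = db.live_table("MarketSim", "Trade")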