Derived Table Writer
A Derived Table Writer listens to a Deephaven Core Table and writes additions to an in-process Deephaven Data Import Server. Using sequence numbers or custom logic, the Derived Table Writer can provide exactly-once delivery of derived messages.
A Derived Table Writer can be used for several common scenarios:
- Translating a table from one format to another. For example, a system may log raw JSON messages, which are then broken out into fields in a separate stage. The Derived Table Writer can be used to log the more useful parsed JSON messages for downstream use.
- Persisting the results of custom logic -- for example, a price feed or other complex application logic.
- Downsampling aggregations.
The steps to deploy a DerivedTableWriter are like those for any other in-worker DIS:
- Create a table schema.
- Create a storage directory.
- Configure routing.
- Write an ingestion query. All of the logic to generate the table and then persist it with a DerivedTableWriter is encapsulated within the ingestion query.
Note that the input table to a Derived Table Writer must be either an add-only table or a blink table. Added rows are persisted to disk. When using a blink table, removed rows are ignored. Modifications or shifts are not permitted. When operating on tables that have multiple independent streams (like a live table with multiple internal partitions), each stream must be handled independently using a PartitionedTable.
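The following sketch illustrates these input shapes; the namespace and table names are hypothetical, and the examples later in this page show each shape in context:
import io.deephaven.enterprise.database.TableOptions
// Add-only live table: every appended row stays in the source table and is written once
addOnlySource = db.liveTable("MyNamespace", "MySource", TableOptions.newLiveBuilder().build())
// Blink table: only the rows added in the current update cycle are retained, bounding memory use
blinkSource = db.liveTable("MyNamespace", "MySource", TableOptions.newLiveBuilder().isBlink(true).build())
// Partitioned table: each internal partition is a separate constituent that must be ingested independently
partitionedSource = db.livePartitionedTable("MyNamespace", "MySource", TableOptions.newLiveBuilder().isBlink(true).build())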
Create a Table Schema
A Derived Table Writer must write to a system table. The simplest way to create the schema is to build it from an in-memory table representing the data you are persisting.
import java.util.ArrayList
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import io.deephaven.engine.table.TableDefinition
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.enterprise.compatibility.TableDefinitionCompatibility
import static io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.TableDefinition.STORAGETYPE_NESTEDPARTITIONEDONDISK
// this assumes you have a table named "tableToWrite"
coreDefinition = tableToWrite.getDefinition()
columnDefinitions = new ArrayList<>(coreDefinition.getColumns())
columnDefinitions.add(0, ColumnDefinition.ofString("Date").withPartitioning())
coreDefinition = TableDefinition.of(columnDefinitions)
edef = TableDefinitionCompatibility.convertToEnterprise(coreDefinition)
edef.setNamespace("MyNamespace")
edef.setName("MyTable")
edef.setStorageType(STORAGETYPE_NESTEDPARTITIONEDONDISK)
schema=io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.xml.SchemaXmlFactory.getXmlSchema(edef, io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.NamespaceSet.SYSTEM)
// write the schema to disk as XML
Files.write(new File("/tmp/MyNamespace.MyTable.schema").toPath(), [io.deephaven.shadow.enterprise.io.deephaven.dhservice.SchemaFormatUtil.asString(schema)], StandardOpenOption.CREATE_NEW)
After writing the schema to disk, it can be imported with dhconfig:
sudo /usr/illumon/latest/bin/dhconfig schema import /tmp/MyNamespace.MyTable.schema
Alternatively, the schema object can be added directly to the schema service from the Groovy session:
import io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.SchemaServiceFactory;
SchemaServiceFactory.getDefault().addSchema(schema)
Create a Storage Directory
The intraday data is stored under a directory that must be exclusively owned by the DIS. Deephaven recommends the following location for the storage directory:
/db/dataImportServers/[DIS_Name]/
The storage directory must exist before the DIS Persistent Query can be started. When the script is run as a Persistent Query on a merge server, the dbmerge account requires read and write privileges.
Configure Routing
The DIS must be added to the data routing configuration. See Add a Data Import Server for instructions.
The following example adds a DIS named "json_parser" with a claim for the ParsedLog table in the Derived namespace. The DIS uses dynamic endpoints, so port values are not supplied. Private storage is used by default, so the script must specify the directory for data storage.
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add --name json_parser --claim Derived.ParsedLog
Write an Ingestion Query
After configuring a schema, storage, and routing, the next step is to create an in-worker DIS for the desired table. The following script creates a DIS with the name "json_parser". The routing YAML file uses "private" storage, so the second parameter to the getDisByNameWithStorage method specifies a pathname to a directory for the exclusive use of this in-worker DIS. The DIS automatically starts and listens on the Table Data and Tailer ports configured in routing. The above example uses a dynamic port registered with the service registry.
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
import com.fasterxml.jackson.databind.ObjectMapper;
// A simple class with appropriate Jackson annotations to serialize and deserialize JSON messages. The full
// example class is shown below.
import io.deephaven.enterprise.example.derivedjson.JSONPojo;
// Allows parsing of Java Instant objects
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
// Add the JSONPojo class to the query library so that it can be used in an update statement later on
ExecutionContext.getContext().getQueryLibrary().importClass(JSONPojo.class)
// We must register the JavaTimeModule to serialize or deserialize Instants
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule())
// create the Derived DIS; we create this early in the script so that we can reference the tables that it serves
dis = DataImportServerTools.getDisByNameWithStorage("json_parser", "/db/dataImportServers/json_parser")
// Read in the original table containing raw JSON strings as a blink table. We use a blink table to limit the number of rows in-memory for any single update.
jsonTable=db.liveTable("Source", "TestJSON", io.deephaven.enterprise.database.TableOptions.newLiveBuilder().isBlink(true).build())
// Parse the JSON using Jackson. The readValue call is inside of an update, which is evaluated once per row. The individual
// columns are extracted from the JSONPojo object with view. The DerivedTableWriter materializes the viewed columns on demand.
jsonParsed = jsonTable.update("Pojo=(JSONPojo)objectMapper.readValue(JSON, JSONPojo.class)").view("Sequence", "Timestamp=Pojo.Timestamp", "StrVal=Pojo.StrValue", "DVal=Pojo.DVal")
dtw=DerivedTableWriter.ingestTable("Derived", "ParsedLog", dis, jsonParsed, new DerivedTableWriter.Options().sequenceColumn("Sequence"))
// And load the result table.
tb=db.liveTable("Derived", "ParsedLog").coalesce()
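As an optional, purely illustrative sanity check, you can watch the persisted row count grow as messages are parsed and written:
// Count the rows that have been persisted and served back through the in-worker DIS
ingestedRowCount = tb.countBy("IngestedRows")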
Example JSONPojo class
package io.deephaven.enterprise.example.derivedjson;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
/**
* This class provides JSON serialization and deserialization for a simple example type
* to assist in the demonstration of the DerivedTableWriter.
*/
public class JSONPojo {
public JSONPojo() {}
public JSONPojo(@JsonProperty("sequence") long Sequence,
@JsonProperty("timestamp") Instant timestamp,
@JsonProperty("strValue") String StrValue,
@JsonProperty("dval") double DVal) {
this.Sequence = Sequence;
this.Timestamp = timestamp;
this.StrValue = StrValue;
this.DVal = DVal;
}
@JsonProperty("sequence")
public long Sequence;
@JsonProperty("timestamp")
public Instant Timestamp;
@JsonProperty("strValue")
public String StrValue;
@JsonProperty("dval")
public double DVal;
}
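The sketch below is not part of the ingestion query itself. Assuming the objectMapper and JSONPojo setup from the script above, it round-trips a few JSONPojo instances through Jackson to produce sample JSON strings, which can be handy for testing the parsing logic in memory before wiring up the real Source.TestJSON feed. The sample values and the testJson table are made up for this illustration:
import java.time.Instant
// Serialize a few JSONPojo instances to JSON strings using the objectMapper configured above
sampleJson = (0..<5).collect { i ->
objectMapper.writeValueAsString(new JSONPojo((long) i, Instant.now(), "value" + i, i * 0.5d))
}
// Build an in-memory stand-in for the raw JSON table and apply the same parsing logic as the ingestion query
testJson = newTable(stringCol("JSON", sampleJson as String[]))
testParsed = testJson.update("Pojo=(JSONPojo)objectMapper.readValue(JSON, JSONPojo.class)").view("Sequence=Pojo.Sequence", "Timestamp=Pojo.Timestamp", "StrVal=Pojo.StrValue", "DVal=Pojo.DVal")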
Handling Multiple Partitions
In the prior example, the ingestTable call is only suitable for a single partition because the sequence numbers must be ascending. To handle multiple partitions, you must use a PartitionedTable. To retrieve tables from a Core+ Database object as a partitioned table, use db.livePartitionedTable.
The ingestPartitionedTable function takes a PartitionedTable as input and independently logs each constituent. The ingestion for each partition is controlled by the same Options structure as the ingestTable method, and the PartitionedOptions structure determines how column and internal partitions are mapped from the source table.
If the source partitioned table (the partitioned table itself, not the constituent tables) has a key column named InternalPartition (the constant value PARTITIONED_TABLE_INTERNAL_PARTITION_COLUMN), then the internal partition is automatically copied from the InternalPartition column. Assuming the PartitionedTable has only two key columns, the column partition is automatically taken from the remaining key column. Alternatively, the partition names can be specified as a column in the partitioned table or set to a String value.
The above example remains the same up to the point where the DIS is created. Adapting the remainder of the example to use partitioned tables yields:
// Load the JSON data, with one constituent per partition.
jsonPartitionedTable=db.livePartitionedTable("Source", "TestJSON", io.deephaven.enterprise.database.TableOptions.newLiveBuilder().isBlink(true).build())
// Parse the JSON using Jackson, and then use view to extract individual fields from the object.
jsonParsed = jsonPartitionedTable.proxy().update("Pojo=(JSONPojo)objectMapper.readValue(JSON, JSONPojo.class)").view("Sequence", "Timestamp=Pojo.Timestamp", "StrVal=Pojo.StrValue", "DVal=Pojo.DVal").target()
pdtw=DerivedTableWriter.ingestPartitionedTable("Derived", "ParsedLog", dis, jsonParsed, new DerivedTableWriter.PartitionedOptions(), new DerivedTableWriter.Options().sequenceColumn("Sequence"))
// And load the result table.
tb=db.liveTable("Derived", "ParsedLog").coalesce()
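To see how the key columns drive the partition mapping described earlier, you can inspect the source partitioned table's keys; for a live Core+ table these are typically the column partition (e.g., Date) plus InternalPartition. This check is purely illustrative:
// Print the key column names of the source partitioned table
println(jsonPartitionedTable.keyColumnNames())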
Sequence Numbers and Resumption
The Options structure can specify a Sequence column using the sequenceColumn method, in which case the DerivedTableWriter automatically reads sequence values from the table and stores them in the output partition's checkpoint record. The sequence column must contain strictly ascending long values, but gaps are permitted.
An alternative to the built-in sequence number support is to read the existing table and, based on its contents, create your source table from that data. For example, if your table is derived from a ticking feed, you might load the table from disk and use the last sequence number to initialize the feed connection. The following example synthetically generates random data, with sequence numbers that begin where the table previously left off:
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter

// create the Derived DIS; we create this early in the script so that we can reference the tables that it serves
dis=io.deephaven.enterprise.dataimportserver.DataImportServerTools.getDisByNameWithStorage("Derived", "/db/Intraday/dataImportServers/Derived")
// Figure out the last ingested sequence number so we can resume from that point
seqIt = db.liveTable("Derived", "TestA").where("Date=today()").tail(1).longColumnIterator("Sequence")
startSequence=seqIt.hasNext() ? (seqIt.nextLong() + 1) : 0
seqIt.close()
// The startSequence that we've read from the table and are now using to generate the sequence numbers in update
// can be considered analogous to a sequence number that we pass to a feed API.
println("Starting sequence: " + startSequence)
// a Random instance referenced from the update formula below
random = new java.util.Random()
tableToWrite=timeTable("PT0.1s").update("Sequence=ii + startSequence", "DVal=random.nextDouble()")
// Ingest the generated data into "Derived.TestA", with a column partition of "today()" and an internal partition of "T1"
c = DerivedTableWriter.ingestTable("Derived", "TestA", today(), "T1", dis, tableToWrite, new DerivedTableWriter.Options())
Retrieve the table from another query
The ingested data can be retrieved like any other table from both Legacy and Core+ workers (Groovy and Python shown below):
parsed = db.liveTable("Derived", "ParsedLog").where("Date=today()")
parsed = db.live_table("Derived", "ParsedLog").where(["Date=today()"])
Downsampling Aggregations
Downsampling, or summarizing an original table, is a common derivation. This section provides an example of downsampling a trade feed into one-minute time bins. For each time interval, we record the number of trades, shares, and dollar volume by exchange and symbol.
At the heart of the downsampling process is a simple aggregation:
// Bucket trades into 1-minute intervals for each Symbol and Exchange. Aggregate the total Dollar Volume, Shares, and trade Count.
trades_by_minute=trades_raw.updateView("Bin=upperBin(ExchangeTimestamp, 'PT1m')", "Dollars=Size*Price")
.aggBy([AggCount("Count"),
AggSum("Volume=Size", "Dollars"),
AggMin("FirstTrade=ExchangeTimestamp"),
AggMax("LastTrade=ExchangeTimestamp", "LastReceived=Timestamp")],
"Sym", "Bin", "Exchange")
For a static table, the aggregation would be sufficient. However, for a ticking table, it is necessary to wait until an interval is complete before writing it to the downsampled table. Moreover, our example data contains two timestamps: the "Timestamp" column is the time the trade was received, and the "ExchangeTimestamp" column is the timestamp of the trade provided by the exchange. The received time is ascending; the ExchangeTimestamp is usually ascending as well, but there are cases where it can "go back in time" (e.g., to report a late trade).
As each micro-batch of trades is processed (according to the PeriodicUpdateGraph's cycle time), the corresponding bin updates. However, if we were to log trades_by_minute on each update cycle, then we would defeat the purpose of our downsampling process by producing many rows for each bucket instead of summarizing the table into one row for each bucket.
To delay logging a bin until it has been completed, we avoid logging a bucket until (1) the next bucket has started or (2) sufficient time has passed. To determine the last bucket, we use the AggSortedLast aggregation. Only the most recent bucket for each symbol and exchange exists in the "last_bucket" table. To prevent that bucket from becoming stale (e.g., when trades stop at the end of the day), we add a window check operation that only includes buckets within the last 90 seconds.
// Determine what the last bucket for each Symbol/Exchange combination is; we do not include this bucket in the output
last_bucket = trades_by_minute.aggBy([AggSortedLast("Bin", "Bin")], "Sym", "Exchange")
// Unless the bucket is more than 90 seconds old, in which case we expire the bucket and allow it to be written
unexpired_last_bucket = WindowCheck.addTimeWindow(last_bucket, "Bin", 90_000_000_000L, "InWindow").where("InWindow")
We then use a whereNotIn operation to exclude the most recent unexpired bucket from the result table. This prevents the downsampled table from ticking as each new trade is processed:
// The complete buckets are those that either are not the most recent for a given symbol/exchange, or are expired (i.e. outside the 90s window)
complete_trades_by_minute = trades_by_minute.whereNotIn(unexpired_last_bucket, "Bin", "Sym", "Exchange")
The result, complete_trades_by_minute, is our downsampled table. We do, however, have one final wrinkle to address before writing the table using a Derived Table Writer. The Derived Table Writer can log an add-only table or a blink table, but the result of an aggregation can include arbitrary modifications. In the case of our downsampler, this occurs when a trade is reported late (i.e., after the trade's bin has expired or trades from a more recent bin have been processed).
To handle this case, we separate the complete_trades_by_minute table into a blink table of additions (downsampled_trades) and a second table of late arrivals (downsampled_late_arrivals). To perform this separation, we use a custom SeparateAdditions class, stored in a Web Notebook for convenience. You can equivalently store Groovy classes in a Git repository accessible by the controller or as a custom Java library.
separated=SeparateAdditions.separateAdditions(complete_trades_by_minute)
downsampled_trades = separated.getLeft()
// The DerivedTableWriter processes both blink and append-only tables; the complete example below keeps an append-only copy of the late arrivals for viewing in the UI.
downsampled_late_arrivals=separated.getRight().dropColumns("ExceptionType")
SeparateAdditions Groovy Source Code
package io.deephaven.enterprise.example.separateadditions;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.MergedListener;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.ObjectArraySource;
import io.deephaven.util.SafeCloseable;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.util.SafeCloseableArray;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.function.Supplier;
/**
* <p>
* Divide the updates of a table into two blink tables. The first contains additions; the second contains
* modifications and removals. Shifts are ignored.
* </p>
*
* <p>
* When aggregating by a time bucket, typically only the most recent bucket is modified. However, late arrivals can
* result in a prior row being updated. By dividing the additions from the modifications, we can convert our aggregation
* result into a blink table for the new rows and a separate blink table with a different code path for the
* modifications. When used with the DerivedTableWriter as part of a downsampling workflow, we can log the downsampled
* rows as they first appear while still keeping track of the late arrivals separately.
* </p>
*/
public class SeparateAdditions {
public static final int CHUNK_SIZE = 4096;
public static String MODIFIED = "Modified";
public static String REMOVED = "Removed";
/**
* Divide source's updates into a blink table of additions and exceptions (removals/modifications).
*
* @param source the table with updates to divide
* @return a pair of blink tables (additions on the left, modifications/removals on the right)
*/
public static Pair<Table, Table> separateAdditions(Table source) {
final UpdateGraph updateGraph = source.getUpdateGraph();
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return QueryPerformanceRecorder.withNugget("separateAdditions()",
source.size(),
makeInitSupplier(source));
}
}
static Supplier<Pair<Table, Table>> makeInitSupplier(Table source) {
// noinspection Convert2Lambda,Convert2Diamond (Groovy made me do this)
return new Supplier<Pair<Table, Table>>() {
@Override
public Pair<Table, Table> get() {
return init(source);
}
};
}
private static Pair<Table, Table> init(Table source) {
// noinspection resource
final TrackingWritableRowSet rowset = source.getRowSet().copy().toTracking();
final Map<String, Object> attributes = Collections.singletonMap(Table.BLINK_TABLE_ATTRIBUTE, Boolean.TRUE);
final Map<String, ? extends ColumnSource<?>> columnSourceMap = source.getColumnSourceMap();
final QueryTable resultTable =
new QueryTable(source.getDefinition(), rowset, new LinkedHashMap<>(columnSourceMap), null, attributes);
// the table of "bad" rows that have been modified after the fact
final LinkedHashMap<String, WritableColumnSource<?>> exceptionSourceMap = makeExceptionMap(columnSourceMap);
final TrackingWritableRowSet exceptionRows = RowSetFactory.empty().toTracking();
final QueryTable exceptionTable = new QueryTable(exceptionRows, exceptionSourceMap);
exceptionTable.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, Boolean.TRUE);
final ListenerRecorder recorder = new ListenerRecorder("separateAdditions", source, resultTable);
source.addUpdateListener(recorder);
final MergedListener listener = new SeparateAdditionsListener(source, resultTable, exceptionTable, recorder);
exceptionTable.addParentReference(listener);
// we need to clear out our blink tables on each cycle
final Runnable notifyListener = new Runnable() {
@Override
public void run() {
listener.notifyChanges();
}
};
source.getUpdateGraph().addSource(notifyListener);
resultTable.addParentReference(notifyListener);
exceptionTable.addParentReference(notifyListener);
return Pair.of(resultTable, exceptionTable);
}
private static void ensureCapacity(LinkedHashMap<String, WritableColumnSource<?>> exceptionSources, long capacity) {
exceptionSources.values().forEach(cs -> cs.ensureCapacity(capacity));
}
private static LinkedHashMap<String, WritableColumnSource<?>> makeExceptionMap(
Map<String, ? extends ColumnSource<?>> columnSourceMap) {
final LinkedHashMap<String, WritableColumnSource<?>> exceptionSources = new LinkedHashMap<>();
final ObjectArraySource<String> exceptionType = new ObjectArraySource<>(String.class);
exceptionSources.put("ExceptionType", exceptionType);
columnSourceMap.forEach((k, v) -> exceptionSources.put(k,
ArrayBackedColumnSource.getMemoryColumnSource(0, v.getType(), v.getComponentType())));
return exceptionSources;
}
private static class SeparateAdditionsListener extends MergedListener {
private final ListenerRecorder sourceRecorder;
private final QueryTable resultTable;
private final LinkedHashMap<String, WritableColumnSource<?>> exceptionSourceMap;
private final ColumnSource[] sourceColumns;
private final WritableColumnSource[] exceptionSources;
private final WritableColumnSource[] toClear;
private final WritableColumnSource<String> exceptionTypeSource;
private final TrackingWritableRowSet exceptionRows;
private final QueryTable exceptionTable;
private final TrackingWritableRowSet rowset;
public SeparateAdditionsListener(Table source,
QueryTable resultTable,
QueryTable exceptionTable,
ListenerRecorder recorder) {
super(Collections.singleton(recorder), Collections.emptyList(), "separateAdditions", resultTable);
this.sourceRecorder = recorder;
sourceRecorder.setMergedListener(this);
final Map<String, ? extends ColumnSource<?>> columnSourceMap = source.getColumnSourceMap();
final LinkedHashMap<String, WritableColumnSource<?>> exceptionSourceMap =
(LinkedHashMap) exceptionTable.getColumnSourceMap();
sourceColumns = new ColumnSource[columnSourceMap.size()];
exceptionSources = new WritableColumnSource[columnSourceMap.size()];
int ci = 0;
for (final Iterator<Map.Entry<String, ? extends ColumnSource>> it =
(Iterator) columnSourceMap.entrySet().iterator(); it.hasNext();) {
Map.Entry<String, ? extends ColumnSource> next = it.next();
sourceColumns[ci] = next.getValue();
exceptionSources[ci] = exceptionSourceMap.get(next.getKey());
exceptionSources[ci].startTrackingPrevValues();
ci++;
}
toClear = Arrays.stream(exceptionSources).filter(cs -> cs instanceof ObjectArraySource)
.toArray(WritableColumnSource[]::new);
exceptionTypeSource = (WritableColumnSource<String>) exceptionSourceMap.get("ExceptionType");
exceptionTypeSource.startTrackingPrevValues();
this.resultTable = resultTable;
this.exceptionSourceMap = exceptionSourceMap;
this.exceptionRows = exceptionTable.getRowSet().writableCast();
this.rowset = resultTable.getRowSet().writableCast();
this.exceptionTable = exceptionTable;
}
public void process() {
long exceptionSize = 0;
if (sourceRecorder.recordedVariablesAreValid()) {
final TableUpdate upstream = sourceRecorder.getUpdate();
final TableUpdateImpl downstream = new TableUpdateImpl();
ensureCapacity(exceptionSourceMap, upstream.modified().size() + upstream.removed().size());
if (upstream.modified().isNonempty()) {
exceptionSize = logExceptions(upstream.modified(), exceptionSize, MODIFIED, false);
}
if (upstream.removed().isNonempty()) {
exceptionSize = logExceptions(upstream.removed(), exceptionSize, REMOVED, true);
}
downstream.added = upstream.added().copy();
downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY;
downstream.modified = RowSetFactory.empty();
downstream.shifted = RowSetShiftData.EMPTY;
downstream.removed = rowset.copy();
rowset.resetTo(upstream.added());
resultTable.notifyListeners(downstream);
} else if (rowset.isNonempty()) {
final TableUpdateImpl downstream = new TableUpdateImpl();
downstream.added = RowSetFactory.empty();
downstream.modified = RowSetFactory.empty();
downstream.shifted = RowSetShiftData.EMPTY;
downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY;
downstream.removed = rowset.copy();
rowset.clear();
resultTable.notifyListeners(downstream);
}
if (exceptionSize > 0 || exceptionRows.isNonempty()) {
final TableUpdateImpl exceptionDownstream = new TableUpdateImpl();
exceptionDownstream.added = RowSetFactory.flat(exceptionSize);
exceptionDownstream.removed = RowSetFactory.flat(exceptionRows.size());
exceptionDownstream.modified = RowSetFactory.empty();
exceptionDownstream.shifted = RowSetShiftData.EMPTY;
exceptionDownstream.modifiedColumnSet = ModifiedColumnSet.EMPTY;
if (exceptionSize < exceptionRows.size() && toClear.length > 0) {
final WritableRowSet rowsToClear = RowSetFactory.fromRange(exceptionSize, exceptionRows.size() - 1);
for (final WritableColumnSource<?> writableColumnSource : toClear) {
writableColumnSource.setNull(rowsToClear);
}
}
exceptionRows.resetTo(RowSetFactory.flat(exceptionSize));
exceptionTable.notifyListeners(exceptionDownstream);
}
}
private long logExceptions(RowSet rowsToLog,
long exceptionSize,
String exceptionType,
final boolean previous) {
final int chunkSize = (int) Math.min(rowsToLog.size(), CHUNK_SIZE);
final ChunkSource.GetContext[] getContexts = new ChunkSource.GetContext[sourceColumns.length];
final ChunkSink.FillFromContext[] ffc = new ChunkSink.FillFromContext[sourceColumns.length];
try (final RowSequence.Iterator mit = rowsToLog.getRowSequenceIterator();
final SafeCloseableArray<ChunkSource.GetContext> ignored = new SafeCloseableArray<>(getContexts);
final SafeCloseableArray<ChunkSink.FillFromContext> ignored2 = new SafeCloseableArray<>(ffc);
final SharedContext sharedContext = SharedContext.makeSharedContext();
final WritableObjectChunk<String, Values> exceptionTypeChunk =
WritableObjectChunk.makeWritableChunk(chunkSize);
final ChunkSink.FillFromContext tffc = exceptionTypeSource.makeFillFromContext(chunkSize)) {
for (int cc = 0; cc < sourceColumns.length; ++cc) {
getContexts[cc] = sourceColumns[cc].makeGetContext(chunkSize, sharedContext);
ffc[cc] = exceptionSources[cc].makeFillFromContext(chunkSize);
}
while (mit.hasMore()) {
sharedContext.reset();
final RowSequence mitChunk = mit.getNextRowSequenceWithLength(chunkSize);
final RowSequence chunkToWrite =
RowSequenceFactory.forRange(exceptionSize, exceptionSize + mitChunk.size() - 1);
exceptionTypeChunk.setSize(mitChunk.intSize());
exceptionTypeChunk.fillWithValue(0, mitChunk.intSize(), exceptionType);
exceptionTypeSource.fillFromChunk(tffc, exceptionTypeChunk, chunkToWrite);
for (int cc = 0; cc < sourceColumns.length; ++cc) {
final Chunk<?> ck = previous ? sourceColumns[cc].getPrevChunk(getContexts[cc], mitChunk)
: sourceColumns[cc].getChunk(getContexts[cc], mitChunk);
exceptionSources[cc].fillFromChunk(ffc[cc], ck, chunkToWrite);
}
exceptionSize += mitChunk.size();
}
}
return exceptionSize;
}
}
}
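As a standalone illustration (a sketch using a synthetic timeTable source; the column and variable names below exist only for this example), SeparateAdditions can be exercised against any ticking aggregation:
import io.deephaven.enterprise.example.separateadditions.SeparateAdditions
// A synthetic aggregation whose most recent bucket is modified on every update cycle
src = timeTable("PT1s").update("Bucket=upperBin(Timestamp, 'PT10s')", "Value=ii")
agg = src.aggBy([AggCount("Count"), AggSum("Value")], "Bucket")
// Divide the aggregation's updates into additions and modifications/removals
separatedPair = SeparateAdditions.separateAdditions(agg)
addedBuckets = separatedPair.getLeft()     // blink table of buckets appearing for the first time
changedBuckets = separatedPair.getRight()  // blink table of modified/removed buckets, tagged by ExceptionType
New ten-second buckets appear once in addedBuckets; subsequent updates to the current bucket show up in changedBuckets with an ExceptionType of "Modified".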
After having created the two separate streams, which are suitable for logging, we can pass them to a DerivedTableWriter:
// We combine the downsampled trades and late arrivals to produce our actual output
downsampled_to_log=merge(downsampled_trades, downsampled_late_arrivals).update("Timestamp=Instant.now()")
The last remaining concern is resuming the query after restart. After restarting, we can read back the table to determine which bins we have already logged:
// Find the buckets we've already logged for each exchange/symbol
already_logged=db.liveTable("Derived", "DownsampledTrades", TableOptions.forStatic()).where("Date=today()").lastBy("Exchange", "Sym", "Bin")
Using the already_logged table, we can prevent those already logged bins from being included in the downsampled table unless they have been changed while the query was down (i.e., a late arrival occurred). To filter out unchanged bins, we only include bins that have a changed "Count" column (which indicates that more trades have arrived for the given bin).
// Log bins that are new (no prior count) or have a changed count
downsampled_trades = downsampled_trades
.naturalJoin(already_logged, "Exchange,Sym,Bin", "PriorCount=Count")
.where("isNull(PriorCount) || Count != PriorCount").dropColumns("PriorCount")
Because a particular bin may appear in the output more than once, downstream consumers should apply a lastBy operation to the downsampled table:
// Downstream queries should load the Derived table, and execute a lastBy() to account for updated bins
downsampledView = db.liveTable("Derived", "DownsampledTrades").where("Date=today()").lastBy("Bin", "Exchange", "Sym").sort("Bin")
# Downstream queries should load the Derived table, and execute a last_by() to account for updated bins
downsampled_view = (
db.live_table("Derived", "DownsampledTrades")
.where("Date=today()")
.last_by(["Bin", "Exchange", "Sym"])
.sort("Bin")
)
Note that consumers can also request a live-updating downsampled table, without the delays from restarts, by subscribing to the trades_by_minute table:
import io.deephaven.uri.ResolveTools
downsampled = ResolveTools.resolve("pq://Downsample%20Replay/scope/trades_by_minute")
from deephaven_enterprise import uri
downsampled = uri.resolve("pq://Downsample%20Replay/scope/trades_by_minute")
The downsampled tables from this live example can be directly merged to produce historical tables; however, you may prefer to run a downsampling job against either the live or historical data after the data is complete, to reduce complications related to late arrivals and restart (e.g., multiple rows per output bucket).
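For example, a batch job against the prior business day could reuse the same aggregation with no completeness machinery, because a static table needs no bucket-expiration logic. The sketch below assumes that the MarketUs.TradeNbboStock Date partitions have already been merged to historical storage:
// Downsample a full, static day of trades; for static data, the aggregation alone is sufficient
historical_trades = db.historicalTable("MarketUs", "TradeNbboStock").where("Date=pastBusinessDate(1).toString()")
historical_by_minute = historical_trades.updateView("Bin=upperBin(ExchangeTimestamp, 'PT1m')", "Dollars=Size*Price")
.aggBy([AggCount("Count"),
AggSum("Volume=Size", "Dollars"),
AggMin("FirstTrade=ExchangeTimestamp"),
AggMax("LastTrade=ExchangeTimestamp", "LastReceived=Timestamp")],
"Sym", "Bin", "Exchange")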
Complete Downsampling Query Example
For reference, the source code of the entire example query is as follows. You can adapt this query to your particular use case and schema.
import java.util.Collections;
import io.deephaven.engine.table.impl.BlinkTableTools
import io.deephaven.engine.util.WindowCheck;
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
import io.deephaven.enterprise.database.TableOptions;
import io.deephaven.enterprise.example.separateadditions.SeparateAdditions;
// Since we are a replay query, add a simple clock to the dashboard to watch things tick along
current_time = timeTable("PT1s").tail(1)
// Start the DIS, so we can pick up from where we left off
dis=io.deephaven.enterprise.dataimportserver.DataImportServerTools.getDisByNameWithStorage("Derived", "/db/Intraday/dataImportServers/Derived")
// Find the buckets we've already logged for each exchange/symbol
already_logged=db.liveTable("Derived", "DownsampledTrades", TableOptions.forStatic()).where("Date=today()").lastBy("Exchange", "Sym", "Bin")
// Get the source data for today
trades_raw=db.liveTable("MarketUs", "TradeNbboStock").where("Date=today()").coalesce()
// Bucket trades into 1-minute intervals for each Symbol and Exchange. Aggregate the total Dollar Volume, Shares, and trade Count.
trades_by_minute=trades_raw.updateView("Bin=upperBin(ExchangeTimestamp, 'PT1m')", "Dollars=Size*Price")
.aggBy([AggCount("Count"),
AggSum("Volume=Size", "Dollars"),
AggMin("FirstTrade=ExchangeTimestamp"),
AggMax("LastTrade=ExchangeTimestamp", "LastReceived=Timestamp")],
"Sym", "Bin", "Exchange")
.updateView("Count=(int)Count")
// Determine what the last bucket for each Symbol/Exchange combination is; we do not include this bucket in the output
last_bucket = trades_by_minute.aggBy([AggSortedLast("Bin", "Bin")], "Sym", "Exchange")
// Unless the bucket is more than 90 seconds old, in which case we expire the bucket and allow it to be written
active_buckets = WindowCheck.addTimeWindow(last_bucket, "Bin", 90_000_000_000L, "InWindow").where("InWindow")
// The complete buckets are those that are not the most recent for a given symbol/exchange or expired
complete_trades_by_minute = trades_by_minute.whereNotIn(active_buckets, "Bin", "Sym", "Exchange")
// We want to use the complete buckets as our downsample output, but we separate them into a table of new buckets
// and a table of late arrivals. Separating them keeps the main stream add-only, with a single row per bucket,
// and lets us merge the late arrivals back in before logging.
separated=SeparateAdditions.separateAdditions(complete_trades_by_minute)
// we use a blink table for the additions, which is suitable for use with the DerivedTableWriter
downsampled_trades = separated.getLeft()
// Log bins that are new (no prior count) or have a changed count
downsampled_trades = downsampled_trades
.naturalJoin(already_logged, "Exchange,Sym,Bin", "PriorCount=Count")
.where("isNull(PriorCount) || Count != PriorCount").dropColumns("PriorCount")
// Buckets that have been modified are late arrivals
downsampled_late_arrivals=separated.getRight().dropColumns("ExceptionType")
// We combine the downsampled trades and late arrivals to produce our actual output
downsampled_to_log=merge(downsampled_trades, downsampled_late_arrivals).update("Timestamp=Instant.now()")
// We keep an append-only table (in-memory) around for viewing in the UI.
downsampled_late_arrivals=BlinkTableTools.blinkToAppendOnly(downsampled_late_arrivals)
// Connect our input table to the DerivedTableWriter. We use a single "downsample" internal partition for the
// merged output; the late arrivals could instead be logged to their own internal partition (e.g., "late").
c = DerivedTableWriter.ingestTable("Derived", "DownsampledTrades", today(), "downsample", dis, downsampled_to_log, new DerivedTableWriter.Options())
// Downstream queries should load the Derived table, and execute a lastBy() to account for updated bins
downsampledView = db.liveTable("Derived", "DownsampledTrades").where("Date=today()").lastBy("Bin", "Exchange", "Sym").sort("Bin")
Using as-of join instead of aggregation
In the previous trade downsampling example, we only produce results for minutes with a trade. For downsampling quotes, one strategy would be to use a lastBy operation to get the most recent quote in each time period. For example:
quotes_by_minute=quotes_raw.updateView("Bin=upperBin(ExchangeTimestamp, 'PT1m')").lastBy("Bin")
However, some analysis is simplified by having a Bin for every minute of the trading day, even if no quote updates occurred. This kind of result can be generated using the aj operation. First, we must create a list of times of interest, which can be done with timeTable:
all_bins=timeTable(DateTimeUtils.parseInstant(today() + "T09:30:00 ET"), "PT1m").renameColumns("Bin=Timestamp")
After generating all of the time bins, we need to combine them with all of the possible symbols (assuming the data comes from the MarketUs.QuoteNbboStock table). We can generate a complete list of symbols as follows:
raw_quotes=db.liveTable("MarketUs", "QuoteNbboStock").where("Date=today()").coalesce()
yesterdays_syms=db.historicalTable("MarketUs", "QuoteNbboStock").where("Date=pastBusinessDate(1).toString()").selectDistinct("Sym")
symbols = merge(yesterdays_syms, raw_quotes.selectDistinct("Sym")).selectDistinct("Sym")
Then, we cross join the symbols with the all_bins time table to produce all_buckets:
all_buckets=all_bins.join(symbols)
After you have all of the buckets, you can then use aj to find the quote immediately prior to the bucket's timestamp:
quotes_by_minute=all_buckets.aj(raw_quotes.renameColumns("QuoteTimestamp=Timestamp"), "Sym,Bin>ExchangeTimestamp")
The remainder of the query is very similar to the trades downsampling query. We use SeparateAdditions to produce blink tables of rows to log, and filter out previously logged buckets.
Quote downsampling with aj
import java.util.Collections;
import io.deephaven.engine.table.impl.BlinkTableTools
import io.deephaven.engine.util.WindowCheck;
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
import io.deephaven.enterprise.database.TableOptions;
import io.deephaven.enterprise.example.separateadditions.SeparateAdditions;
current_time = timeTable("PT1s").tail(1)
dis=io.deephaven.enterprise.dataimportserver.DataImportServerTools.getDisByNameWithStorage("Quotes", "/db/Intraday/dataImportServers/Quotes")
previously_logged=db.liveTable("Derived", "DownsampledQuotes", TableOptions.forStatic()).where("Date=today()").selectDistinct("Sym", "Bin")
raw_quotes=db.liveTable("MarketUs", "QuoteNbboStock").where("Date=today()").coalesce()
yesterdays_syms=db.historicalTable("MarketUs", "QuoteNbboStock").where("Date=pastBusinessDate(1).toString()").selectDistinct("Sym")
symbols = merge(yesterdays_syms, raw_quotes.selectDistinct("Sym")).selectDistinct("Sym")
all_bins=timeTable(DateTimeUtils.parseInstant(today() + "T09:30:00 ET"), "PT1m").renameColumns("Bin=Timestamp")
all_buckets=all_bins.join(symbols)
quotes_by_minute=all_buckets.aj(raw_quotes.renameColumns("QuoteTimestamp=Timestamp"), "Sym,Bin>ExchangeTimestamp")
// We want to use the per-minute quote buckets as our downsample output, but we separate them into a table of new
// buckets and a table of late arrivals. Separating them keeps the main stream add-only, with a single row per
// bucket, and lets us merge the late arrivals back in before logging.
separated=SeparateAdditions.separateAdditions(quotes_by_minute)
// we use a blink table for the additions, which is suitable for use with the DerivedTableWriter
downsampled_quotes = separated.getLeft().whereNotIn(previously_logged, "Sym", "Bin")
// Buckets that have been modified are late arrivals
downsampled_late_arrivals=separated.getRight().dropColumns("ExceptionType")
// We combine the downsampled trades and late arrivals to produce our actual output
downsampled_to_log=merge(downsampled_quotes, downsampled_late_arrivals).dropColumns("Date").update("Timestamp=Instant.now()")
// We keep an append-only table (in-memory) around for viewing in the UI.
downsampled_late_arrivals=BlinkTableTools.blinkToAppendOnly(downsampled_late_arrivals)
// Connect our input table to the DerivedTableWriter. We use a single "downsample" internal partition for the
// merged output; the late arrivals could instead be logged to their own internal partition (e.g., "late").
c = DerivedTableWriter.ingestTable("Derived", "DownsampledQuotes", today(), "downsample", dis, downsampled_to_log, new DerivedTableWriter.Options())
downsampled_result = db.liveTable("Derived", "DownsampledQuotes").where("Date=today()").lastBy("Sym", "Bin")