Derived Table Writer

A Derived Table Writer (DTW) enables real-time data transformation and persistence in Deephaven Enterprise. It listens to live tables, applies transformations, and writes the results to persistent storage with exactly-once delivery guarantees.

When to use a Derived Table Writer

Use a Derived Table Writer when you need to:

  • Transform and persist streaming data in real time with exactly-once guarantees.
  • Resume processing after system restarts without data loss or duplication.
  • Preserve computed results from complex analytics or custom logic.
  • Downsample high-frequency data into time-based aggregations.
  • Ensure exactly-once delivery of processed data to downstream systems.

This guide demonstrates a complete end-to-end example using market data processing to show you how to:

  1. Understand core concepts and partitioning
  2. Set up the necessary infrastructure components
  3. Transform and persist real-time data
  4. Access and query the results

Understanding partitioning in Deephaven

Before working with DTW, it's essential to understand how Deephaven organizes data using a multi-level partitioning system. This organization improves query performance, enables efficient data storage, and facilitates data lifecycle management.

Date partitioning

The most important partition in Deephaven is the Date partition. Every table written with DTW requires a special Date column, which serves as the primary partitioning key.

  • The Date column physically separates data into different directories on disk.
  • Queries that filter on the Date column are much faster because they scan only the relevant partitions (see the example after this list).
  • Data lifecycle management uses Date partitioning for retention policies.
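
For example, a query that pins the Date partition reads only that day's directory on disk. Here is a minimal sketch against a system table like those created later in this guide:

// Reads only today's Date partition from disk
todayTrades = db.liveTable("Market", "Trades").where("Date=today()")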

When using DTW, you specify the Date partition value with the columnPartition parameter, which is typically set to today() for current data:

DerivedTableWriter.ingestTable(
    "Market",             // namespace
    "MarketData",         // table name
    today(),              // columnPartition - specifies the Date partition value
    "live",              // internalPartition - explained below
    dis,                  // Data Import Server
    sourceTable,          // table to ingest
    options               // configuration options
)

Internal partitioning

Within each Date partition, Deephaven further organizes data using internal partitions, specified by the internalPartition parameter.

  • Internal partitions group related data together for better organization.
  • They facilitate parallel processing across different data streams.
  • They enable separate handling of different data categories or sources.

For example, you might use different internal partitions for:

  • Different data feeds ("nyse", "nasdaq")
  • Different processing stages ("raw", "filtered", "aggregated")
  • Different use cases ("live", "backtest", "research")

Internal partitions are specified as strings:

DerivedTableWriter.ingestTable("Market", "Trades", today(), "nyse", dis, nyseSourceTable, options)
DerivedTableWriter.ingestTable("Market", "Trades", today(), "nasdaq", dis, nasdaqSourceTable, options)

Partitioned tables

For high-volume data processing across multiple streams (e.g., processing many stock symbols in parallel), Deephaven offers partitioned tables that leverage both date and key-based partitioning:

// Process data partitioned by Symbol
DerivedTableWriter.ingestPartitionedTable(
    "Market",                           // namespace
    "SymbolData",                      // table name
    dis,                               // Data Import Server
    partitionedSourceTable,            // source partitioned table
    new DerivedTableWriter.PartitionedOptions(),
    new DerivedTableWriter.Options().sequenceColumn("Sequence")
)

Partitioned tables maintain separate sequence tracking for each partition, allowing for independent, parallel processing.

Core concepts

Input table requirements

Derived Table Writer accepts only specific table types:

  • Add-only tables: Tables where rows are only added, never modified or removed.
  • Blink tables: Tables that show only the most recent changes, with automatic cleanup.

// Add-only table example
addOnlyTable = timeTable("PT1s").update("Value=ii")

// Blink table example
blinkTable = db.liveTable("Source", "Data",
    io.deephaven.enterprise.database.TableOptions.newLiveBuilder().isBlink(true).build())

Sequence numbers and exactly-once delivery

For guaranteed exactly-once delivery, your source table must include a sequence column with strictly ascending values:

// Table with sequence numbers
sequencedTable = timeTable("PT1s").update(
    "Sequence=ii",           // Ascending sequence
    "Data=random()",
    "Timestamp=now()"
)

// Configure DTW to use sequence column
options = new DerivedTableWriter.Options().sequenceColumn("Sequence")

Without sequence numbers, rows may be duplicated or lost during system restarts.
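
If the upstream source does not provide a sequence, one option is to synthesize one in the ingestion query. The sketch below is an illustration rather than a built-in DTW feature: it holds an AtomicLong in the query scope, and on restart you would seed it from the last persisted sequence, as in the restart example later in this guide.

import java.util.concurrent.atomic.AtomicLong

// Synthesize an ascending sequence for a source that lacks one (blinkTable from the example above).
// Seed the counter from your last checkpoint when restarting.
seqCounter = new AtomicLong(0)
sequencedSource = blinkTable.update("Sequence=seqCounter.getAndIncrement()")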

Namespaces and system tables

Derived Table Writer writes to system tables within specific namespaces. System tables provide:

  • Persistent storage across restarts.
  • Automatic partitioning by date.
  • Integration with Deephaven's query engine.

// Writing to namespace "Analytics", table "ProcessedData"
DerivedTableWriter.ingestTable("Analytics", "ProcessedData", today(), "partition1", dis, sourceTable, options)

End-to-end example: Real-time market data processing

This section demonstrates a complete DTW implementation using market data processing. We'll build a system that ingests real-time market data, applies transformations, and persists the results with exactly-once delivery guarantees.

Create a sample data source

First, let's create a simulated market data feed that mimics real trading data:

// Create simulated market data
marketData = timeTable("PT0.5s").update(
    "Symbol=ii % 4 == 0 ? `AAPL` : ii % 4 == 1 ? `MSFT` : ii % 4 == 2 ? `GOOG` : `AMZN`",
    "Price=100.0 + (random() * 10)",
    "Volume=(int)(100 + random() * 900)",
    "Timestamp=now()",
    "Sequence=ii"  // Sequence column for exactly-once delivery
)

This creates a table with:

  • Symbol: Rotating between four major stocks.
  • Price: Random prices around $100-110.
  • Volume: Random trade volumes between 100 and 1,000 shares.
  • Timestamp: Current time for each row.
  • Sequence: Ascending sequence numbers for exactly-once delivery.

Design your data transformation

Next, we'll add calculated fields that provide additional insights:

// Add calculated metrics
enrichedMarketData = marketData.update(
    "DollarVolume=Price * Volume",
    "PriceChange=Price - Price_[i-1]"
)

Our transformations add:

  • DollarVolume: Total dollar value of each trade.
  • PriceChange: Price movement from the previous trade.

Set up Data Import Server and schema

See the Setup section below for detailed instructions on preparing the schema and configuring the Data Import Server.

Write the ingestion query

Create your ingestion query that connects the source data to the Derived Table Writer:

import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter

// Initialize the Data Import Server
dis = DataImportServerTools.getDisByNameWithStorage("trades_processor", "/db/dataImportServers/trades_processor")

// Start ingestion
dtw = DerivedTableWriter.ingestTable(
    "Trading",              // namespace
    "EnrichedTrades",       // table name
    today(),                // column partition
    "live",                 // internal partition
    dis,                    // Data Import Server
    enrichedMarketData,     // source table
    new DerivedTableWriter.Options().sequenceColumn("Sequence")
)

// Access the persisted data
persistedTrades = db.liveTable("Trading", "EnrichedTrades").where("Date=today()")

This example demonstrates the core DTW pattern: transform data, configure persistence, and access results.

Setup

Schema preparation

Create a schema that defines your table structure. The simplest approach is to derive it from an existing table:

import java.util.ArrayList
import java.nio.file.Files
import java.nio.file.StandardOpenOption

import io.deephaven.engine.table.TableDefinition
import io.deephaven.engine.table.ColumnDefinition

import io.deephaven.enterprise.compatibility.TableDefinitionCompatibility
import static io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.TableDefinition.STORAGETYPE_NESTEDPARTITIONEDONDISK

// Create a sample table to define schema structure
sampleTable = emptyTable(0).update(
    "Symbol=`AAPL`",
    "Price=150.0",
    "Volume=100",
    "DollarVolume=15000.0",
    "Timestamp=now()",
    "Sequence=0L"
)

// Generate schema from table definition
try {
    coreDefinition = sampleTable.getDefinition()
    columnDefinitions = new ArrayList<>(coreDefinition.getColumns())
    columnDefinitions.add(0, ColumnDefinition.ofString("Date").withPartitioning())
    coreDefinition = TableDefinition.of(columnDefinitions)

    edef = TableDefinitionCompatibility.convertToEnterprise(coreDefinition)
    edef.setNamespace("Trading")
    edef.setName("EnrichedTrades")
    edef.setStorageType(STORAGETYPE_NESTEDPARTITIONEDONDISK)

    schema = io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.xml.SchemaXmlFactory.getXmlSchema(edef, io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.NamespaceSet.SYSTEM)

    // Write schema to disk
    schemaFile = new File("/tmp/Trading.EnrichedTrades.schema")
    if (schemaFile.exists()) {
        println("Schema file already exists. Remove or rename existing file.")
        return
    }

    Files.write(schemaFile.toPath(),
        [io.deephaven.shadow.enterprise.io.deephaven.dhservice.SchemaFormatUtil.asString(schema)],
        StandardOpenOption.CREATE_NEW)

    println("Schema written to: ${schemaFile.absolutePath}")
} catch (Exception e) {
    println("Error generating schema: ${e.message}")
    throw e
}

Deploy the schema using dhconfig:

sudo /usr/illumon/latest/bin/dhconfig schema import /tmp/Trading.EnrichedTrades.schema

Alternative: Add schema programmatically:

import io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.NamespaceSet

try {
    // Create the namespace if it doesn't exist (required before adding schema)
    SchemaServiceFactory.getDefault().createNamespace(NamespaceSet.SYSTEM, "Trading")

    SchemaServiceFactory.getDefault().addSchema(schema)
    println("Schema added successfully")
} catch (Exception e) {
    println("Error adding schema: ${e.message}")
    // Schema may already exist - check if that's acceptable
    throw e
}

Storage directory setup

Create a dedicated storage directory for your Data Import Server:

# Create storage directory
sudo mkdir -p /db/dataImportServers/trades_processor

# Set appropriate ownership and permissions
sudo chown dbmerge:dbmerge /db/dataImportServers/trades_processor
sudo chmod 755 /db/dataImportServers/trades_processor

Important considerations:

  • The directory must be dedicated exclusively to this DIS.
  • The dbmerge account needs read/write access for Persistent Queries.
  • Use the pattern /db/dataImportServers/[DIS_Name]/ for consistency.

Data Import Server configuration

Register your DIS with the routing system:

# Add DIS with table claim
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add \
    --name trades_processor \
    --claim Trading.EnrichedTrades

Configuration options:

  • --name: Unique identifier for your DIS.
  • --claim: Namespace.TableName that this DIS will handle.
  • Dynamic ports are used by default (recommended).
  • Private storage requires specifying the directory path in your code.

For detailed configuration options, see Add a Data Import Server.

Writing the ingestion query

Create your ingestion query that connects the source data to the Derived Table Writer:

import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter

// Initialize the Data Import Server
dis = DataImportServerTools.getDisByNameWithStorage("trades_processor", "/db/dataImportServers/trades_processor")

// Create your source data transformation
sourceTable = timeTable("PT1s").update(
    "Symbol=`AAPL`",
    "Price=150.0 + random() * 10",
    "Volume=(int)(100 + random() * 900)",
    "Timestamp=now()",
    "Sequence=ii"
)

enrichedTable = sourceTable.update("DollarVolume=Price * Volume")

// Start ingestion
dtw = DerivedTableWriter.ingestTable(
    "Trading",              // namespace
    "EnrichedTrades",       // table name
    today(),                // column partition
    "live",                 // internal partition
    dis,                    // Data Import Server
    enrichedTable,          // source table
    new DerivedTableWriter.Options().sequenceColumn("Sequence")
)

// Verify ingestion is working
resultTable = db.liveTable("Trading", "EnrichedTrades").where("Date=today()")

Examples

JSON message processing

This example shows how to parse JSON messages and persist structured data:

import io.deephaven.engine.context.ExecutionContext
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule

// Configure JSON parsing
objectMapper = new ObjectMapper()
objectMapper.registerModule(new JavaTimeModule())

// Initialize DIS for JSON processing
dis = DataImportServerTools.getDisByNameWithStorage("json_processor", "/db/dataImportServers/json_processor")

// Source table with raw JSON messages (blink table for memory efficiency)
jsonTable = db.liveTable("Source", "RawMessages",
    io.deephaven.enterprise.database.TableOptions.newLiveBuilder().isBlink(true).build())

// Parse JSON and extract fields
parsedTable = jsonTable.update(
    "ParsedData=objectMapper.readTree(JsonMessage)",
    "Sequence=ParsedData.get('sequence').asLong()",
    "Timestamp=java.time.Instant.ofEpochMilli(ParsedData.get('timestamp').asLong())",
    "Symbol=ParsedData.get('symbol').asText()",
    "Price=ParsedData.get('price').asDouble()",
    "Volume=ParsedData.get('volume').asInt()"
).view("Sequence", "Timestamp", "Symbol", "Price", "Volume")

// Ingest parsed data
dtw = DerivedTableWriter.ingestTable(
    "Market", "ParsedMessages", today(), "live", dis, parsedTable,
    new DerivedTableWriter.Options().sequenceColumn("Sequence")
)

// Access results
parsedMessages = db.liveTable("Market", "ParsedMessages").where("Date=today()")

Example JSON message format:

{
  "sequence": 12345,
  "timestamp": 1640995200000,
  "symbol": "AAPL",
  "price": 182.5,
  "volume": 1000
}

This approach uses Jackson's tree parsing for flexibility without requiring custom POJOs.
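
Before wiring the parser into a query, it can help to run the same Jackson calls against one sample message as a standalone sanity check, separate from the ingestion itself:

// Quick check of the tree-parsing logic against the sample message above
sampleJson = '{"sequence": 12345, "timestamp": 1640995200000, "symbol": "AAPL", "price": 182.5, "volume": 1000}'
node = objectMapper.readTree(sampleJson)
println(node.get("symbol").asText() + " " + node.get("price").asDouble() + " x " + node.get("volume").asInt())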

Trade data downsampling

This example demonstrates downsampling high-frequency trade data into minute-level aggregations:

import io.deephaven.engine.util.WindowCheck
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
import io.deephaven.enterprise.dataimportserver.DataImportServerTools

// Initialize DIS for downsampled data
dis = DataImportServerTools.getDisByNameWithStorage("trade_downsampler", "/db/dataImportServers/trade_downsampler")

// Source: high-frequency trade data
rawTrades = db.liveTable("Market", "Trades").where("Date=today()")

// Downsample to 1-minute intervals
tradesByMinute = rawTrades
    .updateView("Bin=upperBin(Timestamp, 'PT1m')", "DollarVolume=Price * Size")
    .aggBy([
        AggCount("TradeCount"),
        AggSum("TotalVolume=Size", "TotalDollarVolume=DollarVolume"),
        AggFirst("OpenPrice=Price"),
        AggLast("ClosePrice=Price"),
        AggMin("LowPrice=Price"),
        AggMax("HighPrice=Price")
    ], "Symbol", "Bin")

// Only log complete intervals (exclude the current minute)
lastBins = tradesByMinute.aggBy([AggSortedLast("Bin", "Bin")], "Symbol")
activeBins = WindowCheck.addTimeWindow(lastBins, "Bin", 90_000_000_000L, "InWindow").where("InWindow")
completeBars = tradesByMinute.whereNotIn(activeBins, "Bin", "Symbol")

// Add sequence for exactly-once delivery
sequencedBars = completeBars.update("Sequence=ii")

// Ingest downsampled data
dtw = DerivedTableWriter.ingestTable(
    "Analytics", "MinuteBars", today(), "downsample", dis, sequencedBars,
    new DerivedTableWriter.Options().sequenceColumn("Sequence")
)

// Access downsampled results
minuteBars = db.liveTable("Analytics", "MinuteBars").where("Date=today()")

Key features:

  • Time bucketing: Groups trades into 1-minute intervals.
  • OHLC calculation: Computes open, high, low, close prices.
  • Volume aggregation: Sums share and dollar volumes.
  • Complete intervals only: Excludes the current (incomplete) minute.
  • Restart safety: Uses sequence numbers for exactly-once delivery.
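
To spot-check the persisted bars, pull the latest complete bars for a single symbol and compute each bar's price range (a quick inspection, assuming AAPL appears in your feed):

// Most recent complete one-minute bars for one symbol, with the high-low range per bar
recentBars = minuteBars.where("Symbol=`AAPL`")
    .update("Range=HighPrice - LowPrice")
    .tail(10)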

Handling restarts and resumption

For production systems, implement restart logic to resume processing from the last checkpoint:

import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter

// Initialize DIS
dis = DataImportServerTools.getDisByNameWithStorage("analytics_processor", "/db/dataImportServers/analytics_processor")

// Determine last processed sequence number
lastProcessed = db.liveTable("Analytics", "ProcessedData")
    .where("Date=today()")
    .tail(1)
    .longColumnIterator("Sequence")

startSequence = lastProcessed.hasNext() ? (lastProcessed.nextLong() + 1) : 0
lastProcessed.close()

println("Resuming from sequence: " + startSequence)

// Create source data starting from checkpoint
sourceData = timeTable("PT0.1s").update(
    "Sequence=ii + startSequence",
    "Value=random() * 100",
    "Category=(ii % 3 == 0) ? `A` : (ii % 3 == 1) ? `B` : `C`"
)

// Apply transformations
processedData = sourceData.update(
    "ProcessedValue=Value * 1.1",
    "Timestamp=now()"
)

// Ingest with sequence tracking
dtw = DerivedTableWriter.ingestTable(
    "Analytics", "ProcessedData", today(), "main", dis, processedData,
    new DerivedTableWriter.Options().sequenceColumn("Sequence")
)

// Monitor progress
resultTable = db.liveTable("Analytics", "ProcessedData").where("Date=today()")

Restart benefits:

  • No data loss: Resumes from last processed sequence.
  • No duplicates: Sequence tracking prevents reprocessing.
  • Automatic checkpointing: DTW manages sequence state.
  • Feed integration: Can pass sequence to external data feeds.

Accessing persisted data

Retrieve ingested data like any other Deephaven table:

// Access today's data
today_data = db.liveTable("Analytics", "ProcessedData").where("Date=today()")

// Historical data access
historical_data = db.historicalTable("Analytics", "ProcessedData").where("Date=`2024-01-15`")

// Live streaming view
live_stream = db.liveTable("Analytics", "ProcessedData").tail(1000)

# Python equivalent
today_data = db.live_table("Analytics", "ProcessedData").where(["Date=today()"])
historical_data = db.historical_table("Analytics", "ProcessedData").where(
    ["Date=`2024-01-15`"]
)
live_stream = db.live_table("Analytics", "ProcessedData").tail(1000)
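
To work across more than one day, historical and live partitions can be combined with merge (a Groovy sketch; substitute dates that actually exist on disk):

// One view spanning a historical date and today's live data
combined = merge(
    db.historicalTable("Analytics", "ProcessedData").where("Date=`2024-01-15`"),
    db.liveTable("Analytics", "ProcessedData").where("Date=today()")
)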

Production considerations

Partitioned table handling

When processing data with multiple independent streams, use partitioned tables for better performance and organization:

import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter

// Initialize DIS
dis = DataImportServerTools.getDisByNameWithStorage("multi_stream_processor", "/db/dataImportServers/multi_stream_processor")

// Load partitioned source data (one partition per symbol)
partitionedSource = db.livePartitionedTable("Market", "SymbolData",
    io.deephaven.enterprise.database.TableOptions.newLiveBuilder().isBlink(true).build())

// Apply transformations across all partitions
processedPartitions = partitionedSource.proxy()
    .update("EnrichedValue=Price * Volume * 1.1", "Sequence=ii")
    .view("Symbol", "Timestamp", "Price", "Volume", "EnrichedValue", "Sequence")
    .target()

// Ingest all partitions
pdtw = DerivedTableWriter.ingestPartitionedTable(
    "Analytics", "ProcessedSymbols", dis, processedPartitions,
    new DerivedTableWriter.PartitionedOptions(),
    new DerivedTableWriter.Options().sequenceColumn("Sequence")
)

// Access results
processedData = db.liveTable("Analytics", "ProcessedSymbols").where("Date=today()")

Benefits of partitioned ingestion:

  • Independent processing: Each partition maintains its own sequence numbers (see the check after this list).
  • Parallel execution: Partitions process simultaneously.
  • Scalability: Handles high-volume multi-stream data efficiently.
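
One way to confirm the per-partition sequence tracking is to look at the highest sequence persisted for each symbol (a simple check using the aggregation helpers shown earlier):

// Highest sequence number persisted per symbol
latestSequences = processedData.aggBy([AggMax("MaxSequence=Sequence")], "Symbol")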

Monitoring and troubleshooting

Common error patterns

1. Sequence number violations:

// Problem: Non-ascending sequences
badTable = timeTable("PT1s").update("Sequence=random() * 1000")  // Random sequences

// Solution: Ensure ascending sequences
goodTable = timeTable("PT1s").update("Sequence=ii")  // Ascending sequences

2. Schema mismatches:

// Problem: Table structure doesn't match schema
// Solution: Verify column names and types match exactly
sourceTable.getDefinition().getColumns().forEach { col ->
    println("${col.getName()}: ${col.getDataType()}")
}

3. Storage directory permissions:

# Check directory ownership and permissions
ls -la /db/dataImportServers/your_dis_name/

# Fix permissions if needed
sudo chown -R dbmerge:dbmerge /db/dataImportServers/your_dis_name/
sudo chmod -R 755 /db/dataImportServers/your_dis_name/

Performance monitoring

Monitor your DTW performance with these queries:

// Check ingestion rate
ingestedData = db.liveTable("YourNamespace", "YourTable").where("Date=today()")
recentData = ingestedData.tail(1000)

// Monitor sequence gaps
sequenceGaps = ingestedData
    .update("PrevSequence=Sequence_[i-1]", "Gap=Sequence - PrevSequence - 1")
    .where("Gap > 0")

// Check partition sizes
partitionSizes = ingestedData.countBy("InternalPartition")

Debugging ingestion issues

Enable detailed logging:

// Add debug information to your transformations
debugTable = sourceTable.update(
    "ProcessingTime=now()",
    "RowCount=ii",
    "YourTransformation=YourLogic"
)

// Log sample data for verification
show(debugTable.head(10))

Verify DIS status:

// Check if DIS is running
dis.isRunning()

// Get DIS statistics
dis.getStatistics()

Production deployment considerations

High availability setup

For production environments, consider these deployment patterns:

1. Multiple DIS instances:

# Configure multiple DIS instances for redundancy
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add \
    --name processor_primary \
    --claim Analytics.Data
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add \
    --name processor_backup \
    --claim Analytics.Data

2. Load balancing:

// Distribute load across multiple internal partitions by splitting the source table
// (assumes a Sequence column, as in the earlier examples) and ingesting each slice separately
numPartitions = 4
dtws = []
for (int p = 0; p < numPartitions; p++) {
    slice = sourceTable.where("Sequence % " + numPartitions + " == " + p)
    dtws << DerivedTableWriter.ingestTable(
        "Analytics", "DistributedData", today(), "partition_" + p, dis, slice, options
    )
}

Backup and recovery

Schema backup:

# Export current schemas
sudo /usr/illumon/latest/bin/dhconfig schema export --namespace Analytics --output /backup/schemas/

# Import during recovery
sudo /usr/illumon/latest/bin/dhconfig schema import /backup/schemas/Analytics.*.schema

Data verification:

// Verify data integrity after recovery
expectedCount = sourceDataCount  // From your source system
actualCount = db.liveTable("Analytics", "YourTable").where("Date=today()").size()

if (expectedCount != actualCount) {
    println("Data integrity issue: expected ${expectedCount}, got ${actualCount}")
}

Operational procedures

Daily maintenance:

import java.time.Duration

// Check for processing delays: compare the most recent Timestamp against the current time
currentTime = now()
timestamps = db.liveTable("Analytics", "YourTable").where("Date=today()").tail(1).objectColumnIterator("Timestamp")
if (timestamps.hasNext()) {
    delay = Duration.between(timestamps.next(), currentTime)
    if (delay.toMinutes() > 5) {
        println("Processing delay detected: ${delay.toMinutes()} minutes")
    }
}
timestamps.close()

Capacity planning:

// Monitor storage growth (estimatedRowSizeBytes is a rough per-row estimate for your schema)
estimatedRowSizeBytes = 64
dailyGrowth = db.historicalTable("Analytics", "YourTable")
    .where("Date >= pastBusinessDate(7)")
    .countBy("Date")
    .update("GrowthMB=Count * estimatedRowSizeBytes / 1024.0 / 1024.0")

Reference

API methods

DerivedTableWriter.ingestTable()

static LivenessArtifact ingestTable(
    String namespace,           // Target namespace (e.g., "Trading")
    String tableName,          // Target table name (e.g., "Trades")
    String columnPartition,    // Date partition (typically today())
    String internalPartition,  // Internal partition identifier (e.g., "live")
    DataImportServer dis,      // Data Import Server instance
    Table sourceTable,         // Source table (must be add-only or blink)
    DerivedTableWriter.Options options  // Configuration options
)
// Returns: LivenessArtifact for managing the ingestion lifecycle

DerivedTableWriter.ingestPartitionedTable()

static LivenessArtifact ingestPartitionedTable(
    String namespace,                              // Target namespace
    String tableName,                             // Target table name
    DataImportServer dis,                         // Data Import Server instance
    PartitionedTable sourcePartitionedTable,     // Source partitioned table
    DerivedTableWriter.PartitionedOptions partitionedOptions,  // Partition configuration
    DerivedTableWriter.Options options           // Configuration options
)
// Returns: LivenessArtifact for managing the partitioned ingestion lifecycle

Configuration options

DerivedTableWriter.Options:

  • sequenceColumn(String): Specify sequence column for exactly-once delivery.
  • transactionsEnabled(boolean): Enable atomic transactions for multi-row operations.
  • logInitial(boolean): Whether to log initial table state on startup.
  • chunkSize(int): Set processing chunk size for performance tuning.
  • lastBy(String, Collection<String>): Configure last-by state tracking.
  • Additional options are available in the Javadoc; a combined sketch follows this list.
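
Assuming these setters follow the same fluent style as sequenceColumn and transactionsEnabled (both chained elsewhere in this guide), a combined configuration might look like the sketch below; the specific values are illustrative, not recommendations.

// Combined Options sketch with illustrative values
options = new DerivedTableWriter.Options()
    .sequenceColumn("Sequence")
    .transactionsEnabled(true)
    .logInitial(false)
    .chunkSize(4096)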

DerivedTableWriter.PartitionedOptions:

  • columnPartitionKey(String): Column containing partition names for dynamic partitioning.
  • columnPartitionValue(String): Fixed partition value for all partitions.
  • internalPartitionKey(String): Column containing internal partition names for dynamic partitioning.
  • internalPartitionValue(String): Fixed internal partition value for all partitions.
  • Additional options are available in the Javadoc; a configuration sketch follows this list.
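
As a sketch of dynamic partitioning (again assuming the setters chain fluently), you might take the column partition from a Date column in the source while writing everything to one fixed internal partition; the resulting object is passed as the partitionedOptions argument of ingestPartitionedTable:

// Dynamic column partition, fixed internal partition (illustrative sketch)
partitionedOptions = new DerivedTableWriter.PartitionedOptions()
    .columnPartitionKey("Date")
    .internalPartitionValue("live")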

Transaction support

For data that requires atomic operations across multiple rows (e.g., position snapshots), enable transactions:

// Enable transactions for atomic multi-row operations
options = new DerivedTableWriter.Options()
    .sequenceColumn("Sequence")
    .transactionsEnabled(true)

// Source data with transaction boundaries
transactionalData = sourceTable.update(
    "TransactionId=ii / 100",  // Group rows into transactions
    "Sequence=ii"
)

dtw = DerivedTableWriter.ingestTable(
    "Portfolio", "Positions", today(), "live", dis, transactionalData, options
)

When to use transactions:

  • Position snapshots that must be consistent across symbols.
  • Order book updates that span multiple price levels.
  • Any multi-row data where partial updates would be incorrect.