Derived Table Writer
A Derived Table Writer (DTW) enables real-time data transformation and persistence in Deephaven Enterprise. It listens to live tables, applies transformations, and writes the results to persistent storage with exactly-once delivery guarantees.
When to use a Derived Table Writer
Use a Derived Table Writer when you need to:
- Transform and persist streaming data in real-time with exactly-once guarantees.
- Resume processing after system restarts without data loss or duplication.
- Preserve computed results from complex analytics or custom logic.
- Downsample high-frequency data into time-based aggregations.
- Ensure exactly-once delivery of processed data to downstream systems.
This guide walks through a complete end-to-end market data processing example that shows you how to:
- Understand core concepts and partitioning
- Set up the necessary infrastructure components
- Transform and persist real-time data
- Access and query the results
Understanding partitioning in Deephaven
Before working with DTW, it's essential to understand how Deephaven organizes data using a multi-level partitioning system. This organization improves query performance, enables efficient data storage, and facilitates data lifecycle management.
Date partitioning
The most important partition in Deephaven is the Date partition. Every table written with DTW requires a special Date
column which serves as the primary partitioning key.
- The Date column physically separates data into different directories on disk.
- Queries that filter on the Date column are much faster as they only scan relevant partitions.
- Data lifecycle management uses Date partitioning for retention policies.
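For example, a query that filters on the Date column only scans the matching partitions. A minimal sketch, using the Market/MarketData table written later in this section:
// Only today's Date partition is scanned for this query
todaysData = db.liveTable("Market", "MarketData").where("Date=today()")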
When using DTW, you specify the Date partition value with the columnPartition
parameter, which is typically set to today()
for current data:
DerivedTableWriter.ingestTable(
"Market", // namespace
"MarketData", // table name
today(), // columnPartition - specifies the Date partition value
"live", // internalPartition - explained below
dis, // Data Import Server
sourceTable, // table to ingest
options // configuration options
)
Internal partitioning
Within each Date partition, Deephaven further organizes data using internal partitions, specified by the internalPartition
parameter.
- Internal partitions group related data together for better organization.
- They facilitate parallel processing across different data streams.
- They enable separate handling of different data categories or sources.
For example, you might use different internal partitions for:
- Different data feeds ("nyse", "nasdaq")
- Different processing stages ("raw", "filtered", "aggregated")
- Different use cases ("live", "backtest", "research")
Internal partitions are specified as strings:
DTW.ingestTable("Market", "Trades", today(), "nyse", dis, nyseSourceTable, options)
DTW.ingestTable("Market", "Trades", today(), "nasdaq", dis, nasdaqSourceTable, options)
Partitioned tables
For high-volume data processing across multiple streams (e.g., processing many stock symbols in parallel), Deephaven offers partitioned tables that leverage both date and key-based partitioning:
// Process data partitioned by Symbol
DerivedTableWriter.ingestPartitionedTable(
"Market", // namespace
"SymbolData", // table name
dis, // Data Import Server
partitionedSourceTable, // source partitioned table
new DerivedTableWriter.PartitionedOptions(),
new DerivedTableWriter.Options().sequenceColumn("Sequence")
)
Partitioned tables maintain separate sequence tracking for each partition, allowing for independent, parallel processing.
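If your source is a single add-only table, one way to produce a partitioned source is partitionBy. A minimal sketch, assuming the source has a Symbol column:
// Split one add-only source into a constituent table per Symbol before ingestion
partitionedSourceTable = sourceTable.partitionBy("Symbol")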
Core concepts
Input table requirements
Derived Table Writer accepts only specific table types:
- Add-only tables: Tables where rows are only added, never modified or removed.
- Blink tables: Tables that show only the most recent changes, with automatic cleanup.
// Add-only table example
addOnlyTable = timeTable("PT1s").update("Value=ii")
// Blink table example
blinkTable = db.liveTable("Source", "Data",
io.deephaven.enterprise.database.TableOptions.newLiveBuilder().isBlink(true).build())
Sequence numbers and exactly-once delivery
For guaranteed exactly-once delivery, your source table must include a sequence column with strictly ascending values:
// Table with sequence numbers
sequencedTable = timeTable("PT1s").update(
"Sequence=ii", // Ascending sequence
"Data=random()",
"Timestamp=now()"
)
// Configure DTW to use sequence column
options = new DerivedTableWriter.Options().sequenceColumn("Sequence")
Without sequence numbers, rows may be duplicated or lost during system restarts.
Namespaces and system tables
Derived Table Writer writes to system tables within specific namespaces. System tables provide:
- Persistent storage across restarts.
- Automatic partitioning by date.
- Integration with Deephaven's query engine.
// Writing to namespace "Analytics", table "ProcessedData"
DerivedTableWriter.ingestTable("Analytics", "ProcessedData", today(), "partition1", dis, sourceTable, options)
End-to-end example: Real-time market data processing
This section demonstrates a complete DTW implementation using market data processing. We'll build a system that ingests real-time market data, applies transformations, and persists the results with exactly-once delivery guarantees.
Create a sample data source
First, let's create a simulated market data feed that mimics real trading data:
// Create simulated market data
marketData = timeTable("PT0.5s").update(
"Symbol=ii % 4 == 0 ? `AAPL` : ii % 4 == 1 ? `MSFT` : ii % 4 == 2 ? `GOOG` : `AMZN`",
"Price=100.0 + (random() * 10)",
"Volume=(int)(100 + random() * 900)",
"Timestamp=now()",
"Sequence=ii" // Sequence column for exactly-once delivery
)
This creates a table with:
- Symbol: Rotating between four major stocks.
- Price: Random prices around $100-110.
- Volume: Random trade volumes between 100 and 1,000 shares.
- Timestamp: Current time for each row.
- Sequence: Ascending sequence numbers for exactly-once delivery.
Design your data transformation
Next, we'll add calculated fields that provide additional insights:
// Add calculated metrics
enrichedMarketData = marketData.update(
"DollarVolume=Price * Volume",
"PriceChange=Price - Price_[i-1]"
)
Our transformations add:
- DollarVolume: Total dollar value of each trade.
- PriceChange: Price movement from the previous trade.
Set up Data Import Server and schema
Refer to the Setup section below for detailed instructions on setting up your Data Import Server and schema.
Write the ingestion query
Create your ingestion query that connects the source data to the Derived Table Writer:
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
// Initialize the Data Import Server
dis = DataImportServerTools.getDisByNameWithStorage("trades_processor", "/db/dataImportServers/trades_processor")
// Start ingestion
dtw = DerivedTableWriter.ingestTable(
"Trading", // namespace
"EnrichedTrades", // table name
today(), // column partition
"live", // internal partition
dis, // Data Import Server
enrichedMarketData, // source table
new DerivedTableWriter.Options().sequenceColumn("Sequence")
)
// Access the persisted data
persistedTrades = db.liveTable("Trading", "EnrichedTrades").where("Date=today()")
This example demonstrates the core DTW pattern: transform data, configure persistence, and access results.
Setup
Schema preparation
Create a schema that defines your table structure. The simplest approach is to derive it from an existing table:
import java.util.ArrayList
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import io.deephaven.engine.table.TableDefinition
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.enterprise.compatibility.TableDefinitionCompatibility
import static io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.TableDefinition.STORAGETYPE_NESTEDPARTITIONEDONDISK
// Create a sample table to define schema structure
sampleTable = emptyTable(0).update(
"Symbol=`AAPL`",
"Price=150.0",
"Volume=100",
"DollarVolume=15000.0",
"Timestamp=now()",
"Sequence=0L"
)
// Generate schema from table definition
try {
coreDefinition = sampleTable.getDefinition()
columnDefinitions = new ArrayList<>(coreDefinition.getColumns())
columnDefinitions.add(0, ColumnDefinition.ofString("Date").withPartitioning())
coreDefinition = TableDefinition.of(columnDefinitions)
edef = TableDefinitionCompatibility.convertToEnterprise(coreDefinition)
edef.setNamespace("Trading")
edef.setName("EnrichedTrades")
edef.setStorageType(STORAGETYPE_NESTEDPARTITIONEDONDISK)
schema = io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.xml.SchemaXmlFactory.getXmlSchema(edef, io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.NamespaceSet.SYSTEM)
// Write schema to disk
schemaFile = new File("/tmp/Trading.EnrichedTrades.schema")
if (schemaFile.exists()) {
println("Schema file already exists. Remove or rename existing file.")
return
}
Files.write(schemaFile.toPath(),
[io.deephaven.shadow.enterprise.io.deephaven.dhservice.SchemaFormatUtil.asString(schema)],
StandardOpenOption.CREATE_NEW)
println("Schema written to: ${schemaFile.absolutePath}")
} catch (Exception e) {
println("Error generating schema: ${e.message}")
throw e
}
Deploy the schema using dhconfig:
sudo /usr/illumon/latest/bin/dhconfig schema import /tmp/Trading.EnrichedTrades.schema
Alternatively, add the schema programmatically:
import io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.NamespaceSet
try {
// Create the namespace if it doesn't exist (required before adding schema)
SchemaServiceFactory.getDefault().createNamespace(NamespaceSet.SYSTEM, "Trading")
SchemaServiceFactory.getDefault().addSchema(schema)
println("Schema added successfully")
} catch (Exception e) {
println("Error adding schema: ${e.message}")
// Schema may already exist - check if that's acceptable
throw e
}
Storage directory setup
Create a dedicated storage directory for your Data Import Server:
# Create storage directory
sudo mkdir -p /db/dataImportServers/trades_processor
# Set appropriate ownership and permissions
sudo chown dbmerge:dbmerge /db/dataImportServers/trades_processor
sudo chmod 755 /db/dataImportServers/trades_processor
Important considerations:
- Directory must be exclusively owned by the DIS.
- The dbmerge account needs read/write access for Persistent Queries.
- Use the pattern /db/dataImportServers/[DIS_Name]/ for consistency.
Data Import Server configuration
Register your DIS with the routing system:
# Add DIS with table claim
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add \
--name trades_processor \
--claim Trading.EnrichedTrades
Configuration options:
- --name: Unique identifier for your DIS.
- --claim: Namespace.TableName that this DIS will handle.
- Dynamic ports are used by default (recommended).
- Private storage requires specifying the directory path in your code.
For detailed configuration options, see Add a Data Import Server.
Writing the ingestion query
Create the ingestion query that connects your source data to the Derived Table Writer. Unlike the earlier example, this version is self-contained and includes the source table and transformation:
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
// Initialize the Data Import Server
dis = DataImportServerTools.getDisByNameWithStorage("trades_processor", "/db/dataImportServers/trades_processor")
// Create your source data transformation
sourceTable = timeTable("PT1s").update(
"Symbol=`AAPL`",
"Price=150.0 + random() * 10",
"Volume=(int)(100 + random() * 900)",
"Timestamp=now()",
"Sequence=ii"
)
enrichedTable = sourceTable.update("DollarVolume=Price * Volume")
// Start ingestion
dtw = DerivedTableWriter.ingestTable(
"Trading", // namespace
"EnrichedTrades", // table name
today(), // column partition
"live", // internal partition
dis, // Data Import Server
enrichedTable, // source table
new DerivedTableWriter.Options().sequenceColumn("Sequence")
)
// Verify ingestion is working
resultTable = db.liveTable("Trading", "EnrichedTrades").where("Date=today()")
Examples
JSON message processing
This example shows how to parse JSON messages and persist structured data:
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
// Configure JSON parsing
objectMapper = new ObjectMapper()
objectMapper.registerModule(new JavaTimeModule())
// Initialize DIS for JSON processing
dis = DataImportServerTools.getDisByNameWithStorage("json_processor", "/db/dataImportServers/json_processor")
// Source table with raw JSON messages (blink table for memory efficiency)
jsonTable = db.liveTable("Source", "RawMessages",
io.deephaven.enterprise.database.TableOptions.newLiveBuilder().isBlink(true).build())
// Parse JSON and extract fields
parsedTable = jsonTable.update(
"ParsedData=objectMapper.readTree(JsonMessage)",
"Sequence=ParsedData.get('sequence').asLong()",
"Timestamp=java.time.Instant.ofEpochMilli(ParsedData.get('timestamp').asLong())",
"Symbol=ParsedData.get('symbol').asText()",
"Price=ParsedData.get('price').asDouble()",
"Volume=ParsedData.get('volume').asInt()"
).view("Sequence", "Timestamp", "Symbol", "Price", "Volume")
// Ingest parsed data
dtw = DerivedTableWriter.ingestTable(
"Market", "ParsedMessages", today(), "live", dis, parsedTable,
new DerivedTableWriter.Options().sequenceColumn("Sequence")
)
// Access results
parsedMessages = db.liveTable("Market", "ParsedMessages").where("Date=today()")
Example JSON message format:
{
"sequence": 12345,
"timestamp": 1640995200000,
"symbol": "AAPL",
"price": 182.5,
"volume": 1000
}
This approach uses Jackson's tree parsing for flexibility without requiring custom POJOs.
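If some fields are optional, the tree API lets you guard each lookup before extracting a value. A minimal sketch, reusing jsonTable and objectMapper from above; the notional field is an illustrative assumption:
// Fall back to a null value when an optional field is absent (notional is hypothetical)
defensiveTable = jsonTable.update(
    "ParsedData=objectMapper.readTree(JsonMessage)",
    "Notional=ParsedData.has(`notional`) ? ParsedData.get(`notional`).asDouble() : NULL_DOUBLE"
)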
Trade data downsampling
This example demonstrates downsampling high-frequency trade data into minute-level aggregations:
import io.deephaven.engine.util.WindowCheck
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
// Initialize DIS for downsampled data
dis = DataImportServerTools.getDisByNameWithStorage("trade_downsampler", "/db/dataImportServers/trade_downsampler")
// Source: high-frequency trade data
rawTrades = db.liveTable("Market", "Trades").where("Date=today()")
// Downsample to 1-minute intervals
tradesByMinute = rawTrades
.updateView("Bin=upperBin(Timestamp, 'PT1m')", "DollarVolume=Price * Size")
.aggBy([
AggCount("TradeCount"),
AggSum("TotalVolume=Size", "TotalDollarVolume=DollarVolume"),
AggFirst("OpenPrice=Price"),
AggLast("ClosePrice=Price"),
AggMin("LowPrice=Price"),
AggMax("HighPrice=Price")
], "Symbol", "Bin")
// Only log complete intervals (exclude the current minute)
lastBins = tradesByMinute.aggBy([AggSortedLast("Bin", "Bin")], "Symbol")
activeBins = WindowCheck.addTimeWindow(lastBins, "Bin", 90_000_000_000L, "InWindow").where("InWindow")
completeBars = tradesByMinute.whereNotIn(activeBins, "Bin", "Symbol")
// Add sequence for exactly-once delivery
sequencedBars = completeBars.update("Sequence=ii")
// Ingest downsampled data
dtw = DerivedTableWriter.ingestTable(
"Analytics", "MinuteBars", today(), "downsample", dis, sequencedBars,
new DerivedTableWriter.Options().sequenceColumn("Sequence")
)
// Access downsampled results
minuteBars = db.liveTable("Analytics", "MinuteBars").where("Date=today()")
Key features:
- Time bucketing: Groups trades into 1-minute intervals.
- OHLC calculation: Computes open, high, low, close prices.
- Volume aggregation: Sums share and dollar volumes.
- Complete intervals only: Excludes the current (incomplete) minute.
- Restart safety: Uses sequence numbers for exactly-once delivery.
Handling restarts and resumption
For production systems, implement restart logic to resume processing from the last checkpoint:
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
// Initialize DIS
dis = DataImportServerTools.getDisByNameWithStorage("analytics_processor", "/db/dataImportServers/analytics_processor")
// Determine last processed sequence number
lastProcessed = db.liveTable("Analytics", "ProcessedData")
.where("Date=today()")
.tail(1)
.longColumnIterator("Sequence")
startSequence = lastProcessed.hasNext() ? (lastProcessed.nextLong() + 1) : 0
lastProcessed.close()
println("Resuming from sequence: " + startSequence)
// Create source data starting from checkpoint
sourceData = timeTable("PT0.1s").update(
"Sequence=ii + startSequence",
"Value=random() * 100",
"Category=(ii % 3 == 0) ? `A` : (ii % 3 == 1) ? `B` : `C`"
)
// Apply transformations
processedData = sourceData.update(
"ProcessedValue=Value * 1.1",
"Timestamp=now()"
)
// Ingest with sequence tracking
dtw = DerivedTableWriter.ingestTable(
"Analytics", "ProcessedData", today(), "main", dis, processedData,
new DerivedTableWriter.Options().sequenceColumn("Sequence")
)
// Monitor progress
resultTable = db.liveTable("Analytics", "ProcessedData").where("Date=today()")
Restart benefits:
- No data loss: Resumes from last processed sequence.
- No duplicates: Sequence tracking prevents reprocessing.
- Automatic checkpointing: DTW manages sequence state.
- Feed integration: Can pass sequence to external data feeds.
Accessing persisted data
Retrieve ingested data like any other Deephaven table:
// Access today's data
today_data = db.liveTable("Analytics", "ProcessedData").where("Date=today()")
// Historical data access
historical_data = db.historicalTable("Analytics", "ProcessedData").where("Date=`2024-01-15`")
// Live streaming view
live_stream = db.liveTable("Analytics", "ProcessedData").tail(1000)
# Python equivalent
today_data = db.live_table("Analytics", "ProcessedData").where(["Date=today()"])
historical_data = db.historical_table("Analytics", "ProcessedData").where(
["Date=`2024-01-15`"]
)
live_stream = db.live_table("Analytics", "ProcessedData").tail(1000)
Production considerations
Partitioned table handling
When processing data with multiple independent streams, use partitioned tables for better performance and organization:
import io.deephaven.enterprise.derivedtablewriter.DerivedTableWriter
// Initialize DIS
dis = DataImportServerTools.getDisByNameWithStorage("multi_stream_processor", "/db/dataImportServers/multi_stream_processor")
// Load partitioned source data (one partition per symbol)
partitionedSource = db.livePartitionedTable("Market", "SymbolData",
io.deephaven.enterprise.database.TableOptions.newLiveBuilder().isBlink(true).build())
// Apply transformations across all partitions
processedPartitions = partitionedSource.proxy()
.update("EnrichedValue=Price * Volume * 1.1", "Sequence=ii")
.view("Symbol", "Timestamp", "Price", "Volume", "EnrichedValue", "Sequence")
.target()
// Ingest all partitions
pdtw = DerivedTableWriter.ingestPartitionedTable(
"Analytics", "ProcessedSymbols", dis, processedPartitions,
new DerivedTableWriter.PartitionedOptions(),
new DerivedTableWriter.Options().sequenceColumn("Sequence")
)
// Access results
processedData = db.liveTable("Analytics", "ProcessedSymbols").where("Date=today()")
Benefits of partitioned ingestion:
- Independent processing: Each partition maintains its own sequence numbers.
- Parallel execution: Partitions process simultaneously.
- Scalability: Handles high-volume multi-stream data efficiently.
Monitoring and troubleshooting
Common error patterns
1. Sequence number violations:
// Problem: Non-ascending sequences
badTable = timeTable("PT1s").update("Sequence=random() * 1000") // Random sequences
// Solution: Ensure ascending sequences
goodTable = timeTable("PT1s").update("Sequence=ii") // Ascending sequences
2. Schema mismatches:
// Problem: Table structure doesn't match schema
// Solution: Verify column names and types match exactly
sourceTable.getDefinition().getColumns().forEach { col ->
println("${col.getName()}: ${col.getDataType()}")
}
3. Storage directory permissions:
# Check directory ownership and permissions
ls -la /db/dataImportServers/your_dis_name/
# Fix permissions if needed
sudo chown -R dbmerge:dbmerge /db/dataImportServers/your_dis_name/
sudo chmod -R 755 /db/dataImportServers/your_dis_name/
Performance monitoring
Monitor your DTW performance with these queries:
// Check ingestion rate
ingestedData = db.liveTable("YourNamespace", "YourTable").where("Date=today()")
recentData = ingestedData.tail(1000)
// Monitor sequence gaps
sequenceGaps = ingestedData
.update("PrevSequence=Sequence_[i-1]", "Gap=Sequence - PrevSequence - 1")
.where("Gap > 0")
// Check partition sizes
partitionSizes = ingestedData.countBy("InternalPartition")
Debugging ingestion issues
Enable detailed logging:
// Add debug information to your transformations
debugTable = sourceTable.update(
"ProcessingTime=now()",
"RowCount=ii",
"YourTransformation=YourLogic"
)
// Log sample data for verification
io.deephaven.engine.util.TableTools.show(debugTable.head(10))
Verify DIS status:
// Check if DIS is running
dis.isRunning()
// Get DIS statistics
dis.getStatistics()
Production deployment considerations
High availability setup
For production environments, consider these deployment patterns:
1. Multiple DIS instances:
# Configure multiple DIS instances for redundancy
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add \
--name processor_primary \
--claim Analytics.Data
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add \
--name processor_backup \
--claim Analytics.Data
2. Load balancing:
// Distribute load across multiple internal partitions, one ingestion per partition.
// This sketch assumes the source rows carry a numeric StreamId column (hypothetical) used to split the stream.
numPartitions = 4
dtws = []
for (int p = 0; p < numPartitions; p++) {
    partitionSource = sourceTable.where("StreamId % " + numPartitions + " == " + p)
    dtws << DerivedTableWriter.ingestTable(
        "Analytics", "DistributedData", today(), "partition_" + p, dis, partitionSource, options)
}
Backup and recovery
Schema backup:
# Export current schemas
sudo /usr/illumon/latest/bin/dhconfig schema export --namespace Analytics --output /backup/schemas/
# Import during recovery
sudo /usr/illumon/latest/bin/dhconfig schema import /backup/schemas/Analytics.*.schema
Data verification:
// Verify data integrity after recovery
expectedCount = sourceDataCount // From your source system
actualCount = db.liveTable("Analytics", "YourTable").where("Date=today()").size()
if (expectedCount != actualCount) {
println("Data integrity issue: expected ${expectedCount}, got ${actualCount}")
}
Operational procedures
Daily maintenance:
// Check for processing delays
currentTime = now()
lastRow = db.liveTable("Analytics", "YourTable").where("Date=today()").tail(1)
tsIterator = lastRow.columnIterator("Timestamp")
lastUpdate = tsIterator.hasNext() ? tsIterator.next() : null
tsIterator.close()
delay = lastUpdate == null ? null : java.time.Duration.between(lastUpdate, currentTime)
if (delay != null && delay.toMinutes() > 5) {
    println("Processing delay detected: ${delay.toMinutes()} minutes")
}
Capacity planning:
// Monitor storage growth
dailyGrowth = db.historicalTable("Analytics", "YourTable")
.where("Date >= pastBusinessDate(7)")
.countBy("Date")
.update("GrowthMB=Count * EstimatedRowSizeBytes / 1024 / 1024")
Reference
API methods
DerivedTableWriter.ingestTable()
static LivenessArtifact ingestTable(
String namespace, // Target namespace (e.g., "Trading")
String tableName, // Target table name (e.g., "Trades")
String columnPartition, // Date partition (typically today())
String internalPartition, // Internal partition identifier (e.g., "live")
DataImportServer dis, // Data Import Server instance
Table sourceTable, // Source table (must be add-only or blink)
DerivedTableWriter.Options options // Configuration options
)
// Returns: LivenessArtifact for managing the ingestion lifecycle
DerivedTableWriter.ingestPartitionedTable()
static LivenessArtifact ingestPartitionedTable(
String namespace, // Target namespace
String tableName, // Target table name
DataImportServer dis, // Data Import Server instance
PartitionedTable sourcePartitionedTable, // Source partitioned table
DerivedTableWriter.PartitionedOptions partitionedOptions, // Partition configuration
DerivedTableWriter.Options options // Configuration options
)
// Returns: LivenessArtifact for managing the partitioned ingestion lifecycle
Configuration options
DerivedTableWriter.Options:
- sequenceColumn(String): Specify the sequence column for exactly-once delivery.
- transactionsEnabled(boolean): Enable atomic transactions for multi-row operations.
- logInitial(boolean): Whether to log the initial table state on startup.
- chunkSize(int): Set the processing chunk size for performance tuning.
- lastBy(String, Collection<String>): Configure last-by state tracking.
- Additional options are available in the Javadoc.
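These options combine on a single builder, as the transaction example below chains sequenceColumn and transactionsEnabled. A minimal sketch, assuming logInitial and chunkSize chain the same way; the values shown are illustrative only:
// Combine several options (chaining of logInitial/chunkSize is assumed; values are illustrative)
options = new DerivedTableWriter.Options()
    .sequenceColumn("Sequence")
    .logInitial(true)
    .chunkSize(4096)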
DerivedTableWriter.PartitionedOptions:
- columnPartitionKey(String): Column containing partition names for dynamic partitioning.
- columnPartitionValue(String): Fixed partition value for all partitions.
- internalPartitionKey(String): Column containing internal partition names for dynamic partitioning.
- internalPartitionValue(String): Fixed internal partition value for all partitions.
- Additional options are available in the Javadoc.
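The Key and Value variants are alternatives: a *Key setter reads the partition from a column of each constituent table, while a *Value setter applies one fixed value to all of them. A hedged sketch, assuming these setters chain like Options; the Feed column is an illustrative assumption:
// Fixed column partition for today's date; internal partition taken from a hypothetical Feed column
partitionedOptions = new DerivedTableWriter.PartitionedOptions()
    .columnPartitionValue(today())
    .internalPartitionKey("Feed")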
Transaction support
For data that requires atomic operations across multiple rows (e.g., position snapshots), enable transactions:
// Enable transactions for atomic multi-row operations
options = new DerivedTableWriter.Options()
.sequenceColumn("Sequence")
.transactionsEnabled(true)
// Source data with transaction boundaries
transactionalData = sourceTable.update(
"TransactionId=ii / 100", // Group rows into transactions
"Sequence=ii"
)
dtw = DerivedTableWriter.ingestTable(
"Portfolio", "Positions", today(), "live", dis, transactionalData, options
)
When to use transactions:
- Position snapshots that must be consistent across symbols.
- Order book updates that span multiple price levels.
- Any multi-row data where partial updates would be incorrect.