How to Write Continuously Running Queries That Allow Data Deletion

This page explains how to delete data without causing query failures in Core+. In this example, the query uses only the current day's data, so to save disk space we can delete data that is more than two days old. The goal is to write the query so that it continues to run when old data is deleted.

Note

A simple db.liveTable call that reads all days of the week would fail if data were deleted out from under it. liveTables are "add-only", which lets the engine optimize operations; removing data violates the add-only contract and causes a failure. Any operation that needs to re-read deleted rows (for example, to recompute results that depend on previous values) would fail.
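For instance, a naive query of this shape (the filter shown here is illustrative) breaks as soon as an old partition is removed:

```groovy
// Naive approach: a single live table spanning every date.
// This fails when an old partition is deleted out from under it.
ProcessEventLog = db.liveTable("DbInternal", "ProcessEventLog")
        .where("Level = `ERROR`")
```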

Querying the data

The first step is to retrieve the data using a SourcePartitionedTable. The SourcePartitionedTable creates one constituent table per intraday partition. When a partition is deleted, its constituent is removed from the SourcePartitionedTable. Each constituent table remains add-only; rows are never removed from a constituent.

import io.deephaven.enterprise.database.TableOptions

namespace = "DbInternal"
tableName = "ProcessEventLog"
MySourcePartitionedTable = db.livePartitionedTable(namespace, tableName, TableOptions.newLiveBuilder().internalPartitionColumn("__INTERNAL_PARTITION__").build())

A SourcePartitionedTable lets us filter out the column partitions (in this case, the date) that will eventually be deleted. We want our query to look only at today's data, with a rollover at midnight. We set up a WindowCheck to filter out data that is more than 24 hours old:

import io.deephaven.engine.util.WindowCheck
import io.deephaven.engine.table.PartitionedTableFactory

import java.time.Instant
import java.time.ZoneId
import java.time.Duration
import java.time.format.DateTimeFormatter

// The column partition is a date in yyyy-MM-dd format
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("America/New_York"))
// Hour of day at query startup; captured once and used to shift timestamps back toward midnight
currentHour = Instant.now().atZone(ZoneId.of("America/New_York")).getHour()
// Creates a time table that rolls over to a new partition at midnight
tt = timeTable(HOUR)
        .update("Timestamp = Timestamp.minus(Duration.ofHours(currentHour))",
                "Partition = formatter.format(Timestamp)")
        .firstBy("Partition")
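The shift-by-starting-hour arithmetic above can be sanity-checked in plain Java, outside of Deephaven. This standalone sketch (the class, method names, and sample start time are illustrative, not part of the query) shows that subtracting the startup hour from hourly ticks keeps the formatted partition on the starting date for a full 24 hours, after which it rolls to the next date:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class RolloverCheck {
    static final ZoneId ZONE = ZoneId.of("America/New_York");
    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZONE);

    // Shift an hourly tick back by the query's starting hour, as the time table's
    // update() does, and format the result as a column-partition value.
    static String partitionFor(ZonedDateTime start, int tick, int currentHour) {
        Instant shifted = start.plusHours(tick).toInstant().minus(Duration.ofHours(currentHour));
        return FORMATTER.format(shifted);
    }

    public static void main(String[] args) {
        // Pretend the query started at 10:00 on 2024-01-18, so currentHour == 10.
        ZonedDateTime start = ZonedDateTime.of(2024, 1, 18, 10, 0, 0, 0, ZONE);
        int currentHour = start.getHour();
        System.out.println(partitionFor(start, 0, currentHour));   // 2024-01-18: first tick lands on today's partition
        System.out.println(partitionFor(start, 23, currentHour));  // 2024-01-18: still today's partition
        System.out.println(partitionFor(start, 24, currentHour));  // 2024-01-19: 24 hours later, the next partition
    }
}
```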

PartitionWindowCheck = WindowCheck.addTimeWindow(
        tt,
        "Timestamp",
        DAY, //filters out data that is more than 24 hours old
        "InWindow")
        .where("InWindow")

// SourcePartitionedTable#table() returns a table with TableLocationKey and LocationTable columns.
// We can treat it as a regular query table and filter out unwanted TableLocationKeys.
FilteredLocationTable = MySourcePartitionedTable.table()
        .updateView("Partition = TableLocationKey.getPartitionValue(`Date`)")
        .whereIn(PartitionWindowCheck, "Partition")

//Reconstructs a PartitionedTable from the FilteredLocationTable and merges
//The result is the PEL for today's data
ConstituentDefinition = db.getTableDefinition(namespace, tableName)
FilteredSourcePartitionedTable = PartitionedTableFactory.of(FilteredLocationTable,
        MySourcePartitionedTable.keyColumnNames(),
        MySourcePartitionedTable.uniqueKeys(),
        MySourcePartitionedTable.constituentColumnName(),
        ConstituentDefinition,
        true)

//example aggregation on the constituents of the SourcePartitionedTable
ErrorMonitor = FilteredSourcePartitionedTable
        .transform({t -> t.countBy("Rows", "Level", "__INTERNAL_PARTITION__")})
        .merge()
ProcessEventLog = FilteredSourcePartitionedTable.merge()
PEL_SERVER = ProcessEventLog.where("Host = `myhost`")

You can query the result and the constituent tables of the SourcePartitionedTable as normal, as long as you do so after filtering out the old partitions. For example, the ErrorMonitor query above continues to run after old dates are deleted:

ErrorMonitor = FilteredSourcePartitionedTable
        .transform({t -> t.countBy("Rows", "Level", "__INTERNAL_PARTITION__")})
        .merge()

This query, on the other hand, fails once old dates are deleted:

RowsByPartition = MySourcePartitionedTable
        .transform({t -> t.countBy("Rows", "Level", "__INTERNAL_PARTITION__")})
        .merge()

Delete old data

Once the data has been filtered out of the query, it is safe to delete. This example uses the dhctl data control tool:

sudo -u irisadmin /usr/illumon/latest/bin/dhctl intraday truncate --partitions DbInternal.ProcessEventLog.2024-01-18
sudo -u irisadmin /usr/illumon/latest/bin/dhctl intraday delete --partitions DbInternal.ProcessEventLog.2024-01-18
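For the two-day retention described at the top of this page, the expired partition date can be computed with standard shell tools. This is only a sketch: the dhctl invocation and paths are copied from the commands above, the GNU `date -d` syntax is assumed, and the snippet prints the commands rather than running them (remove the `echo` to execute):

```shell
#!/bin/sh
# Compute the column-partition date from two days ago (GNU date syntax), e.g. 2024-01-18.
CUTOFF=$(date -d "2 days ago" +%F)

# Print the truncate/delete commands for the expired partition.
echo sudo -u irisadmin /usr/illumon/latest/bin/dhctl intraday truncate --partitions "DbInternal.ProcessEventLog.${CUTOFF}"
echo sudo -u irisadmin /usr/illumon/latest/bin/dhctl intraday delete --partitions "DbInternal.ProcessEventLog.${CUTOFF}"
```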

Warning

Ensure that the query has stopped using the table data before you delete it. Deleting data while the query is still in the process of filtering it out may cause the query to crash.