How to write continuously running queries that allow data deletion

This guide demonstrates how to delete data without causing query failures in Core+.

If you have a large, long-running table but only need the current day's data, you can save disk space by deleting data older than two days. However, deleting table data while a query is running may cause the query to fail. In the following examples, a SourcePartitionedTable creates a partitioned table, and a WindowCheck filters the old data out of the query so that it can be safely deleted.

Caution

A simple db.liveTable call that filters to all days of the week crashes if you delete data out from under it. This is because live tables are "add-only", which lets the engine optimize operations. Removing data breaks that add-only contract and causes a failure. Any operation that needs to re-read deleted rows (for example, to recompute results from previous values) fails as well.
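For illustration, here is a minimal sketch of the fragile pattern, using db.live_table (the Python counterpart of the db.liveTable call above); the Date column is the column partition of ProcessEventLog:

# Fragile: this live table covers every Date partition of ProcessEventLog.
pel_all_dates = db.live_table("DbInternal", "ProcessEventLog")
# Any downstream operation fails once a partition it has read is deleted.
rows_by_date = pel_all_dates.count_by("Rows", ["Date"])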

Querying the data

First, retrieve the data using a SourcePartitionedTable. The SourcePartitionedTable creates one constituent table per intraday partition. When a partition is deleted, the constituent is removed from the SourcePartitionedTable. Each constituent table is still add-only, and rows are never removed from the constituent tables.

namespace = "DbInternal"
table_name = "ProcessEventLog"
MySourcePartitionedTable = db.live_partitioned_table(namespace, table_name)

A SourcePartitionedTable lets you filter out the column partitions (in this case, the date) that will eventually be deleted. In this example, the query should look only at today's data, with a rollover at midnight. Use a time_window to filter out data that is more than 24 hours old:

namespace = "DbInternal"
table_name = "ProcessEventLog"
MySourcePartitionedTable = db.live_partitioned_table(namespace, table_name)

# Import required Java classes
import jpy

ZoneId = jpy.get_type("java.time.ZoneId")
Duration = jpy.get_type("java.time.Duration")
DateTimeFormatter = jpy.get_type("java.time.format.DateTimeFormatter")
Instant = jpy.get_type("java.time.Instant")

from datetime import datetime, timedelta
from deephaven import *
from deephaven.experimental import time_window
from deephaven.execution_context import get_exec_ctx
from deephaven.table import PartitionedTable

# the column partition is a date in yyyy-MM-dd format
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(
    ZoneId.of("America/New_York")
)
currentHour = Instant.now().atZone(ZoneId.of("America/New_York")).getHour()
# creates a time table that rolls over to a new partition at midnight
tt = (
    time_table("PT1H")
    .update(
        [
            "Timestamp = Timestamp.minusSeconds(currentHour * 60 * 60)",
            "Partition = formatter.format(Timestamp)",
        ]
    )
    .first_by("Partition")
)

# filters out data that is more than 24 hours old (the window is in nanoseconds)
PartitionWindowCheck = time_window(
    tt, "Timestamp", 24 * 60 * 60 * 1_000_000_000, "InWindow"
).where("InWindow")

# SourcePartitionedTable#table returns a table with TableLocationKey and LocationTable columns.
# We can treat this as a regular query table and filter out unwanted TableLocationKeys.
FilteredLocationTable = MySourcePartitionedTable.table.update_view(
    "Partition = (String) TableLocationKey.getPartitionValue(`Date`)"
).where_in(PartitionWindowCheck, "Partition")

# Reconstructs a PartitionedTable from the FilteredLocationTable
FilteredSourcePartitionedTable = PartitionedTable.from_partitioned_table(
    FilteredLocationTable,
    MySourcePartitionedTable.key_columns,
    MySourcePartitionedTable.unique_keys,
    MySourcePartitionedTable.constituent_column,
    MySourcePartitionedTable.constituent_table_definition,
    True,
)

ctx = get_exec_ctx()


def f(t):
    with ctx:
        return t.count_by("Rows", ["Level", "Host"])


# example aggregation on the constituents of the SourcePartitionedTable
ErrorMonitor = FilteredSourcePartitionedTable.transform(f).merge()

# after merging the constituents, you can query the result as you normally would
ProcessEventLog = FilteredSourcePartitionedTable.merge()
ProcessEventLogMyHost = ProcessEventLog.where("Host = `myhost`")

You can query the result and the constituent tables of the SourcePartitionedTable as you normally would, as long as you do so after filtering out the old partitions. For example, the ErrorMonitor query above continues to run after old dates are deleted:

ErrorMonitor = FilteredSourcePartitionedTable.transform(f).merge()

This query, on the other hand, fails once old dates are deleted:

RowsByPartition = MySourcePartitionedTable.transform(f).merge()
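You can also work with individual constituents of the filtered table. The following is a minimal sketch, not part of the original example, that uses the constituent_tables property of the Python PartitionedTable; the list is a snapshot and changes as partitions roll in and out:

# Snapshot the current constituents of the filtered partitioned table.
constituents = FilteredSourcePartitionedTable.constituent_tables
if constituents:
    # Aggregate a single constituent, for example the most recently added partition.
    latest_counts = constituents[-1].count_by("Rows", ["Level", "Host"])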

The same query can be written in Groovy. As before, first retrieve the data using a SourcePartitionedTable:

namespace = "DbInternal"
tableName = "ProcessEventLog"
MySourcePartitionedTable = db.livePartitionedTable(namespace, tableName)

As in the Python version, the query should look only at today's data, with a rollover at midnight. Use a WindowCheck to filter out data that is more than 24 hours old:

import io.deephaven.engine.util.WindowCheck
import io.deephaven.engine.table.PartitionedTableFactory

import java.time.Duration
import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter

namespace = "DbInternal"
tableName = "ProcessEventLog"
MySourcePartitionedTable = db.livePartitionedTable(namespace, tableName)

//the column partition is a date in yyyy-MM-dd format
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("America/New_York"))
currentHour = Instant.now().atZone(ZoneId.of("America/New_York")).getHour()
//creates a time table that rolls over to a new partition at midnight
tt = timeTable(HOUR)
        .update("Timestamp = Timestamp.minus(Duration.ofHours(currentHour))",
                "Partition = formatter.format(Timestamp)")
        .firstBy("Partition")

PartitionWindowCheck = WindowCheck.addTimeWindow(
        tt,
        "Timestamp",
        DAY, //filters out data that is more than 24 hours old
        "InWindow")
        .where("InWindow")

//SourcePartitionedTable#table() returns a table with TableLocationKey and LocationTable columns
//We can treat this as a regular query table and filter out unwanted TableLocationKeys
FilteredLocationTable = MySourcePartitionedTable.table()
        .updateView("Partition = (String) TableLocationKey.getPartitionValue(`Date`)")
        .whereIn(PartitionWindowCheck, "Partition")

//Reconstructs a PartitionedTable from the FilteredLocationTable
ConstituentDefinition = db.getTableDefinition(namespace, tableName)
FilteredSourcePartitionedTable = PartitionedTableFactory.of(FilteredLocationTable,
        MySourcePartitionedTable.keyColumnNames(),
        MySourcePartitionedTable.uniqueKeys(),
        MySourcePartitionedTable.constituentColumnName(),
        ConstituentDefinition,
        true)

//example aggregation on the constituents of the SourcePartitionedTable
ErrorMonitor = FilteredSourcePartitionedTable
        .transform({t -> t.countBy("Rows", "Level", "Host")})
        .merge()

// after merging the constituents, you can query the result as you normally would
ProcessEventLog = FilteredSourcePartitionedTable.merge()
ProcessEventLogMyHost = ProcessEventLog.where("Host = `myhost`")

Again, you can query the result and the constituent tables as you normally would, as long as you do so after filtering out the old partitions. The ErrorMonitor query continues to run after old dates are deleted:

ErrorMonitor = FilteredSourcePartitionedTable
        .transform({t -> t.countBy("Rows", "Level", "Host")})
        .merge()

This query, on the other hand, fails once old dates are deleted:

RowsByPartition = MySourcePartitionedTable
        .transform({t -> t.countBy("Rows", "Level", "Host")})
        .merge()

Delete old data

Once the data has been filtered out of the query, it is safe to delete. This example uses the dhctl data control tool to truncate and then delete the old partition:

sudo -u irisadmin /usr/illumon/latest/bin/dhctl intraday truncate --partitions DbInternal.ProcessEventLog.2024-01-18
sudo -u irisadmin /usr/illumon/latest/bin/dhctl intraday delete --partitions DbInternal.ProcessEventLog.2024-01-18

Warning

Ensure that the query has already stopped using the table data before you delete it. Deleting a partition that the query is still reading may cause the query to crash.
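One way to verify this, sketched below under the assumption that the FilteredLocationTable from the Python example is still in scope and that 2024-01-18 is the partition to be deleted, is to confirm that the partition no longer appears among the filtered locations before running the dhctl commands:

from deephaven.pandas import to_pandas

# Snapshot the partitions the query is still reading.
active_partitions = set(to_pandas(FilteredLocationTable.select_distinct("Partition"))["Partition"])

# Only proceed with the truncate/delete once the partition has aged out of the window.
assert "2024-01-18" not in active_partitions, "The query is still using this partition; do not delete it yet"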