Programmatic query management

This guide covers programmatic scheduling, management, and monitoring of Persistent Queries (PQs). PQs are queries that run on a schedule to ingest data, perform analysis, and power dashboards. Deephaven provides both Python and Groovy APIs for managing PQs, and this guide covers both.

Programmatic management lets you build dynamic workflows that would be difficult or impossible to achieve through the UI alone. You can monitor, create, stop, restart, and delete PQs programmatically.

This guide does not cover the Query Monitor, which is a graphical interface for managing PQs. See Query management in the UI for more details.

Monitor

To monitor PQs programmatically, start with the SessionManager class in Python or the ControllerClientFactory class in Groovy.

Note

The code blocks below are run from Code Studios within a Deephaven installation. If connecting remotely, be sure to provide connection details and authentication. See Python client and/or Java client for details.

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();
from deephaven_enterprise.client.session_manager import SessionManager

sm = SessionManager()

With this client, you can retrieve information about every PQ in the system as a map:

pqs = client.getDataCopy()
pqs = sm.controller_client.map()

This map uses the serial number of each PQ as the key. The following code prints the name and serial number of each PQ:

for (pq in pqs.values()) {
    info = pq.getConfig()
    println (info.getName() + " - " + info.serial)
}
for serial, pq in pqs.items():
    print(f"{pq.config.name} - {serial}")
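Because the map is keyed by serial number, finding a PQ by name means scanning its values. The sketch below is a hypothetical helper, not part of the Deephaven API. It assumes each map value exposes config.name, as in the Python loop above, and uses stand-in objects so it can run outside a Deephaven installation.

```python
from types import SimpleNamespace
from typing import Any, List, Mapping


def find_serials_by_name(pqs: Mapping[int, Any], name: str) -> List[int]:
    """Return the serial numbers of all PQs whose configured name equals `name`.

    Hypothetical helper: assumes each map value has a `config.name` attribute,
    like the values returned by sm.controller_client.map().
    """
    return [serial for serial, pq in pqs.items() if pq.config.name == name]


# Stand-in data for illustration only; in a Code Studio, use
# pqs = sm.controller_client.map() instead.
example_pqs = {
    1: SimpleNamespace(config=SimpleNamespace(name="Ingest")),
    2: SimpleNamespace(config=SimpleNamespace(name="Merge")),
    3: SimpleNamespace(config=SimpleNamespace(name="Ingest")),
}

print(find_serials_by_name(example_pqs, "Ingest"))  # [1, 3]
```

The helper returns a list rather than a single serial so that it makes no assumption about whether PQ names are unique.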

You can access individual PQs by their serial number:

println pqs[1699635416039000002]
print(pqs[1699635416039000002])

Create

The following example creates a Merge PQ. The steps are similar for most other PQ types.

First, import the required classes:

import io.deephaven.enterprise.dnd.ControllerClientFactory
import static io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants.*
import io.deephaven.proto.controller.PersistentQueryConfigMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.deephaven.proto.controller.RestartUsersEnum
from deephaven_enterprise.proto.persistent_query_pb2 import PersistentQueryConfigMessage
import deephaven_enterprise.proto.persistent_query_pb2
from deephaven_enterprise.client.session_manager import SessionManager
import json
from typing import Dict

Next, define a helper (a closure in Groovy, a function in Python) that encodes the type-specific fields into the JSON format the controller expects. The configuration step uses it:

// Encodes type-specific fields into the JSON object that the controller expects
encodeJsonStringNode = { ObjectNode root, String name, String value ->
        root.putObject(name).put("type","String").put("value",value)
}
def __encode_type_specific_fields(tsf: Dict) -> str:
    """
    Encodes type-specific fields from a Python dictionary into the JSON object that the controller expects.
    :param tsf: a Python dictionary with type-specific fields
    :return: a JSON encoded string suitable for the controller
    """
    encoded = {}
    for k, v in tsf.items():
        encoded[k] = {"type": "string", "value": v}

    return json.dumps(encoded)
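To see what the Python helper produces, or to read the type-specific fields of an existing PQ back into a plain dictionary, the encoding can be reversed. The decode function below is a hypothetical counterpart written for illustration, not a Deephaven API. It assumes the JSON layout produced by __encode_type_specific_fields, an object whose values are {"type": ..., "value": ...} pairs, and it ignores the "type" entry.

```python
import json
from typing import Dict


def __encode_type_specific_fields(tsf: Dict[str, str]) -> str:
    # Same logic as the helper above, repeated so this block runs on its own.
    return json.dumps({k: {"type": "string", "value": v} for k, v in tsf.items()})


def decode_type_specific_fields(encoded: str) -> Dict[str, str]:
    """Hypothetical inverse: turn the controller-style JSON back into a flat dict."""
    return {k: entry["value"] for k, entry in json.loads(encoded).items()}


fields = {"Namespace": "LearnDeephaven", "Table": "StockQuotes"}
encoded = __encode_type_specific_fields(fields)
print(encoded)
# {"Namespace": {"type": "string", "value": "LearnDeephaven"}, "Table": {"type": "string", "value": "StockQuotes"}}

# Decoding the encoded string recovers the original dictionary.
print(decode_type_specific_fields(encoded) == fields)  # True
```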

Now create the query configuration. It uses the helper defined above to encode the type-specific fields:

configBuilder = PersistentQueryConfigMessage.newBuilder()
configBuilder.setHeapSizeGb(4.0)

ObjectMapper mapper = new ObjectMapper();
ObjectNode rootNode = mapper.createObjectNode();

// Set TypeSpecificField values
encodeJsonStringNode(rootNode, "LowHeapUsage", "false")
encodeJsonStringNode(rootNode, "Force", "false")
encodeJsonStringNode(rootNode, "AllowEmptyInput", "true")
encodeJsonStringNode(rootNode, "SortColumnFormula", "")
encodeJsonStringNode(rootNode, "ThreadPoolSize", "4")
encodeJsonStringNode(rootNode, "Namespace", "LearnDeephaven")
encodeJsonStringNode(rootNode, "Table", "StockQuotes")
encodeJsonStringNode(rootNode, "PartitionFormula", "\"2017-08-25\"")
encodeJsonStringNode(rootNode, "TableDataServiceConfig", "local")
encodeJsonStringNode(rootNode, "Format","Default")

configBuilder.setTypeSpecificFieldsJson(mapper.writeValueAsString(rootNode))

// Long.MIN_VALUE as the serial tells the controller to assign a new serial number to this PQ
configBuilder.setSerial(Long.MIN_VALUE)
configBuilder.setConfigurationType(CONFIGURATION_TYPE_MERGE)
configBuilder.setName("Test Programmatic Merge Creation")
configBuilder.setOwner("iris")
configBuilder.setEnabled(false)
configBuilder.setServerName("Merge_1")
configBuilder.setDetailedGCLoggingEnabled(false)
configBuilder.setBufferPoolToHeapRatio(.3)
configBuilder.setJvmProfile("Default")
configBuilder.setRestartUsers(RestartUsersEnum.RU_ADMIN)
config = PersistentQueryConfigMessage()

type_specific_fields = {
    "LowHeapUsage": "false",
    "Force": "false",
    "AllowEmptyInput": "true",
    "SortColumnFormula": "",
    "ThreadPoolSize": "4",
    "Namespace": "LearnDeephaven",
    "Table": "StockQuotes",
    "PartitionFormula": '"2017-08-25"',
    "TableDataServiceConfig": "local",
    "Format": "Default",
}

config.typeSpecificFieldsJson = __encode_type_specific_fields(type_specific_fields)

# -(2**63) equals Java's Long.MIN_VALUE, which tells the controller to assign a new serial number
config.serial = -(2**63)
config.configurationType = "Merge"
config.name = "Test Programmatic Merge Creation"
config.version = 1
config.owner = "iris"
config.enabled = False
config.heapSizeGb = 4.0
config.serverName = "Merge_1"
config.detailedGCLoggingEnabled = False
config.jvmProfile = "Default"
config.bufferPoolToHeapRatio = 0.3
config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN
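Both versions use the same sentinel serial number. Java's Long.MIN_VALUE is the smallest signed 64-bit integer, -2^63. The short check below uses only Python's standard struct module to confirm that -(2**63) is that value: its big-endian two's-complement encoding is 0x80 followed by seven zero bytes, and anything smaller no longer fits in a signed 64-bit field.

```python
import struct

# -(2**63) is the value the Python example assigns to config.serial.
LONG_MIN_VALUE = -(2**63)

# Pack it as a big-endian signed 64-bit integer (">q"), the same width as a Java long.
packed = struct.pack(">q", LONG_MIN_VALUE)
print(packed.hex())  # 8000000000000000
print(LONG_MIN_VALUE)  # -9223372036854775808

# One less than the minimum does not fit in a signed 64-bit field.
try:
    struct.pack(">q", LONG_MIN_VALUE - 1)
except struct.error:
    print("out of range for a 64-bit signed integer")
```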

Next, define the schedule, which determines when the PQ runs. The following code adds a list of scheduling parameters to the PQ configuration. In Python, you can also build this list by hand instead of using the GenerateScheduling class; see Programmatic scheduling in Python for details. See Available schedulers for the available scheduler types.

Note

The Python code below uses many of the default values in generate_daily_scheduler. See the link for details on the defaults.

String[] scheduling = [
                "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
                "Calendar=USNYSE",
                "BusinessDays=false",
                "StartTime=07:55:00",
                "StopTime=23:55:00",
                "TimeZone=America/New_York",
                "SchedulingDisabled=true",
                "Overnight=false",
                "RepeatEnabled=false",
                "SkipIfUnsuccessful=false",
                "RestartErrorCount=0",
                "RestartErrorDelay=0",
                "RestartWhenRunning=Yes"
        ];

// Add each scheduling parameter to the configuration
scheduling.each { configBuilder.addScheduling(it) }
from deephaven_enterprise.client.generate_scheduling import GenerateScheduling

schedulingArray = GenerateScheduling.generate_daily_scheduler(
    start_time="07:55:00",
    stop_time="23:55:00",
    repeat_interval=3,
)
config.ClearField("scheduling")
config.scheduling.extend(schedulingArray)

Lastly, specify the timeout, which the configuration stores in nanoseconds:

// Timeout in nanoseconds: 600,000,000,000 ns = 600 seconds
configBuilder.setTimeoutNanos(600000000000)
initialize_timeout_seconds: float = 60.0
config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000)
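The two examples use different values: the Groovy code sets 600,000,000,000 ns (600 seconds), while the Python code sets 60 seconds. A pair of small conversion helpers makes the unit explicit and avoids miscounting zeros. These helpers are hypothetical, written for illustration, and are not part of the Deephaven API.

```python
NANOS_PER_SECOND = 1_000_000_000


def seconds_to_nanos(seconds: float) -> int:
    """Convert seconds to whole nanoseconds, rounding to the nearest nanosecond."""
    return round(seconds * NANOS_PER_SECOND)


def nanos_to_seconds(nanos: int) -> float:
    """Convert nanoseconds back to seconds, e.g. to check an existing timeoutNanos value."""
    return nanos / NANOS_PER_SECOND


print(seconds_to_nanos(60.0))  # 60000000000
print(nanos_to_seconds(600_000_000_000))  # 600.0
```

In a Code Studio, you could then write config.timeoutNanos = seconds_to_nanos(60.0). Rounding instead of truncating with int() guards against floating-point products that land just below a whole number.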

With all of the setup done, create the PQ:

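// Create a controller client (as in the Monitor section) and submit the configuration
client = ControllerClientFactory.makeControllerClientFactory().getSubscribed()
configMessage = configBuilder.build()
client.addQuery(configMessage)
# Create a session manager (as in the Monitor section) and submit the configuration
sm = SessionManager()
sm.controller_client.add_query(config)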

Programmatic scheduling in Python

In Python, instead of using GenerateScheduling, you can build the list of scheduling parameters yourself and add it to the configuration.

The following example creates a daily scheduler that runs all seven days of the week (BusinessDays=false) from 07:55:00 to 23:55:00 in the America/New_York time zone, along with some additional scheduling parameters:

scheduling = [
    "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
    "Calendar=USNYSE",
    "BusinessDays=false",
    "StartTime=07:55:00",
    "StopTime=23:55:00",
    "TimeZone=America/New_York",
    "SchedulingDisabled=true",
    "Overnight=false",
    "RepeatEnabled=false",
    "SkipIfUnsuccessful=false",
    "RestartErrorCount=0",
    "RestartErrorDelay=0",
    "RestartWhenRunning=Yes",
]

# Replace any scheduling entries already on the configuration
config.ClearField("scheduling")
config.scheduling.extend(scheduling)
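Each scheduling entry is a Key=Value string, so the list can also be generated from a dictionary, and an existing PQ's scheduling list can be parsed back into one for inspection. The two helpers below are hypothetical conveniences written for illustration, not part of GenerateScheduling. They only format and split strings, so the controller still decides which keys and values are valid.

```python
from typing import Dict, Iterable, List


def to_scheduling_list(params: Dict[str, str]) -> List[str]:
    """Format each key/value pair as a "Key=Value" scheduling entry, preserving order."""
    return [f"{key}={value}" for key, value in params.items()]


def from_scheduling_list(entries: Iterable[str]) -> Dict[str, str]:
    """Split "Key=Value" entries into a dict; only the first "=" separates key from value."""
    parsed = {}
    for entry in entries:
        key, _, value = entry.partition("=")
        parsed[key] = value
    return parsed


params = {
    "SchedulerType": "com.illumon.iris.controller.IrisQuerySchedulerDaily",
    "StartTime": "07:55:00",
    "StopTime": "23:55:00",
    "TimeZone": "America/New_York",
}
entries = to_scheduling_list(params)
print(entries[1])  # StartTime=07:55:00
print(from_scheduling_list(entries) == params)  # True
```

In a Code Studio, you could pass the result to config.scheduling.extend(to_scheduling_list(params)).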

Available schedulers

Deephaven offers the following types of scheduling for PQs. In both Groovy and Python, you select a scheduler by setting SchedulerType in the list of scheduling parameters. In Python, GenerateScheduling also provides a helper method for each type.

Continuous: set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerContinuous, or in Python use GenerateScheduling.generate_continuous_scheduler.

Daily: set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily, or in Python use GenerateScheduling.generate_daily_scheduler.

Dependent: set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDependent, or in Python use GenerateScheduling.generate_dependent_scheduler.

Disabled: set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDisabled, or in Python use GenerateScheduling.generate_disabled_scheduler.

Monthly: set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerMonthly, or in Python use GenerateScheduling.generate_monthly_scheduler.

Range: set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerRange, or in Python use GenerateScheduling.generate_range_scheduler.

Temporary: set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerTemporary, or in Python use GenerateScheduling.generate_temporary_scheduler.
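All of the scheduler class names above share the prefix com.illumon.iris.controller.IrisQueryScheduler. The hypothetical helper below uses that prefix to report which kind of scheduler a list of scheduling entries selects. This can be handy when auditing existing PQs, assuming a PQ's configuration exposes its scheduling entries as the same Key=Value strings used to create it. The helper also assumes a single SchedulerType entry per list.

```python
from typing import Iterable, Optional

SCHEDULER_PREFIX = "com.illumon.iris.controller.IrisQueryScheduler"


def scheduler_kind(entries: Iterable[str]) -> Optional[str]:
    """Return "Daily", "Continuous", etc. for the SchedulerType entry, or None if absent."""
    for entry in entries:
        key, _, value = entry.partition("=")
        if key == "SchedulerType":
            # Strip the shared package/class prefix to leave the scheduler kind.
            if value.startswith(SCHEDULER_PREFIX):
                return value[len(SCHEDULER_PREFIX):]
            # Unrecognized class name: return it unchanged.
            return value
    return None


print(scheduler_kind([
    "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
    "Calendar=USNYSE",
]))  # Daily
```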

Stop

To stop a PQ programmatically, you only need its serial number.

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

serials = [1699635416039000002]

client.stopQueriesBySerial(serials)
from deephaven_enterprise.client.session_manager import SessionManager

sm = SessionManager()
sm.controller_client.stop_query(1699635416039000002)
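Because stopping only needs a serial number, you can combine it with the map from the Monitor section to stop several PQs at once. The sketch below is hypothetical: it assumes the map values expose config.name and that the controller client provides stop_query(serial), as in the Python example above. A stand-in client is used so the logic can be tried without a controller; in a Code Studio, you would pass sm.controller_client and sm.controller_client.map() instead.

```python
from types import SimpleNamespace
from typing import Any, List, Mapping


def stop_queries_with_prefix(controller_client: Any, pqs: Mapping[int, Any], prefix: str) -> List[int]:
    """Stop every PQ whose name starts with `prefix` and return the serials that were stopped."""
    stopped = []
    for serial, pq in pqs.items():
        if pq.config.name.startswith(prefix):
            controller_client.stop_query(serial)
            stopped.append(serial)
    return stopped


class FakeControllerClient:
    """Stand-in that records calls instead of contacting a controller."""

    def __init__(self) -> None:
        self.stopped: List[int] = []

    def stop_query(self, serial: int) -> None:
        self.stopped.append(serial)


pqs = {
    10: SimpleNamespace(config=SimpleNamespace(name="Test Merge A")),
    11: SimpleNamespace(config=SimpleNamespace(name="Prod Merge")),
    12: SimpleNamespace(config=SimpleNamespace(name="Test Merge B")),
}
fake = FakeControllerClient()
print(stop_queries_with_prefix(fake, pqs, "Test"))  # [10, 12]
```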

Restart

To restart a PQ programmatically, you only need its serial number.

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

client.restartQuery(1699635416039000002)
from deephaven_enterprise.client.session_manager import SessionManager

sm = SessionManager()
sm.controller_client.restart_query(1699635416039000002)
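When restarting several PQs, one failure should not prevent the others from being attempted. The hypothetical sketch below calls restart_query(serial) for each serial and collects any exceptions instead of stopping at the first one. It uses a stand-in client; with a real controller you would pass sm.controller_client. The exception types a real client raises are not specified here, so the sketch catches Exception broadly.

```python
from typing import Any, Dict, Iterable, List


def restart_all(controller_client: Any, serials: Iterable[int]) -> Dict[int, Exception]:
    """Restart each serial in turn; return a map of serial -> exception for failures."""
    failures: Dict[int, Exception] = {}
    for serial in serials:
        try:
            controller_client.restart_query(serial)
        except Exception as exc:  # keep going so the remaining PQs are still restarted
            failures[serial] = exc
    return failures


class FakeControllerClient:
    """Stand-in client: fails for serial 2, succeeds otherwise."""

    def __init__(self) -> None:
        self.restarted: List[int] = []

    def restart_query(self, serial: int) -> None:
        if serial == 2:
            raise RuntimeError("query 2 is not known to this stand-in")
        self.restarted.append(serial)


fake = FakeControllerClient()
failures = restart_all(fake, [1, 2, 3])
print(fake.restarted)  # [1, 3]
print(sorted(failures))  # [2]
```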

Delete

To delete a PQ programmatically, you only need its serial number.

Caution

Deleting a PQ is permanent and cannot be undone.

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

client.removeQuery(1699635416039000002)
from deephaven_enterprise.client.session_manager import SessionManager

sm = SessionManager()
sm.controller_client.delete_query(1699635416039000002)
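Because deletion cannot be undone, it can be worth confirming that a serial number still refers to the PQ you expect before deleting it. The helper below is a hypothetical safeguard, not part of the Deephaven API. It looks the serial up in a map like the one from the Monitor section, compares the configured name, and only then calls delete_query(serial). A stand-in client is used for illustration.

```python
from types import SimpleNamespace
from typing import Any, List, Mapping


def delete_query_checked(controller_client: Any, pqs: Mapping[int, Any], serial: int, expected_name: str) -> None:
    """Delete `serial` only if it exists in `pqs` and its name matches `expected_name`."""
    pq = pqs.get(serial)
    if pq is None:
        raise KeyError(f"No PQ with serial {serial}")
    if pq.config.name != expected_name:
        raise ValueError(f"Serial {serial} is named {pq.config.name!r}, not {expected_name!r}")
    controller_client.delete_query(serial)


class FakeControllerClient:
    """Stand-in that records deletions instead of contacting a controller."""

    def __init__(self) -> None:
        self.deleted: List[int] = []

    def delete_query(self, serial: int) -> None:
        self.deleted.append(serial)


pqs = {42: SimpleNamespace(config=SimpleNamespace(name="Test Programmatic Merge Creation"))}
fake = FakeControllerClient()
delete_query_checked(fake, pqs, 42, "Test Programmatic Merge Creation")
print(fake.deleted)  # [42]
```

Fetch the map immediately before deleting so that the check reflects the controller's current state.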