Programmatic query management
This guide covers programmatic scheduling, management, and monitoring of Persistent Queries (PQs). PQs are queries scheduled to run regularly for tasks such as data ingestion, analysis, and powering dashboards. Deephaven provides both Python and Groovy APIs for managing PQs, and this guide covers both.
Programmatic management allows you to create more dynamic process workflows that would be difficult or impossible using only manual management in the UI. You can monitor, create, stop, restart, and delete PQs programmatically.
This guide does not cover the Query Monitor, which is a graphical interface for managing PQs. See Query management in the UI for more details.
Monitor
To monitor PQs programmatically, start with the SessionManager class in Python and the ControllerClientFactory in Groovy.
Note
The code blocks below are run from Code Studios within a Deephaven installation. If connecting remotely, be sure to provide connection details and authentication. See Python client and/or Java client for details.
import io.deephaven.enterprise.dnd.ControllerClientFactory;
client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();
from deephaven_enterprise.client.session_manager import SessionManager
sm = SessionManager()
With the client created, you can retrieve a map containing information on each PQ in the system:
pqs = client.getDataCopy()
pqs = sm.controller_client.map()
This map uses the serial number of each PQ as the key. The following code prints the name and serial number of each PQ:
for (pq in pqs.values()) {
    info = pq.getConfig()
    println(info.getName() + " - " + info.serial)
}
for serial, pq in pqs.items():
    print(f"{pq.config.name} - {serial}")
You can access individual PQs by their serial number:
println pqs[1699635416039000002]
print(pqs[1699635416039000002])
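Because the map is keyed by serial number, finding a PQ by name means scanning its values. The following is a minimal Python sketch of such a lookup. It assumes only that the map behaves like a dictionary and that each value exposes `config.name`, as in the loop above. `find_serials_by_name` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for real PQ entries so the sketch runs outside a Deephaven installation.

```python
from types import SimpleNamespace
from typing import Any, List, Mapping


def find_serials_by_name(pqs: Mapping[int, Any], name: str) -> List[int]:
    """Return the sorted serials of every PQ whose config.name equals `name`."""
    return sorted(serial for serial, pq in pqs.items() if pq.config.name == name)


# Stand-in data shaped like the serial -> PQ map described above (not real PQ objects)
example_pqs = {
    1699635416039000002: SimpleNamespace(config=SimpleNamespace(name="Merge A")),
    1699635416039000003: SimpleNamespace(config=SimpleNamespace(name="Import B")),
}

print(find_serials_by_name(example_pqs, "Merge A"))
```

With a live session, the map returned by `sm.controller_client.map()` would take the place of `example_pqs`. The helper returns a list rather than a single serial so that it does not have to assume names are unique.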
Create
This guide presents an example of creating a merge PQ. This process applies to most other PQ types, as the steps are similar.
First, each language requires import statements:
import io.deephaven.enterprise.dnd.ControllerClientFactory
import static io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants.*
import io.deephaven.proto.controller.PersistentQueryConfigMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.deephaven.proto.controller.RestartUsersEnum
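# Imported as a module so that enum values such as RU_ADMIN can be referenced by their full path
import deephaven_enterprise.proto.persistent_query_pb2
from deephaven_enterprise.proto.persistent_query_pb2 import PersistentQueryConfigMessage
from deephaven_enterprise.client.session_manager import SessionManager
import json
from typing import Dict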
Next, define a required method/closure for type encoding that will be used during creation:
// Encodes type-specific fields into the JSON object that the controller expects
encodeJsonStringNode = { ObjectNode root, String name, String value ->
    root.putObject(name).put("type", "String").put("value", value)
}
def __encode_type_specific_fields(tsf: Dict) -> str:
    """
    Encodes type-specific fields from a Python dictionary into the JSON object that the controller expects.
    :param tsf: a Python dictionary with type-specific fields
    :return: a JSON encoded string suitable for the controller
    """
    encoded = {}
    for k, v in tsf.items():
        # Use the same "String" type tag as the Groovy closure
        encoded[k] = {"type": "String", "value": v}
    return json.dumps(encoded)
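The encoded form wraps every field in an object with a type tag and a value. As an illustration of that shape, the sketch below reads such a payload back into a plain dictionary, which can be handy when inspecting an existing configuration. `decode_type_specific_fields` is a hypothetical helper, not part of the Deephaven API, and the payload is hand-written in the shape the Groovy closure above produces.

```python
import json
from typing import Dict


def decode_type_specific_fields(encoded: str) -> Dict[str, str]:
    """Flatten the controller-style JSON back into a plain name -> value dictionary."""
    return {name: field["value"] for name, field in json.loads(encoded).items()}


# Hand-written example payload: each field name maps to a type tag and a value
payload = json.dumps(
    {
        "Namespace": {"type": "String", "value": "LearnDeephaven"},
        "Table": {"type": "String", "value": "StockQuotes"},
    }
)

print(payload)
print(decode_type_specific_fields(payload))
```

The decoder ignores the type tag and keeps only the values, so it is suitable for inspection but not for round-tripping fields of other types.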
With that done, create a query configuration. This uses the previously defined method/closure to encode the type-specific fields:
configBuilder = PersistentQueryConfigMessage.newBuilder()
configBuilder.setHeapSizeGb(4.0)
ObjectMapper mapper = new ObjectMapper();
ObjectNode rootNode = mapper.createObjectNode();
// Set TypeSpecificField values
encodeJsonStringNode(rootNode, "LowHeapUsage", "false")
encodeJsonStringNode(rootNode, "Force", "false")
encodeJsonStringNode(rootNode, "AllowEmptyInput", "true")
encodeJsonStringNode(rootNode, "SortColumnFormula", "")
encodeJsonStringNode(rootNode, "ThreadPoolSize", "4")
encodeJsonStringNode(rootNode, "Namespace", "LearnDeephaven")
encodeJsonStringNode(rootNode, "Table", "StockQuotes")
encodeJsonStringNode(rootNode, "PartitionFormula", "\"2017-08-25\"")
encodeJsonStringNode(rootNode, "TableDataServiceConfig", "local")
encodeJsonStringNode(rootNode, "Format","Default")
configBuilder.setTypeSpecificFieldsJson(new ObjectMapper().writeValueAsString(rootNode))
// set Long.MIN_VALUE for serial to create a new serial for this PQ
configBuilder.setSerial(Long.MIN_VALUE)
configBuilder.setConfigurationType(CONFIGURATION_TYPE_MERGE)
configBuilder.setName("Test Programmatic Merge Creation")
configBuilder.setOwner("iris")
configBuilder.setEnabled(false)
configBuilder.setServerName("Merge_1")
configBuilder.setDetailedGCLoggingEnabled(false)
configBuilder.setBufferPoolToHeapRatio(.3)
configBuilder.setJvmProfile("Default")
configBuilder.setRestartUsers(RestartUsersEnum.RU_ADMIN)
config = PersistentQueryConfigMessage()
type_specific_fields = {
    "LowHeapUsage": "false",
    "Force": "false",
    "AllowEmptyInput": "true",
    "SortColumnFormula": "",
    "ThreadPoolSize": "4",
    "Namespace": "LearnDeephaven",
    "Table": "StockQuotes",
    "PartitionFormula": '"2017-08-25"',
    "TableDataServiceConfig": "local",
    "Format": "Default",
}
config.typeSpecificFieldsJson = __encode_type_specific_fields(type_specific_fields)
# -(2**63) is Java Long.MIN_VALUE - it allows the controller to assign a serial number
config.serial = -(2**63)
config.configurationType = "Merge"
config.name = "Test Programmatic Merge Creation"
config.version = 1
config.owner = "iris"
config.enabled = False
config.heapSizeGb = 4.0
config.serverName = "Merge_1"
config.detailedGCLoggingEnabled = False
config.jvmProfile = "Default"
config.bufferPoolToHeapRatio = 0.3
config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN
Next, define scheduling, which determines when the PQ runs. The following code block adds a list of scheduling parameters to the PQ configuration. The Python example uses the GenerateScheduling class. For an alternative, see Programmatic scheduling in Python. See Available schedulers for the available types of scheduling.
Note
The Python code below uses many of the default values in generate_daily_scheduler. See the link for details on the defaults.
String[] scheduling = [
    "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
    "Calendar=USNYSE",
    "BusinessDays=false",
    "StartTime=07:55:00",
    "StopTime=23:55:00",
    "TimeZone=America/New_York",
    "SchedulingDisabled=true",
    "Overnight=false",
    "RepeatEnabled=false",
    "SkipIfUnsuccessful=false",
    "RestartErrorCount=0",
    "RestartErrorDelay=0",
    "RestartWhenRunning=Yes"
];
// Add each scheduling parameter to the configuration
for (int i = 0; i < scheduling.size(); i++) {
    configBuilder.addScheduling(scheduling[i])
}
from deephaven_enterprise.client.generate_scheduling import GenerateScheduling
schedulingArray = GenerateScheduling.generate_daily_scheduler(
    start_time="07:55:00",
    stop_time="23:55:00",
    repeat_interval=3,
)
config.ClearField("scheduling")
config.scheduling.extend(schedulingArray)
Lastly, specify timeout values:
// timeout in nanoseconds (600000000000 ns = 600 seconds)
configBuilder.setTimeoutNanos(600000000000)
initialize_timeout_seconds: float = 60.0
config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000)
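The timeout field is expressed in nanoseconds, which makes literal values hard to read at a glance: the Groovy value of 600000000000 is 600 seconds, while the Python example sets 60 seconds. A small conversion helper keeps the unit explicit. This is a sketch only; `seconds_to_nanos` and `nanos_to_seconds` are hypothetical helper names, not Deephaven functions.

```python
NANOS_PER_SECOND = 1_000_000_000


def seconds_to_nanos(seconds: float) -> int:
    """Convert a timeout in seconds to a whole number of nanoseconds."""
    return int(round(seconds * NANOS_PER_SECOND))


def nanos_to_seconds(nanos: int) -> float:
    """Convert a timeout in nanoseconds back to seconds."""
    return nanos / NANOS_PER_SECOND


print(seconds_to_nanos(60.0))          # the Python example's timeout in nanoseconds
print(nanos_to_seconds(600000000000))  # the Groovy example's timeout in seconds
```

The result of `seconds_to_nanos` can be assigned to `config.timeoutNanos` in place of the inline multiplication.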
With all of the setup done, create the PQ:
configMessage = configBuilder.build()
client.addQuery(configMessage)
sm = SessionManager()
sm.controller_client.add_query(config)
Programmatic scheduling in Python
In Python, instead of using GenerateScheduling, you can build the list of scheduling parameters yourself and add it to the PQ configuration.
The following example creates a daily scheduler that runs on all seven days of the week from 07:55:00 to 23:55:00 in the America/New_York time zone, along with some additional scheduling parameters:
scheduling = []
scheduling.append("SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily")
scheduling.append("Calendar=USNYSE")
scheduling.append("BusinessDays=false")
scheduling.append("StartTime=07:55:00")
scheduling.append("StopTime=23:55:00")
scheduling.append("TimeZone=America/New_York")
scheduling.append("SchedulingDisabled=true")
scheduling.append("Overnight=false")
scheduling.append("RepeatEnabled=false")
scheduling.append("SkipIfUnsuccessful=false")
scheduling.append("RestartErrorCount=0")
scheduling.append("RestartErrorDelay=0")
scheduling.append("RestartWhenRunning=Yes")
for param in scheduling:
    config.scheduling.append(param)
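Each scheduling entry is a plain "Key=Value" string, so the list can also be generated from a dictionary, which is easier to modify programmatically than a sequence of `append` calls. The sketch below assumes nothing beyond that string format; `to_scheduling_lines` is a hypothetical helper, and the dictionary is abbreviated for illustration (a real schedule would carry the full parameter set shown above).

```python
from typing import Dict, List


def to_scheduling_lines(params: Dict[str, str]) -> List[str]:
    """Render each parameter as a 'Key=Value' string, preserving insertion order."""
    return [f"{key}={value}" for key, value in params.items()]


# Abbreviated parameter set, for illustration only
daily = {
    "SchedulerType": "com.illumon.iris.controller.IrisQuerySchedulerDaily",
    "Calendar": "USNYSE",
    "StartTime": "07:55:00",
    "StopTime": "23:55:00",
    "TimeZone": "America/New_York",
}

scheduling_lines = to_scheduling_lines(daily)
for line in scheduling_lines:
    print(line)
```

The resulting list can be added to the configuration the same way as the hand-built list, one entry at a time.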
Available schedulers
Deephaven offers the following types of scheduling for PQs:
In Python, use GenerateScheduling.generate_continuous_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerContinuous in the properties list to create a continuous scheduler.
In Groovy, set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerContinuous to create a continuous scheduler.
In Python, use GenerateScheduling.generate_daily_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily in the properties list to create a daily scheduler.
In Groovy, set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily to create a daily scheduler.
In Python, use GenerateScheduling.generate_dependent_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDependent in the properties list to create a dependent scheduler.
In Groovy, set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDependent to create a dependent scheduler.
In Python, use GenerateScheduling.generate_disabled_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDisabled in the properties list to create a disabled scheduler.
In Groovy, set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDisabled to create a disabled scheduler.
In Python, use GenerateScheduling.generate_monthly_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerMonthly in the properties list to create a monthly scheduler.
In Groovy, set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerMonthly to create a monthly scheduler.
In Python, use GenerateScheduling.generate_range_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerRange in the properties list to create a range scheduler.
In Groovy, set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerRange to create a range scheduler.
In Python, use GenerateScheduling.generate_temporary_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerTemporary in the properties list to create a temporary scheduler.
In Groovy, set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerTemporary to create a temporary scheduler.
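The scheduler class names listed above follow one pattern: the package com.illumon.iris.controller, then IrisQueryScheduler, then the scheduler kind. The Python sketch below builds the SchedulerType entry from a kind name and rejects anything not in the list. `scheduler_type_entry` and the constants are hypothetical names introduced for illustration, and the sketch covers only the seven kinds named in this section.

```python
SCHEDULER_PACKAGE = "com.illumon.iris.controller"
SCHEDULER_KINDS = (
    "Continuous",
    "Daily",
    "Dependent",
    "Disabled",
    "Monthly",
    "Range",
    "Temporary",
)


def scheduler_type_entry(kind: str) -> str:
    """Return the 'SchedulerType=...' scheduling entry for one of the listed kinds."""
    if kind not in SCHEDULER_KINDS:
        raise ValueError(f"Unknown scheduler kind: {kind!r}")
    return f"SchedulerType={SCHEDULER_PACKAGE}.IrisQueryScheduler{kind}"


for kind in SCHEDULER_KINDS:
    print(scheduler_type_entry(kind))
```

Validating the kind up front catches a misspelled scheduler name before the configuration is sent to the controller.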
Stop
Stopping a PQ programmatically requires only its serial number.
import io.deephaven.enterprise.dnd.ControllerClientFactory;
client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();
serials = [1699635416039000002]
client.stopQueriesBySerial(serials)
from deephaven_enterprise.client.session_manager import SessionManager
sm = SessionManager()
sm.controller_client.stop_query(1699635416039000002)
Restart
Restarting a PQ programmatically requires only its serial number.
import io.deephaven.enterprise.dnd.ControllerClientFactory;
client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();
client.restartQuery(1699635416039000002)
from deephaven_enterprise.client.session_manager import SessionManager
sm = SessionManager()
sm.controller_client.restart_query(1699635416039000002)
Delete
Deleting a PQ programmatically requires only its serial number.
Caution
Deleting a PQ is permanent and cannot be undone.
import io.deephaven.enterprise.dnd.ControllerClientFactory;
client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();
client.removeQuery(1699635416039000002)
from deephaven_enterprise.client.session_manager import SessionManager
sm = SessionManager()
sm.controller_client.delete_query(1699635416039000002)
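Since deletion cannot be undone and a serial number is easy to mistype, one possible safeguard is to confirm the PQ's name before deleting it. The sketch below is an illustration under stated assumptions, not a Deephaven feature: `delete_if_named` is a hypothetical helper that relies only on the `map()` and `delete_query()` calls shown in this guide, it assumes the map behaves like a dictionary, and `FakeControllerClient` is an in-memory stand-in so the example runs outside a Deephaven installation.

```python
from types import SimpleNamespace


def delete_if_named(controller_client, serial: int, expected_name: str) -> bool:
    """Delete the PQ with `serial` only if its current name matches `expected_name`."""
    pq = controller_client.map().get(serial)
    if pq is None or pq.config.name != expected_name:
        return False  # unknown serial or name mismatch: do nothing
    controller_client.delete_query(serial)
    return True


class FakeControllerClient:
    """Hypothetical in-memory stand-in exposing only map() and delete_query()."""

    def __init__(self, pqs):
        self._pqs = dict(pqs)

    def map(self):
        return dict(self._pqs)

    def delete_query(self, serial):
        del self._pqs[serial]


fake = FakeControllerClient(
    {
        1699635416039000002: SimpleNamespace(
            config=SimpleNamespace(name="Test Programmatic Merge Creation")
        )
    }
)

print(delete_if_named(fake, 1699635416039000002, "Wrong Name"))
print(delete_if_named(fake, 1699635416039000002, "Test Programmatic Merge Creation"))
```

With a live session, `sm.controller_client` would be passed in place of `fake`.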