Persistent Query Infrastructure and API
The Persistent Query Controller
(the controller
, for short) is listed as iris_controller
in monit
, and runs on the controller
pod in Kubernetes. It is the service responsible for managing Persistent Queries. For instance, when it is time to start a scheduled Persistent Query, it is the controller that starts the PQ and monitors its state. The Query Monitor
(Web) and Query Config
(Java Swing) panels obtain information about Persistent Queries in the system from the controller using the PersistentQueryControllerClient
for Legacy or the PersistentQueryControllerClient
for Core+.
Persistent query services and objects
The PersistentQueryControllerClient
provides a method (getData()
in Legacy or getDataCopy()
in Core+) that returns a Map of persistent query serials and PersistentQueryInfo
objects.
Native Python classes for Core+ management of persistent queries are not yet available. Currently, it is recommended to use Groovy if management of persistent queries from Core+ workers is desired.
For Legacy workers:
import com.illumon.iris.controller.PersistentQueryControllerClient;
client = PersistentQueryControllerClient.getControllerClient("Console Script",log, true);
for (PQ in client.getData().values()) {
println (PQ.getConfig().getName());
Level = jpy.get_type('com.fishlib.io.log.LogLevel').INFO
Logger = jpy.get_type('com.fishlib.io.logger.NullLoggerImpl')
pqc = jpy.get_type('com.illumon.iris.controller.PersistentQueryControllerClient')
client = pqc.getControllerClient("Console Script",Logger(Level), True)
PQs = client.getData().values().toArray()
for PQ in PQs:
print (PQ.getConfig().getName())
For Core+ workers:
import io.deephaven.enterprise.dnd.ControllerClientFactory;
client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();
for (PQ in client.getDataCopy().values()) {
println (PQ.getConfig().getName());
}
Each PersistentQueryInfo
object represent a PQ's current state, and includes methods to get additional details about it:
getState()
retrieves aPersistentQueryState
, which provides the details about the running PQ, such asworkerHost
andworkerPort
.getConfig()
retrieves aPersistentQueryConfiguration
, which provides the details about the PQ, such as owner, worker settings, etc.
Managing Persistent Queries programmatically
The PersistentQueryControllerClient
provides methods to manage PQs on the system. See the JavaDoc for complete details on this class. There is also a native Python class in the Core+ Python client that can be used to interact with the controller.
When creating a new PQ, the serial of the PQ must be set to Java's Long.MIN_VALUE
; this is the default behavior for the Legacy PQ API, but, for the newer gRPC API, the serial property defaults to 0, and must be explicitly set to Long.MIN_VALUE
. It is possible to set a specific positive value for the serial of a new PQ so that, when importing a PQ from a backup or export, its serial can be maintained to match other configuration objects, such as dashboards
.
When configuring some types of Peristent Queries, there are type-specific fields that need to be set; for example, the table that an import, validate, or merge PQ will operate on.
- For Legacy workers, these properties are defined in various public constants in classes, such as
MergePersistentQuery
. The type-specific field properties are set through a Java Map. - For Core+ Workers, not all of these classes have been made available yet. The Core+ example scripts below include some definitions of these constants. A full list of constants and related classes is shown in Appendix A, below. The type-specific field properties are set or through a JSON String. For the JSON, each property name element must have a value and a data type specified.
String
as the data type works for all of these type-specific properties.
One other complex area of configuration is the scheduling settings.
- In Legacy, scheduling is passed as an array of Strings.
- In Core+, scheduling is passed as a sequence of
.addScheduling()
calls (one per line of scheduling information).
Here are examples of programmatically creating a Merge PQ.
For Legacy workers:
import com.illumon.iris.controller.PersistentQueryControllerClient
import java.util.Map
import java.util.HashMap
import static com.illumon.iris.pqimport.MergePersistentQuery.*
import com.illumon.iris.controller.PersistentQueryConfiguration
import static com.illumon.util.IngesterPersistentQueryConstants.*
client = PersistentQueryControllerClient.getControllerClient("Console Script",log, true)
// Create a new Merge PQ
String[] scheduling = [
"SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
"Calendar=USNYSE",
"BusinessDays=false",
"StartTime=07:55:00",
"StopTime=23:55:00",
"TimeZone=America/New_York",
"SchedulingDisabled=true",
"Overnight=false",
"RepeatEnabled=false",
"SkipIfUnsuccessful=false",
"RestartErrorCount=0",
"RestartErrorDelay=0",
"RestartWhenRunning=Yes"
];
PersistentQueryConfiguration config = new PersistentQueryConfiguration()
config.setHeapSizeInGB(4.0)
// Set TypeSpecificField values
Map map = new HashMap()
map.put(LOW_HEAP_USAGE,"false")
map.put(FORCE_OPTION,"false")
map.put(ALLOW_EMPTY_INPUT,"true")
map.put(SORT_COLUMN_FORMULA,"")
map.put(THREAD_POOL_SIZE,"4")
map.put(TYPE_SPECIFIC_FIELD_NAMESPACE, "LearnDeephaven")
map.put(TYPE_SPECIFIC_FIELD_TABLE, "StockQuotes")
map.put(TYPE_SPECIFIC_FIELD_PARTITION_FORMULA, "\"2017-08-25\"")
map.put(TYPE_SPECIFIC_FIELD_TABLE_DATA_SERVICE_CONFIG,"local")
map.put(TYPE_SPECIFIC_FIELD_FORMAT,"Default")
config.setTypeSpecificFields(map)
config.setConfigurationType(CONFIGURATION_TYPE_MERGE)
config.setName("Test Programmatic Merge Creation")
config.setOwner("iris")
config.setEnabled(false)
config.setDbServerName("Merge_1")
config.setDetailedGCLoggingEnabled(false)
config.setDataBufferPoolToHeapSizeRatio(.3)
config.setJVMProfile("Default")
// RESTARTUSERS_ADMIN=1
// RESTARTUSERS_ADMINANDVIEWERS=2
// RESTARTUSERS_VIEWERSWHENDOWN=4
config.setRestartUsers(1)
config.setScheduling(scheduling)
// timeout in milliseconds
config.setTimeout(600000)
client.addQueryConfiguration(config)
PQCC = jpy.get_type('com.illumon.iris.controller.PersistentQueryControllerClient')
HASHMAP = jpy.get_type('java.util.HashMap')
MPQ = jpy.get_type('com.illumon.iris.pqimport.MergePersistentQuery')
PQC = jpy.get_type('com.illumon.iris.controller.PersistentQueryConfiguration')
IPQ = jpy.get_type('com.illumon.util.IngesterPersistentQueryConstants')
Level = jpy.get_type('com.fishlib.io.log.LogLevel').INFO
Logger = jpy.get_type('com.fishlib.io.logger.NullLoggerImpl')
client = PQCC.getControllerClient('Console Script',Logger(Level),True)
# Create a new Merge PQ
scheduling = []
scheduling.append('SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily')
scheduling.append('Calendar=USNYSE')
scheduling.append('BusinessDays=false')
scheduling.append('StartTime=07:55:00')
scheduling.append('StopTime=23:55:00')
scheduling.append('TimeZone=America/New_York')
scheduling.append('SchedulingDisabled=true')
scheduling.append('Overnight=false')
scheduling.append('RepeatEnabled=false')
scheduling.append('SkipIfUnsuccessful=false')
scheduling.append('RestartErrorCount=0')
scheduling.append('RestartErrorDelay=0')
scheduling.append('RestartWhenRunning=Yes')
config = PQC()
config.setHeapSizeInGB(4.0)
# Set TypeSpecificField values
map = HASHMAP()
map.put(MPQ.LOW_HEAP_USAGE,'false')
map.put(MPQ.FORCE_OPTION,'false')
map.put(MPQ.ALLOW_EMPTY_INPUT,'true')
map.put(MPQ.SORT_COLUMN_FORMULA,'')
map.put(MPQ.THREAD_POOL_SIZE,'4')
map.put(IPQ.TYPE_SPECIFIC_FIELD_NAMESPACE, 'LearnDeephaven')
map.put(IPQ.TYPE_SPECIFIC_FIELD_TABLE, 'StockQuotes')
map.put(IPQ.TYPE_SPECIFIC_FIELD_PARTITION_FORMULA, '\"2017-08-25\"')
map.put(IPQ.TYPE_SPECIFIC_FIELD_TABLE_DATA_SERVICE_CONFIG,'local')
map.put(MPQ.TYPE_SPECIFIC_FIELD_FORMAT,'Default')
config.setTypeSpecificFields(map)
config.setConfigurationType(IPQ.CONFIGURATION_TYPE_MERGE)
config.setName('Test Programmatic Merge Creation')
config.setOwner('iris')
config.setEnabled(False)
config.setDbServerName('Merge_1')
config.setDetailedGCLoggingEnabled(False)
config.setDataBufferPoolToHeapSizeRatio(.3)
config.setJVMProfile('Default')
# RESTARTUSERS_ADMIN=1
# RESTARTUSERS_ADMINANDVIEWERS=2
# RESTARTUSERS_VIEWERSWHENDOWN=4
config.setRestartUsers(1)
config.setScheduling(scheduling)
# timeout in milliseconds
config.setTimeout(600000)
client.addQueryConfiguration(config)
For Core+ workers:
import io.deephaven.enterprise.dnd.ControllerClientFactory
import static io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants.*
import io.deephaven.proto.controller.PersistentQueryConfigMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.deephaven.proto.controller.RestartUsersEnum
// Some TypeSpecificField constants
final String LOW_HEAP_USAGE = "LowHeapUsage";
final String FORCE_OPTION = "Force";
final String ALLOW_EMPTY_INPUT = "AllowEmptyInput";
final String SORT_COLUMN_FORMULA = "SortColumnFormula";
final String THREAD_POOL_SIZE = "ThreadPoolSize";
final String TYPE_SPECIFIC_FIELD_FORMAT = "Format";
final String TYPE_SPECIFIC_FIELD_CODEC = "Codec";
encodeJsonStringNode = { ObjectNode root, String name, String value ->
root.putObject(name).put("type","String").put("value",value)
}
client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();
// Create a new Merge PQ
String[] scheduling = [
"SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
"Calendar=USNYSE",
"BusinessDays=false",
"StartTime=07:55:00",
"StopTime=23:55:00",
"TimeZone=America/New_York",
"SchedulingDisabled=true",
"Overnight=false",
"RepeatEnabled=false",
"SkipIfUnsuccessful=false",
"RestartErrorCount=0",
"RestartErrorDelay=0",
"RestartWhenRunning=Yes"
];
configBuilder = PersistentQueryConfigMessage.newBuilder()
configBuilder.setHeapSizeGb(4.0)
ObjectNode rootNode = mapper.createObjectNode();
// Set TypeSpecificField values
encodeJsonStringNode(rootNode,LOW_HEAP_USAGE,"false")
encodeJsonStringNode(rootNode,FORCE_OPTION,"false")
encodeJsonStringNode(rootNode,ALLOW_EMPTY_INPUT,"true")
encodeJsonStringNode(rootNode,SORT_COLUMN_FORMULA,"")
encodeJsonStringNode(rootNode,THREAD_POOL_SIZE,"4")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_NAMESPACE, "LearnDeephaven")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_TABLE, "StockQuotes")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_PARTITION_FORMULA, "\"2017-08-25\"")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_TABLE_DATA_SERVICE_CONFIG,"local")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_FORMAT,"Default")
configBuilder.setTypeSpecificFieldsJson(new ObjectMapper().writeValueAsString(rootNode))
// set Long.MIN_VALUE for serial to create a new serial for this PQ
configBuilder.setSerial(Long.MIN_VALUE)
configBuilder.setConfigurationType(CONFIGURATION_TYPE_MERGE)
configBuilder.setName("Test Programmatic Merge Creation")
configBuilder.setOwner("iris")
configBuilder.setEnabled(false)
configBuilder.setServerName("Merge_1")
configBuilder.setDetailedGCLoggingEnabled(false)
configBuilder.setBufferPoolToHeapRatio(.3)
configBuilder.setJvmProfile("Default")
configBuilder.setRestartUsers(RestartUsersEnum.RU_ADMIN)
// add all scheduling property lines from scheduling String[]
for (int i=0; i<scheduling.size(); i++) {
configBuilder.addScheduling(scheduling[i])
}
// timeout in nanoseconds
configBuilder.setTimeoutNanos(600000000000)
configMessage=configBuilder.build()
client.addQuery(configMessage)
from deephaven_enterprise.client.session_manager import SessionManager
import json
from typing import Dict
def __encode_type_specific_fields(tsf: Dict) -> str:
"""
Encodes type specific fields from a Python dictionary into the JSON object that the controller expects.
:param tsf: a Python dictionary with type specific fields
:return: a JSON encoded string suitable for the controller
"""
encoded = {}
for (k, v) in tsf.items():
encoded[k] = {"type": "string", "value": v}
return json.dumps(encoded)
# authenticate to the Deephaven host
connection_info = "https://deephaven-host:8000/iris/connection.json"
session_mgr: SessionManager = SessionManager(connection_info)
session_mgr.private_key("/path-to-private-key/priv-username.base64.txt")
# Create the query configuration
config: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage =
deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage()
tsf = {
"LowHeapUsage": "false",
"Force": "false",
"AllowEmptyInput": "true",
"SortColumnFormula": "",
"ThreadPoolSize": "4",
"Namespace": "LearnDeephaven",
"Table": "StockQuotes",
"PartitionFormula": "\"2017-08-25\"",
"TableDataServiceConfig": "local",
"Format": "Default"
}
config.typeSpecificFieldsJson = __encode_type_specific_fields(tsf)
# Java Long.MIN_VALUE allows the controller to assign a serial number
config.serial = -2 ** 63
config.configurationType = "Merge"
config.name = "Test Programmatic Merge Creation"
config.version = 1
config.owner = "iris"
config.enabled = False
config.heapSizeGb = 1.0
config.serverName = "Merge_1"
config.detailedGCLoggingEnabled = False
config.jvmProfile = "Default"
config.bufferPoolToHeapRatio = 0.3
config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN
scheduling = []
scheduling.append("SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily")
scheduling.append("Calendar=USNYSE")
scheduling.append("BusinessDays=false")
scheduling.append("StartTime=07:55:00")
scheduling.append("StopTime=23:55:00")
scheduling.append("TimeZone=America/New_York")
scheduling.append("SchedulingDisabled=true")
scheduling.append("Overnight=false")
scheduling.append("RepeatEnabled=false")
scheduling.append("SkipIfUnsuccessful=false")
scheduling.append("RestartErrorCount=0")
scheduling.append("RestartErrorDelay=0")
scheduling.append("RestartWhenRunning=Yes")
for param in scheduling:
config.scheduling.append(param)
initialize_timeout_seconds: float = 60.0
config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000)
# Add the query
session_mgr.controller_client.add_query(config)
Appendix A - Constant values for Persistent Query configuration types and type-specific fields
Note
Among the classes listed below, only io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants
is available to Core+ workers. The other classes listed here are available to Legacy workers directly, or through jpy.get_type
. For Core+ workers, use the values documented here when setting configuration type or type-specific fields when creating or updating a Persistent Query.
com.illumon.iris.pqimport.BinaryImportPersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | CONFIGURATION_TYPE | "BinaryImport" |
public static final String | TYPE_SPECIFIC_FIELDS_SOURCE_DIRECTORY | "SourceDirectory" |
public static final String | TYPE_SPECIFIC_FIELDS_SOURCE_FILE | "SourceFile" |
public static final String | TYPE_SPECIFIC_FIELDS_SOURCE_GLOB | "SourceGlob" |
com.illumon.iris.pqimport.CsvImportPersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | CONFIGURATION_TYPE | "CsvImport" |
public static final String | CONSTANT | "Constant" |
public static final String | DELIMITER | "Delimiter" |
public static final String | FILEFORMAT | "Fileformat" |
public static final String | NO_HEADER | "NoHeader" |
public static final String | SKIP_FOOTER_LINES | "SkipFooterLines" |
public static final String | SKIPLINES | "Skiplines" |
public static final String | SOURCE_DIRECTORY | "SourceDirectory" |
public static final String | SOURCE_FILE | "SourceFile" |
public static final String | SOURCE_GLOB | "SourceGlob" |
public static final String | TRIM | "Trim" |
com.illumon.iris.pqimport.DownsampleImportPersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | AGGREGATE_COLUMNS | "AggregateColumns" |
public static final String | AGGREGATE_TYPES | "AggregateTypes" |
public static final String | ALL_BINS | "AllBins" |
public static final String | CALENDAR | "Calendar" |
public static final String | CONFIGURATION_TYPE | "DownsampleImport" |
public static final String | KEY_COLUMNS | "KeyColumns" |
public static final String | MAINTAIN_STATE_COLUMNS | "MaintainStateColumns" |
public static final String | NUM_THREADS | "NumThreads" |
public static final String | PERIOD | "Period" |
public static final String | SOURCE_NAMESPACE | "SourceNamespace" |
public static final String | SOURCE_TABLE | "SourceTable" |
public static final String | TIME_BIN_MODE | "TimeBinMode" |
public static final String | TIMESTAMP_COLUMN | "TimestampColumn" |
com.illumon.iris.pqimport.ImportPersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | APPEND | "Append" |
public static final String | IMPORT_SOURCE | "ImportSource" |
public static final String | INTRADAY_PARTITION_COLUMN | "IntradayPartitionColumn" |
public static final String | OUTPUT_MODE_APPEND | "Append" |
public static final String | OUTPUT_MODE_REPLACE | "Replace" |
public static final String | OUTPUT_MODE_SAFE | "Safe" |
public static final String | STRICT | "Strict" |
public static final String | TYPE_SPECIFIC_FIELDS_OUTPUT_MODE | "OutputMode" |
com.illumon.iris.pqimport.JdbcImportPersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | CATALOG | "catalog" |
public static final String | CONFIGURATION_TYPE | "JdbcImport" |
public static final String | DRIVER | "driver" |
public static final String | PASSWORD | "password" |
public static final String | QUERY | "query" |
public static final String | URI | "uri" |
public static final String | USER | "user" |
com.illumon.iris.pqimport.MergeImportPersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | PARTITION_SUBSTITUTION | "PartitionSubstitution" |
public static final String | SUBSTITUTION_DATE_FORMAT | "SubstitutionDateFormat" |
com.illumon.iris.pqimport.MergePersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | ALLOW_EMPTY_INPUT | "AllowEmptyInput" |
public static final String | FORCE_OPTION | "Force" |
public static final String | LOW_HEAP_USAGE | "LowHeapUsage" |
public static final String | SORT_COLUMN_FORMULA | "SortColumnFormula" |
public static final String | THREAD_POOL_SIZE | "ThreadPoolSize" |
public static final String | TYPE_SPECIFIC_FIELD_CODEC | "Codec" |
public static final String | TYPE_SPECIFIC_FIELD_FORMAT | "Format" |
com.illumon.iris.pqimport.ValidatePersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | DELETE_INTRADAY_DATA | "DeleteIntradayData" |
public static final String | TEST_TYPE | "TestType" |
public static final String | VALIDATOR_CLASS | "ValidatorClass" |
com.illumon.iris.pqimport.XmlImportPersistentQuery
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | CONFIGURATION_TYPE | "XmlImport" |
public static final String | CONSTANT | "Constant" |
public static final String | DELIMITER | "Delimiter" |
public static final String | ELEMENT_TYPE | "ElementType" |
public static final String | MAX_DEPTH_ELEMENT | "MaxDepth" |
public static final String | SOURCE_DIRECTORY | "SourceDirectory" |
public static final String | SOURCE_FILE | "SourceFile" |
public static final String | SOURCE_GLOB | "SourceGlob" |
public static final String | START_DEPTH_ELEMENT | "StartDepth" |
public static final String | START_INDEX_ELEMENT | "StartIndex" |
public static final String | SUBSTITUTION_DATE_FORMAT | "SubstitutionDateFormat" |
public static final String | USE_ATTRIBUTE_VALUES | "AttributeValues" |
public static final String | USE_ELEMENT_VALUES | "ElementValues" |
public static final String | USE_NAMED_VALUES | "NamedValues" |
com.illumon.util.IngesterPersistentQueryConstants (also available from io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants)
Modifier and Type | Constant Field | Value |
---|---|---|
public static final String | CONFIGURATION_TYPE_MERGE | "Merge" |
public static final String | CONFIGURATION_TYPE_VALIDATE | "Validate" |
public static final String | TYPE_SPECIFIC_FIELD_NAMESPACE | "Namespace" |
public static final String | TYPE_SPECIFIC_FIELD_TABLE | "Table" |
public static final String | TYPE_SPECIFIC_FIELD_PARTITION_FORMULA | "PartitionFormula" |
Appendix B - Restart users enum and constant values
io.deephaven.proto.controller.RestartUsersEnum (Core+)
Enumeration Name | Enumeration Value |
---|---|
RU_UNSPECIFIED | 0 |
RU_ADMIN | 1 |
RU_ADMIN_AND_VIEWERS | 2 |
RU_RESERVED_1 | 3 |
RU_VIEWERS_WHEN_DOWN | 4 |
com.illumon.dataobjects.generated.DefaultPersistentQueryConfiguration (Legacy)
Modifier and Type | Constant Field | Value |
---|---|---|
public static final int | RESTARTUSERS_ADMIN | 1 |
public static final int | RESTARTUSERS_ADMINANDVIEWERS | 2 |
public static final int | RESTARTUSERS_VIEWERSWHENDOWN | 4 |