Persistent Query Infrastructure and API

The Persistent Query Controller (the controller, for short) is listed as iris_controller in monit, and runs on the controller pod in Kubernetes. It is the service responsible for managing Persistent Queries. For instance, when it is time to start a scheduled Persistent Query, it is the controller that starts the PQ and monitors its state. The Query Monitor (Web) and Query Config (Java Swing) panels obtain information about Persistent Queries in the system from the controller using the PersistentQueryControllerClient for Legacy or the PersistentQueryControllerClient for Core+.

Persistent query services and objects

The PersistentQueryControllerClient provides a method (getData() in Legacy or getDataCopy() in Core+) that returns a Map of persistent query serials and PersistentQueryInfo objects.

Native Python classes for Core+ management of persistent queries are not yet available. Currently, it is recommended to use Groovy if management of persistent queries from Core+ workers is desired.

For Legacy workers:

import com.illumon.iris.controller.PersistentQueryControllerClient;

client = PersistentQueryControllerClient.getControllerClient("Console Script",log, true);
for (PQ in client.getData().values()) {
    println (PQ.getConfig().getName());
Level = jpy.get_type('com.fishlib.io.log.LogLevel').INFO
Logger = jpy.get_type('com.fishlib.io.logger.NullLoggerImpl')
pqc = jpy.get_type('com.illumon.iris.controller.PersistentQueryControllerClient')

client = pqc.getControllerClient("Console Script",Logger(Level), True)
PQs = client.getData().values().toArray()
for PQ in PQs:
    print (PQ.getConfig().getName())

For Core+ workers:

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();
for (PQ in client.getDataCopy().values()) {
    println (PQ.getConfig().getName());
}

Each PersistentQueryInfo object represent a PQ's current state, and includes methods to get additional details about it:

  • getState() retrieves a PersistentQueryState, which provides the details about the running PQ, such as workerHost and workerPort.
  • getConfig() retrieves a PersistentQueryConfiguration, which provides the details about the PQ, such as owner, worker settings, etc.

Managing Persistent Queries programmatically

The PersistentQueryControllerClient provides methods to manage PQs on the system. See the JavaDoc for complete details on this class. There is also a native Python class in the Core+ Python client that can be used to interact with the controller.

When creating a new PQ, the serial of the PQ must be set to Java's Long.MIN_VALUE; this is the default behavior for the Legacy PQ API, but, for the newer gRPC API, the serial property defaults to 0, and must be explicitly set to Long.MIN_VALUE. It is possible to set a specific positive value for the serial of a new PQ so that, when importing a PQ from a backup or export, its serial can be maintained to match other configuration objects, such as dashboards.

When configuring some types of Peristent Queries, there are type-specific fields that need to be set; for example, the table that an import, validate, or merge PQ will operate on.

  • For Legacy workers, these properties are defined in various public constants in classes, such as MergePersistentQuery. The type-specific field properties are set through a Java Map.
  • For Core+ Workers, not all of these classes have been made available yet. The Core+ example scripts below include some definitions of these constants. A full list of constants and related classes is shown in Appendix A, below. The type-specific field properties are set or through a JSON String. For the JSON, each property name element must have a value and a data type specified. String as the data type works for all of these type-specific properties.

One other complex area of configuration is the scheduling settings.

  • In Legacy, scheduling is passed as an array of Strings.
  • In Core+, scheduling is passed as a sequence of .addScheduling() calls (one per line of scheduling information).

Here are examples of programmatically creating a Merge PQ.

For Legacy workers:

import com.illumon.iris.controller.PersistentQueryControllerClient
import java.util.Map
import java.util.HashMap
import static com.illumon.iris.pqimport.MergePersistentQuery.*
import com.illumon.iris.controller.PersistentQueryConfiguration
import static com.illumon.util.IngesterPersistentQueryConstants.*

client = PersistentQueryControllerClient.getControllerClient("Console Script",log, true)

// Create a new Merge PQ
String[] scheduling = [
                "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
                "Calendar=USNYSE",
                "BusinessDays=false",
                "StartTime=07:55:00",
                "StopTime=23:55:00",
                "TimeZone=America/New_York",
                "SchedulingDisabled=true",
                "Overnight=false",
                "RepeatEnabled=false",
                "SkipIfUnsuccessful=false",
                "RestartErrorCount=0",
                "RestartErrorDelay=0",
                "RestartWhenRunning=Yes"
        ];

PersistentQueryConfiguration config = new PersistentQueryConfiguration()
config.setHeapSizeInGB(4.0)

// Set TypeSpecificField values
Map map = new HashMap()
map.put(LOW_HEAP_USAGE,"false")
map.put(FORCE_OPTION,"false")
map.put(ALLOW_EMPTY_INPUT,"true")
map.put(SORT_COLUMN_FORMULA,"")
map.put(THREAD_POOL_SIZE,"4")
map.put(TYPE_SPECIFIC_FIELD_NAMESPACE, "LearnDeephaven")
map.put(TYPE_SPECIFIC_FIELD_TABLE, "StockQuotes")
map.put(TYPE_SPECIFIC_FIELD_PARTITION_FORMULA, "\"2017-08-25\"")
map.put(TYPE_SPECIFIC_FIELD_TABLE_DATA_SERVICE_CONFIG,"local")
map.put(TYPE_SPECIFIC_FIELD_FORMAT,"Default")
config.setTypeSpecificFields(map)

config.setConfigurationType(CONFIGURATION_TYPE_MERGE)
config.setName("Test Programmatic Merge Creation")
config.setOwner("iris")
config.setEnabled(false)
config.setDbServerName("Merge_1")
config.setDetailedGCLoggingEnabled(false)
config.setDataBufferPoolToHeapSizeRatio(.3)
config.setJVMProfile("Default")
// RESTARTUSERS_ADMIN=1
// RESTARTUSERS_ADMINANDVIEWERS=2
// RESTARTUSERS_VIEWERSWHENDOWN=4
config.setRestartUsers(1)
config.setScheduling(scheduling)
// timeout in milliseconds
config.setTimeout(600000)

client.addQueryConfiguration(config)
PQCC = jpy.get_type('com.illumon.iris.controller.PersistentQueryControllerClient')
HASHMAP = jpy.get_type('java.util.HashMap')
MPQ = jpy.get_type('com.illumon.iris.pqimport.MergePersistentQuery')
PQC = jpy.get_type('com.illumon.iris.controller.PersistentQueryConfiguration')
IPQ = jpy.get_type('com.illumon.util.IngesterPersistentQueryConstants')
Level = jpy.get_type('com.fishlib.io.log.LogLevel').INFO
Logger = jpy.get_type('com.fishlib.io.logger.NullLoggerImpl')

client = PQCC.getControllerClient('Console Script',Logger(Level),True)

# Create a new Merge PQ
scheduling = []
scheduling.append('SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily')
scheduling.append('Calendar=USNYSE')
scheduling.append('BusinessDays=false')
scheduling.append('StartTime=07:55:00')
scheduling.append('StopTime=23:55:00')
scheduling.append('TimeZone=America/New_York')
scheduling.append('SchedulingDisabled=true')
scheduling.append('Overnight=false')
scheduling.append('RepeatEnabled=false')
scheduling.append('SkipIfUnsuccessful=false')
scheduling.append('RestartErrorCount=0')
scheduling.append('RestartErrorDelay=0')
scheduling.append('RestartWhenRunning=Yes')

config = PQC()
config.setHeapSizeInGB(4.0)

# Set TypeSpecificField values
map = HASHMAP()
map.put(MPQ.LOW_HEAP_USAGE,'false')
map.put(MPQ.FORCE_OPTION,'false')
map.put(MPQ.ALLOW_EMPTY_INPUT,'true')
map.put(MPQ.SORT_COLUMN_FORMULA,'')
map.put(MPQ.THREAD_POOL_SIZE,'4')
map.put(IPQ.TYPE_SPECIFIC_FIELD_NAMESPACE, 'LearnDeephaven')
map.put(IPQ.TYPE_SPECIFIC_FIELD_TABLE, 'StockQuotes')
map.put(IPQ.TYPE_SPECIFIC_FIELD_PARTITION_FORMULA, '\"2017-08-25\"')
map.put(IPQ.TYPE_SPECIFIC_FIELD_TABLE_DATA_SERVICE_CONFIG,'local')
map.put(MPQ.TYPE_SPECIFIC_FIELD_FORMAT,'Default')
config.setTypeSpecificFields(map)

config.setConfigurationType(IPQ.CONFIGURATION_TYPE_MERGE)
config.setName('Test Programmatic Merge Creation')
config.setOwner('iris')
config.setEnabled(False)
config.setDbServerName('Merge_1')
config.setDetailedGCLoggingEnabled(False)
config.setDataBufferPoolToHeapSizeRatio(.3)
config.setJVMProfile('Default')
# RESTARTUSERS_ADMIN=1
# RESTARTUSERS_ADMINANDVIEWERS=2
# RESTARTUSERS_VIEWERSWHENDOWN=4
config.setRestartUsers(1)
config.setScheduling(scheduling)
# timeout in milliseconds
config.setTimeout(600000)

client.addQueryConfiguration(config)

For Core+ workers:

import io.deephaven.enterprise.dnd.ControllerClientFactory
import static io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants.*
import io.deephaven.proto.controller.PersistentQueryConfigMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.deephaven.proto.controller.RestartUsersEnum

// Some TypeSpecificField constants
final String LOW_HEAP_USAGE = "LowHeapUsage";
final String FORCE_OPTION = "Force";
final String ALLOW_EMPTY_INPUT = "AllowEmptyInput";
final String SORT_COLUMN_FORMULA = "SortColumnFormula";
final String THREAD_POOL_SIZE = "ThreadPoolSize";
final String TYPE_SPECIFIC_FIELD_FORMAT = "Format";
final String TYPE_SPECIFIC_FIELD_CODEC = "Codec";

encodeJsonStringNode = { ObjectNode root, String name, String value ->
        root.putObject(name).put("type","String").put("value",value)
}

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

// Create a new Merge PQ
String[] scheduling = [
                "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
                "Calendar=USNYSE",
                "BusinessDays=false",
                "StartTime=07:55:00",
                "StopTime=23:55:00",
                "TimeZone=America/New_York",
                "SchedulingDisabled=true",
                "Overnight=false",
                "RepeatEnabled=false",
                "SkipIfUnsuccessful=false",
                "RestartErrorCount=0",
                "RestartErrorDelay=0",
                "RestartWhenRunning=Yes"
        ];

configBuilder = PersistentQueryConfigMessage.newBuilder()
configBuilder.setHeapSizeGb(4.0)

ObjectNode rootNode = mapper.createObjectNode();

// Set TypeSpecificField values
encodeJsonStringNode(rootNode,LOW_HEAP_USAGE,"false")
encodeJsonStringNode(rootNode,FORCE_OPTION,"false")
encodeJsonStringNode(rootNode,ALLOW_EMPTY_INPUT,"true")
encodeJsonStringNode(rootNode,SORT_COLUMN_FORMULA,"")
encodeJsonStringNode(rootNode,THREAD_POOL_SIZE,"4")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_NAMESPACE, "LearnDeephaven")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_TABLE, "StockQuotes")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_PARTITION_FORMULA, "\"2017-08-25\"")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_TABLE_DATA_SERVICE_CONFIG,"local")
encodeJsonStringNode(rootNode,TYPE_SPECIFIC_FIELD_FORMAT,"Default")

configBuilder.setTypeSpecificFieldsJson(new ObjectMapper().writeValueAsString(rootNode))

// set Long.MIN_VALUE for serial to create a new serial for this PQ
configBuilder.setSerial(Long.MIN_VALUE)
configBuilder.setConfigurationType(CONFIGURATION_TYPE_MERGE)
configBuilder.setName("Test Programmatic Merge Creation")
configBuilder.setOwner("iris")
configBuilder.setEnabled(false)
configBuilder.setServerName("Merge_1")
configBuilder.setDetailedGCLoggingEnabled(false)
configBuilder.setBufferPoolToHeapRatio(.3)
configBuilder.setJvmProfile("Default")
configBuilder.setRestartUsers(RestartUsersEnum.RU_ADMIN)

// add all scheduling property lines from scheduling String[]
for (int i=0; i<scheduling.size(); i++) {
        configBuilder.addScheduling(scheduling[i])
}

// timeout in nanoseconds
configBuilder.setTimeoutNanos(600000000000)

configMessage=configBuilder.build()
client.addQuery(configMessage)
from deephaven_enterprise.client.session_manager import SessionManager
import json
from typing import Dict

def __encode_type_specific_fields(tsf: Dict) -> str:
    """
    Encodes type specific fields from a Python dictionary into the JSON object that the controller expects.
    :param tsf: a Python dictionary with type specific fields
    :return: a JSON encoded string suitable for the controller
    """
    encoded = {}
    for (k, v) in tsf.items():
        encoded[k] = {"type": "string", "value": v}

    return json.dumps(encoded)

# authenticate to the Deephaven host
connection_info = "https://deephaven-host:8000/iris/connection.json"

session_mgr: SessionManager = SessionManager(connection_info)
session_mgr.private_key("/path-to-private-key/priv-username.base64.txt")

# Create the query configuration
config: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage =
    deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage()

tsf = {
    "LowHeapUsage": "false",
    "Force": "false",
    "AllowEmptyInput": "true",
    "SortColumnFormula": "",
    "ThreadPoolSize": "4",
    "Namespace": "LearnDeephaven",
    "Table": "StockQuotes",
    "PartitionFormula": "\"2017-08-25\"",
    "TableDataServiceConfig": "local",
    "Format": "Default"
}

config.typeSpecificFieldsJson = __encode_type_specific_fields(tsf)

# Java Long.MIN_VALUE allows the controller to assign a serial number
config.serial = -2 ** 63
config.configurationType = "Merge"
config.name = "Test Programmatic Merge Creation"
config.version = 1
config.owner = "iris"
config.enabled = False
config.heapSizeGb = 1.0
config.serverName = "Merge_1"
config.detailedGCLoggingEnabled = False
config.jvmProfile = "Default"
config.bufferPoolToHeapRatio = 0.3
config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN

scheduling = []
scheduling.append("SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily")
scheduling.append("Calendar=USNYSE")
scheduling.append("BusinessDays=false")
scheduling.append("StartTime=07:55:00")
scheduling.append("StopTime=23:55:00")
scheduling.append("TimeZone=America/New_York")
scheduling.append("SchedulingDisabled=true")
scheduling.append("Overnight=false")
scheduling.append("RepeatEnabled=false")
scheduling.append("SkipIfUnsuccessful=false")
scheduling.append("RestartErrorCount=0")
scheduling.append("RestartErrorDelay=0")
scheduling.append("RestartWhenRunning=Yes")

for param in scheduling:
    config.scheduling.append(param)

initialize_timeout_seconds: float = 60.0
config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000)

# Add the query
session_mgr.controller_client.add_query(config)

Appendix A - Constant values for Persistent Query configuration types and type-specific fields

Note

Among the classes listed below, only io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants is available to Core+ workers. The other classes listed here are available to Legacy workers directly, or through jpy.get_type. For Core+ workers, use the values documented here when setting configuration type or type-specific fields when creating or updating a Persistent Query.

com.illumon.iris.pqimport.BinaryImportPersistentQuery

Modifier and TypeConstant FieldValue
public static final StringCONFIGURATION_TYPE"BinaryImport"
public static final StringTYPE_SPECIFIC_FIELDS_SOURCE_DIRECTORY"SourceDirectory"
public static final StringTYPE_SPECIFIC_FIELDS_SOURCE_FILE"SourceFile"
public static final StringTYPE_SPECIFIC_FIELDS_SOURCE_GLOB"SourceGlob"

com.illumon.iris.pqimport.CsvImportPersistentQuery

Modifier and TypeConstant FieldValue
public static final StringCONFIGURATION_TYPE"CsvImport"
public static final StringCONSTANT"Constant"
public static final StringDELIMITER"Delimiter"
public static final StringFILEFORMAT"Fileformat"
public static final StringNO_HEADER"NoHeader"
public static final StringSKIP_FOOTER_LINES"SkipFooterLines"
public static final StringSKIPLINES"Skiplines"
public static final StringSOURCE_DIRECTORY"SourceDirectory"
public static final StringSOURCE_FILE"SourceFile"
public static final StringSOURCE_GLOB"SourceGlob"
public static final StringTRIM"Trim"

com.illumon.iris.pqimport.DownsampleImportPersistentQuery

Modifier and TypeConstant FieldValue
public static final StringAGGREGATE_COLUMNS"AggregateColumns"
public static final StringAGGREGATE_TYPES"AggregateTypes"
public static final StringALL_BINS"AllBins"
public static final StringCALENDAR"Calendar"
public static final StringCONFIGURATION_TYPE"DownsampleImport"
public static final StringKEY_COLUMNS"KeyColumns"
public static final StringMAINTAIN_STATE_COLUMNS"MaintainStateColumns"
public static final StringNUM_THREADS"NumThreads"
public static final StringPERIOD"Period"
public static final StringSOURCE_NAMESPACE"SourceNamespace"
public static final StringSOURCE_TABLE"SourceTable"
public static final StringTIME_BIN_MODE"TimeBinMode"
public static final StringTIMESTAMP_COLUMN"TimestampColumn"

com.illumon.iris.pqimport.ImportPersistentQuery

Modifier and TypeConstant FieldValue
public static final StringAPPEND"Append"
public static final StringIMPORT_SOURCE"ImportSource"
public static final StringINTRADAY_PARTITION_COLUMN"IntradayPartitionColumn"
public static final StringOUTPUT_MODE_APPEND"Append"
public static final StringOUTPUT_MODE_REPLACE"Replace"
public static final StringOUTPUT_MODE_SAFE"Safe"
public static final StringSTRICT"Strict"
public static final StringTYPE_SPECIFIC_FIELDS_OUTPUT_MODE"OutputMode"

com.illumon.iris.pqimport.JdbcImportPersistentQuery

Modifier and TypeConstant FieldValue
public static final StringCATALOG"catalog"
public static final StringCONFIGURATION_TYPE"JdbcImport"
public static final StringDRIVER"driver"
public static final StringPASSWORD"password"
public static final StringQUERY"query"
public static final StringURI"uri"
public static final StringUSER"user"

com.illumon.iris.pqimport.MergeImportPersistentQuery

Modifier and TypeConstant FieldValue
public static final StringPARTITION_SUBSTITUTION"PartitionSubstitution"
public static final StringSUBSTITUTION_DATE_FORMAT"SubstitutionDateFormat"

com.illumon.iris.pqimport.MergePersistentQuery

Modifier and TypeConstant FieldValue
public static final StringALLOW_EMPTY_INPUT"AllowEmptyInput"
public static final StringFORCE_OPTION"Force"
public static final StringLOW_HEAP_USAGE"LowHeapUsage"
public static final StringSORT_COLUMN_FORMULA"SortColumnFormula"
public static final StringTHREAD_POOL_SIZE"ThreadPoolSize"
public static final StringTYPE_SPECIFIC_FIELD_CODEC"Codec"
public static final StringTYPE_SPECIFIC_FIELD_FORMAT"Format"

com.illumon.iris.pqimport.ValidatePersistentQuery

Modifier and TypeConstant FieldValue
public static final StringDELETE_INTRADAY_DATA"DeleteIntradayData"
public static final StringTEST_TYPE"TestType"
public static final StringVALIDATOR_CLASS"ValidatorClass"

com.illumon.iris.pqimport.XmlImportPersistentQuery

Modifier and TypeConstant FieldValue
public static final StringCONFIGURATION_TYPE"XmlImport"
public static final StringCONSTANT"Constant"
public static final StringDELIMITER"Delimiter"
public static final StringELEMENT_TYPE"ElementType"
public static final StringMAX_DEPTH_ELEMENT"MaxDepth"
public static final StringSOURCE_DIRECTORY"SourceDirectory"
public static final StringSOURCE_FILE"SourceFile"
public static final StringSOURCE_GLOB"SourceGlob"
public static final StringSTART_DEPTH_ELEMENT"StartDepth"
public static final StringSTART_INDEX_ELEMENT"StartIndex"
public static final StringSUBSTITUTION_DATE_FORMAT"SubstitutionDateFormat"
public static final StringUSE_ATTRIBUTE_VALUES"AttributeValues"
public static final StringUSE_ELEMENT_VALUES"ElementValues"
public static final StringUSE_NAMED_VALUES"NamedValues"

com.illumon.util.IngesterPersistentQueryConstants (also available from io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants)

Modifier and TypeConstant FieldValue
public static final StringCONFIGURATION_TYPE_MERGE"Merge"
public static final StringCONFIGURATION_TYPE_VALIDATE"Validate"
public static final StringTYPE_SPECIFIC_FIELD_NAMESPACE"Namespace"
public static final StringTYPE_SPECIFIC_FIELD_TABLE"Table"
public static final StringTYPE_SPECIFIC_FIELD_PARTITION_FORMULA"PartitionFormula"

Appendix B - Restart users enum and constant values

io.deephaven.proto.controller.RestartUsersEnum (Core+)

Enumeration NameEnumeration Value
RU_UNSPECIFIED0
RU_ADMIN1
RU_ADMIN_AND_VIEWERS2
RU_RESERVED_13
RU_VIEWERS_WHEN_DOWN4

com.illumon.dataobjects.generated.DefaultPersistentQueryConfiguration (Legacy)

Modifier and TypeConstant FieldValue
public static final intRESTARTUSERS_ADMIN1
public static final intRESTARTUSERS_ADMINANDVIEWERS2
public static final intRESTARTUSERS_VIEWERSWHENDOWN4