Monitoring Queries

This guide covers best practices for monitoring Persistent Queries (PQs) in Deephaven. Regular monitoring of queries is critical to identifying, diagnosing, and resolving issues related to performance, resource usage, correctness, and stability in a timely fashion. The methods presented combine the use of Deephaven's internal tables and dashboards.

Deephaven's internal tables contain information about the state of the system, including query status. These tables can be queried individually or together to diagnose slow and/or unresponsive queries. Combining these tables with additional widgets and graphical tools helps continuous monitoring efforts and provides a comprehensive view of the system's health.

The techniques presented are intended to help you troubleshoot queries independently. However, some issues may require assistance from Deephaven support. When serious, unexpected, or esoteric issues arise, it's recommended to send Console logs, Exception messages, and any other pertinent information to the support team.

Internal tables

All Deephaven installations include many tables that contain specific information for the system's internal use. These tables are updated by various processes. They contain details about processes, workers, PQ states, and all query operations. These tables are stored in the DbInternal namespace.

Every internal table contains a ProcessInfoId column, a unique identifier for a particular worker that can be used to correlate values across multiple tables.
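
For example, here is a minimal Python sketch (with a placeholder ID) that uses ProcessInfoId to pull one worker's rows out of two of the internal tables introduced below:

# Hypothetical worker ID; substitute the ID of the worker under investigation.
pid = "52e806dd-af75-412c-a286-ec29aa5571d2"

worker_log = db.live_table("DbInternal", "ProcessEventLog").where(
    ["Date=today()", "ProcessInfoId=pid"]
)
worker_updates = db.live_table("DbInternal", "UpdatePerformanceLogCoreV2").where(
    ["Date=today()", "ProcessInfoId=pid"]
)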

Query Operation Performance Log

The Query Operation Performance Log (QOPL) contains performance details on query operations, such as duration and memory usage. This table can be queried in a Core+ worker with the following code:

Groovy
qopl = db.liveTable("DbInternal", "QueryOperationPerformanceLogCoreV2").where("Date=today()")
Python
qopl = db.live_table("DbInternal", "QueryOperationPerformanceLogCoreV2").where(
    "Date=today()"
)

See Query Operation Performance Log for an explanation of each column in the table.

Update Performance Log

The Update Performance Log describes the time and resources used by the Update Graph. It contains aggregated performance details on incremental update operations performed in update graphs. This table can be queried in a Core+ worker with the following code:

Groovy
upl = db.liveTable("DbInternal", "UpdatePerformanceLogCoreV2").where("Date=today()")
Python
upl = db.live_table("DbInternal", "UpdatePerformanceLogCoreV2").where("Date=today()")

See Update Performance Log for an explanation of each column in the table.

Persistent Query State Log

The Persistent Query State Log contains information about the state of PQs. Any time a worker's state changes, whether it is starting, stopping, or anything else, that change is logged to this table. It can be queried in a Core+ worker with the following code:

Groovy
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
Python
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")

Statuses of queries that have not failed in this table can be one of:

  • Connected
  • Acquiring Worker
  • Initializing
  • Completed
  • Running
  • Stopped

See Persistent Query State Log for an explanation of each column in the table.

Statuses of queries that have failed can be one of the following:

Failed

This means that the PQ failed before or during initialization. Refer to the ExceptionMessage column in the Persistent Query State Log for specific details.

  • The most common failure is an error in a script. Expanding the cell shows the full text of the error message. For example, leaving a closing quotation mark off a string results in a ScriptException that reports the line number of the error.
  • Queries also fail if they exceed the query server's maximum heap size. In that case, increase the memory (heap) usage value in the Query Monitor.

Error

This means that an error occurred after the query was initialized (e.g., while processing incremental updates). The ExceptionStackTrace column may offer more information in this case. To review this information, expand the relevant cell to open a window with the full text, e.g. by right-clicking the entry and selecting View Cell Contents.
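
For example, a sketch that pulls the exception columns for a single PQ (the query name is a placeholder):

errors = (
    db.live_table("DbInternal", "PersistentQueryStateLog")
    .where(["Date=today()", "Name=`MyQuery`", "Status=`Error`"])
    .view(["Timestamp", "Status", "ExceptionMessage", "ExceptionStackTrace"])
)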

Disconnected

This means the worker process disconnected from the dispatcher. Query workers typically disconnect from the dispatcher as a result of an exceptional error or excessive workload that prevents the worker from sending a heartbeat. Common reasons for a disconnection include:

  • The server runs out of heap, and the resulting OutOfMemoryError kills the JVM.
  • The JVM is stuck in garbage collection for so long that it cannot respond to dispatcher heartbeats for 60 seconds.
  • A deployment issue results in a NoSuchMethodError.
  • An internal JVM error or a native code error occurs (i.e., a JVM HotSpot error).

The following query shows all the times a PQ entered a failure state on the current date:

Groovy
pqslAllFailedStates = db.liveTable("DbInternal", "PersistentQueryStateLog")
        .where("Date=today()", "Status in `Failed`, `Error`, `Disconnected`")
Python
pqs_all_failed_states = db.live_table("DbInternal", "PersistentQueryStateLog").where(
    ["Date=today()", "Status in `Failed`, `Error`, `Disconnected`"]
)

Getting the last row by serial number shows the most recent state of any PQ that attempted to start on the current date:

Groovy
pqslAllCurrentStates = db.liveTable("DbInternal", "PersistentQueryStateLog")
        .where("Date=today()").lastBy("SerialNumber")
Python
pqs_all_current_states = (
    db.live_table("DbInternal", "PersistentQueryStateLog")
    .where("Date=today()")
    .last_by("SerialNumber")
)

Combining the above queries indicates which PQs are currently in a failure state:

Groovy
pqslCurrentFailedStates = db.liveTable("DbInternal", "PersistentQueryStateLog")
    .where("Date=today()")
    .lastBy("SerialNumber")
    .where("Status in `Failed`, `Error`, `Disconnected`")
Python
pqs_current_failed_states = (
    db.live_table("DbInternal", "PersistentQueryStateLog")
    .where("Date=today()")
    .last_by("SerialNumber")
    .where(["Status in `Failed`, `Error`, `Disconnected`"])
)

Alternatively, the Query Monitor provides details about the PQs that you are authorized to see. Information about each query is stored in columns, including the owner, query name, whether it's enabled, and its status. If a PQ has failed, the ExceptionDetails column includes the same raw data as the Persistent Query State Log, which may contain pertinent information.

To read the ExceptionDetails, expand the relevant cell. This opens a window that displays the full content. Alternatively, you can click the row of the relevant PQ, which presents its ExceptionDetails in its configuration editor summary.

Process Event Log

The Process Event Log contains all log messages for Deephaven workers and query/merge servers. These log messages include the output from workers. It can be queried in a Core+ worker with the following code:

Groovy
pel = db.liveTable("DbInternal", "ProcessEventLog").where("Date=today()")
Python
pel = db.live_table("DbInternal", "ProcessEventLog").where("Date=today()")

A common usage pattern for this table is to find all FATAL level messages for the current day:

Groovy
pel = db.liveTable("DbInternal", "ProcessEventLog").where("Date=today()", "Level=`FATAL`")
Python
pel = db.live_table("DbInternal", "ProcessEventLog").where(
    ["Date=today()", "Level=`FATAL`"]
)

You can also find specific error messages by using the Process Info ID to query the log. The Process Info ID can be found by hovering over the Info button in the console:

Info button
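
Once you have the ID, a sketch like the following (placeholder ID; the Level filter is an assumption) narrows the log to a single worker's error messages:

worker_errors = db.live_table("DbInternal", "ProcessEventLog").where(
    ["Date=today()", "ProcessInfoId=`52e806dd-af75-412c-a286-ec29aa5571d2`", "Level=`ERROR`"]
)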

Expand the cell for your query in the Log Entry column to view a full exception message and stack trace, or right-click the cell and select View Cell Contents to open the text in its own dialog window.

To search the LogEntry column, right-click it and select Go To. Alternatively, the keyboard shortcut Cmd+G (Mac) or Ctrl+G (Windows) opens a search box.

Go To

See Process Event Log for an explanation of each column in the table.

Monitor queries with internal tables

The following sections describe how to use the internal tables described above to monitor queries.

Query Operation Performance Log and Update Performance Log

The Description or EntryDescription columns of these two tables include human-readable information that describes the type of operation. The image below shows a snippet of the Description column from the Query Operation Performance Log, where you can see logs related to PQs that open the StockTrades and StockQuotes tables from the LearnDeephaven namespace:

Description column

Most entries in this column correspond to individual operations performed on a table (e.g., join, sort, where). Looking at other columns in a given row (StartTime, EndTime, WasInterrupted, etc.) may help determine which specific operations are slow and offer hints as to why. While none of these log descriptions are cause for concern on their own, any action that takes a long time could contribute to performance issues; the sketch after this list shows one way to surface the slowest operations.

  • Aggregated Small Updates (Update Performance Log only) — A summary description that indicates a group of updates, each of which took less than the configurable threshold for logging an update. For many workloads, the vast majority of updates will not materially affect performance. Aggregating the updates saves significant disk space and bandwidth. The threshold for update logging may be configured with the QueryPerformance.minimumUpdateLogDurationNanos property. See Filter small entries below.
  • coalesce() — Describes producing a QueryTable that can be used for query operations from an uncoalesced table (typically a SourceTable). The SourceTable is a placeholder that represents your data and defers most work until a query operation is required, at which point it must be coalesced.
  • initializeAvailableLocations() and initializeLocationSizes() — These actions require file system access to read directories (locations) and then many table.size files. This can have a substantial impact on a historical query.
  • Uninstrumented Code — This is code that is not enclosed in a measurement block, which means it cannot be monitored or traced. It could indicate a bug, or a complex operation that, because it is outside of our framework, has no details available.
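
To surface which operations are actually slow, sort the table by duration. A sketch (the DurationNanos column is an assumption here, matching the QueryOperationPerformance discussion later in this guide):

slow_ops = (
    db.live_table("DbInternal", "QueryOperationPerformanceLogCoreV2")
    .where("Date=today()")
    .view(["ProcessInfoId", "Description", "StartTime", "EndTime", "DurationNanos"])
    .sort_descending("DurationNanos")
)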

Filter small entries

The Update Performance Log is extremely comprehensive. It contains many small entries (Aggregated Small Updates) that are not always of interest when debugging performance issues. You can set a minimum threshold to filter small entries, which frees up disk space and makes it easier to analyze the data.

Note

These properties are set in the Core properties configuration, not iris-environment.prop.

  • QueryPerformance.minimumUpdateLogDurationNanos=<nanoseconds> — sets a minimum duration in nanoseconds for recorded entries in the Update Performance Log. For example, setting this property to 1000000 discards entries in the UpdatePerformanceLogCoreV2 table with a duration of less than one millisecond. When set to 0, all values are logged.
  • QueryPerformance.minimumLogDurationNanos — sets a minimum duration in nanoseconds for recorded entries in the QueryOperationPerformanceLogCoreV2. When set to 0, all values are logged.
  • QueryPerformance.minimumUninstrumentedLogDurationNanos — sets a minimum duration in nanoseconds of uninstrumented code to be recorded in the Query Operation Performance Log. When set to 0, all values are logged.
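
For example, applying the one-millisecond threshold from above to all three properties might look like this in the worker's properties configuration (the values are illustrative):

QueryPerformance.minimumUpdateLogDurationNanos=1000000
QueryPerformance.minimumLogDurationNanos=1000000
QueryPerformance.minimumUninstrumentedLogDurationNanos=1000000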

The following query creates three tables with specific totals and usage information to see just how many small entries exist in your system:

Groovy
smallEntries = db.liveTable("DbInternal", "UpdatePerformanceLogCoreV2")
        .where("Date=today()")
        .updateView("Bucket=Long.highestOneBit(UsageNanos)")
        .view("Count=1", "Bucket", "UsageNanos")
        .sumBy("Bucket")
        .sort("Bucket")
        .groupBy()
        .updateView("CumCount=cumsum(Count)", "CumUsage=cumsum(UsageNanos)")
        .ungroup()
total = smallEntries
        .view("Total=Count", "TotalUsage=UsageNanos")
        .sumBy()
fraction = smallEntries
        .naturalJoin(total, "")
        .updateView("FracEntries=CumCount/Total", "FracTime=CumUsage/TotalUsage")
Python
small_entries = (
    db.live_table("DbInternal", "UpdatePerformanceLogCoreV2")
    .where("Date=today()")
    .update_view("Bucket=Long.highestOneBit(UsageNanos)")
    .view(["Count=1", "Bucket", "UsageNanos"])
    .sum_by("Bucket")
    .sort("Bucket")
    .group_by()
    .update_view(["CumCount=cumsum(Count)", "CumUsage=cumsum(UsageNanos)"])
    .ungroup()
)
total = small_entries.view(["Total=Count", "TotalUsage=UsageNanos"]).sum_by()
fraction = small_entries.natural_join(total, "").update_view(
    ["FracEntries=CumCount/Total", "FracTime=CumUsage/TotalUsage"]
)

Here's what the first table looks like:

Small entries

The total table shows a ticking total of the number of entries and their usage:

Total

The fraction table contains the fraction of entries and time for each bucket of usage:

Fraction

Performance overview

Deephaven enables you to get a performance overview for a query by its Process Info ID, worker name, or PQ name. The following code block does all three:

Groovy
// Get perf overview by PID
performanceOverviewByPiid("52e806dd-af75-412c-a286-ec29aa5571d2")

// Get perf overview by worker name
performanceOverviewByWorkerName("worker_12")

// Get perf overview by PQ name
performanceOverviewByPqName("PqName")
Python
# Get perf overview by PID
performance_overview("52e806dd-af75-412c-a286-ec29aa5571d2")

# Get perf overview by worker name
performance_overview(worker_name="worker_12")

# Get perf overview by PQ name
performance_overview(pq_name="PqName")

If the same PQ ran multiple times during the day, provide a date-time string representing an as-of time, which will narrow the search to the latest PQ run at or before the specified time:

Groovy
performanceOverviewByPqName("PqName", "2023-04-28T15:57:45 ET")
Python
performance_overview(pq_name="PqName", as_of_time_string="2023-04-28T15:57:45 ET")

In Python, performance_overview uses named arguments to specify the function's behavior. The following parameters may be used (see the example call after this list):

  • process_info_id: The Process Info ID of the worker to get performance information for.
  • worker_name: The name of the worker to get performance information for.
  • pq_name: The name of the PQ to get performance information for.
  • host_name: The name of the host running the worker.
  • as_of_time: A date-time string representing an as-of time.
  • as_of_time_string: A string representation of a date-time representing an as-of time.
  • owner: The PQ owner. This is only relevant if pq_name is used.
  • date: The date to get information for. If not given, the current date is used.
  • is_intraday: Determines if the table retrieval is intraday or historical.
  • is_live: Determines whether an intraday table is live or static. This is only relevant if is_intraday is used.
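
For example, a hypothetical call combining several of these parameters to inspect a past, historical run (the PQ name, owner, and date are placeholders):

performance_overview(
    pq_name="PqName",
    owner="some.owner",
    date="2023-04-27",
    is_intraday=False,
)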

In Groovy, the performance overview is split into three functions:

  • For Process Info ID:
performanceOverviewByPiid = { final String processInfoId, final String date = null, final boolean isIntraday = true, final boolean isLive = true -> ...
  • For worker name:
performanceOverviewByWorkerName = { final String workerName, final String hostName = null, final String date = null, final boolean isIntraday = true, final boolean isLive = true -> ...
  • For PQ name:
performanceOverviewByPqName = { final String pqName, final String asOfTimeString = null, final String owner = null, final String date = null, final boolean isIntraday = true, final boolean isLive = true -> ...

These methods open several tables and two plots (if applicable):

Tables:

  1. QueryPerformance
  2. QueryOperationPerformance
  3. QueryUpdatePerformance
  4. UpdateWorst
  5. UpdateMostRecent
  6. UpdateAggregate
  7. UpdateSummaryStats
  8. ProcessEventLog
  9. ServerState

Plots:

  1. UGPCycleTimeline
  2. ServerStateTimeLine

The QueryOperationPerformance table, in particular the DurationNanos column, shows you the longest steps of your query's initialization process. For queries using intraday data, initialization is only part of the picture. Queries using live data also have to update results as new data flows in. If you have an intraday query that has trouble keeping up with real-time data, you should look at the UpdateWorst table.

In the UpdateWorst table, the column to focus on is Ratio. The Ratio value shows what percentage of a query's data refresh cycle is consumed by each operation. If the sum of the Ratio column for one update interval approaches N, where N is the number of update threads, then the query will be unable to keep up with incoming data. The number of update threads is determined by the PeriodicUpdateGraph.updateThreads property; by default, it equals the number of available processors. Because update threads must respect dependencies between operations, analysis can be more difficult: the system may be unable to reach a total ratio of N due to unsatisfied dependencies or long chains of serial operations. The sum of the Ratio column for each interval, as a percentage, is available in the UpdateAggregate table.
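
To estimate that per-interval sum yourself from the raw log, a sketch like the following works, assuming the raw UpdatePerformanceLogCoreV2 table carries IntervalStartTime and IntervalEndTime alongside UsageNanos (as the QueryUpdatePerformance table described below does):

ratio_per_interval = (
    db.live_table("DbInternal", "UpdatePerformanceLogCoreV2")
    .where("Date=today()")
    .update_view(
        [
            "IntervalNanos = IntervalEndTime - IntervalStartTime",
            "EntryRatio = UsageNanos / (double) IntervalNanos",
        ]
    )
    .view(["IntervalStartTime", "EntryRatio"])
    .sum_by("IntervalStartTime")
    .sort_descending("EntryRatio")
)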

You should also carefully monitor the IntervalDurationNanos value to ensure it does not exceed UpdatePerformanceTracker.reportIntervalMillis (defaults to 60 seconds) plus PeriodicUpdateGraph.targetCycleDurationMillis (defaults to 1 second). If it does, it's a sign that performance issues are causing longer target cycle times than expected.
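
A quick check for oversized intervals might look like the following sketch, where 61 seconds is the default report interval plus the default target cycle time (column names are assumptions, as above):

long_intervals = (
    db.live_table("DbInternal", "UpdatePerformanceLogCoreV2")
    .where("Date=today()")
    .update_view("IntervalNanos = IntervalEndTime - IntervalStartTime")
    .where("IntervalNanos > 61_000_000_000L")
)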

Below is a description of each table that the performance overview methods produce, broken up by category.

Query initialization

QueryPerformance

This table contains top-level performance information for each query executed by the given worker. It contains data on how long each query takes to run.

A query, in this context, is the code that is run:

  • Whenever you type a new command in the console and press Return. Each command where you have to wait for an answer from the worker is a new query.
  • When a Persistent Query initializes.
  • As a result of a sort or filter in the GUI.

The most significant columns in this table are:

  • TimeSecs — How long this query ran for, in seconds.
  • QueryMemUsed — Memory in use by the query (only includes active heap memory).
  • QueryMemUsedPct — Memory usage as a percentage of the max heap size (-Xmx).
  • QueryMemFree — Remaining memory until the query runs into the max heap size.
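
For example, assuming the performance overview leaves this table bound in the console scope as query_performance (a hypothetical variable name), the longest-running and most memory-pressured queries can be surfaced with:

# Hypothetical binding; adjust the name to match your console scope.
slowest = query_performance.sort_descending("TimeSecs")
mem_pressure = query_performance.where("QueryMemUsedPct > 90")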

Note

The combined heap of all workers running on a server at once can never exceed the maximum total heap, and a query server will refuse to start a new worker if doing so would exceed the maximum total heap size.

QueryOperationPerformance

This table contains performance information for each operation run by a query. Every call to the standard table operations (select, update, view, etc.) constitutes a distinct operation.

The most significant columns in this table are:

  • StartTime — The time at which this operation started.
  • EndTime — The time at which this operation ended.
  • OperationNumber — A monotonically increasing sequence number for each operation of a query.
  • TimeSecs — The time (in seconds) that the operation took.
  • NetMemoryChange — The change in memory usage while this operation was occurring. Memory usage is affected by factors besides the currently running operation; as a result, NetMemoryChange is only an approximation of the memory impact of each operation. NetMemoryChange = TotalMemoryChange - FreeMemoryChange, where "total memory" is memory reserved by the JVM for heap, and "free memory" is how much has not been used yet. Following that formula, NetMemoryChange will be positive when memory allocation is high, and negative when there is a large amount of GC. Sorting on NetMemoryChange should provide a useful overview of which operations are taking the most memory.
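
For instance, assuming the table is bound as query_operation_performance (a hypothetical name), that overview is one sort away:

# Hypothetical binding; NetMemoryChange is described above.
mem_by_op = query_operation_performance.sort_descending("NetMemoryChange")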

Real-Time Updates

For queries operating on ticking data (e.g., intraday data or Input Tables), there are additional logs detailing the time consumed when updating tables with the new data.

QueryUpdatePerformance

This table describes what the query spent time on during its data refresh cycle. This data is written in predefined intervals; the default is one minute, but it can be configured per query. At the end of each performance monitoring interval, a query logs every operation whose results were updated, and how much time it spent performing that update throughout the interval.

The most significant columns are:

  • IntervalStartTime — The start time of the interval.
  • IntervalEndTime — The end time of the interval.
  • IntervalDurationNanos — The duration of the performance monitoring interval, in nanoseconds.
  • Ratio — This is the percentage of the performance monitoring interval that was consumed updating each operation. This is an approximation of how much of the available CPU was used by a given operation.
  • QueryMemoryUsed — The total amount of memory used by the worker.
  • QueryMemUsedPct — Memory usage as a percentage of the max heap size (-Xmx).
  • QueryMemoryFree — The total amount of free memory remaining. This is the difference between WorkerHeapSize and QueryMemoryUsed. If this approaches zero, the query is likely to experience poor performance or crash due to an OutOfMemoryError.
  • WorkerHeapSize — The maximum heap size for the query worker.
  • NRows — Total number of changed rows.
  • RowsPerSec — The average rate at which data is ticking.
  • RowsPerCPUSec — An approximation of how quickly the CPU handles row changes.

UpdateWorst

This table is the QueryUpdatePerformance table sorted to show the slowest-updating operations, out of all intervals since the query initialized, at the beginning of the table (i.e., the operations with the greatest Ratio).

UpdateMostRecent

This table is the QueryUpdatePerformance table sorted to show the most recent updates first. Operations are still sorted with the greatest Ratio at the beginning of the table.

UpdateAggregate

This table shows the QueryUpdatePerformance table aggregated for each performance recording interval. The Ratio in this table represents the total CPU usage of each individual operation displayed in the other tables. If the Ratio in this table regularly approaches N (the number of update threads, which defaults to the number of available processors), it indicates that the query may be unable to process all data updates within the target cycle time (the set target time for one UG cycle to complete). This can result in delayed data updates in the UI.

UpdateSummaryStats

This table takes the data in UpdateAggregate and aggregates it into a single row view of the 99th, 90th, 75th, and 50th percentiles of Ratio and QueryMemUsedPct over all intervals, which makes the spread of resource consumption easier to view.
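
A similar spread can be recomputed by hand with percentile aggregations. A sketch, assuming QueryUpdatePerformance is bound as query_update_performance (a hypothetical name):

from deephaven.agg import pct

ratio_spread = query_update_performance.agg_by(
    [
        pct(0.99, "Ratio99=Ratio"),
        pct(0.90, "Ratio90=Ratio"),
        pct(0.75, "Ratio75=Ratio"),
        pct(0.50, "Ratio50=Ratio"),
    ]
)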

Other Tables

ProcessEventLog

This table is the Process Event Log for the specified worker.

ServerState

This table shows details about memory usage, Garbage Collection (GC), and Update Graph (UG) cycle counts, durations, and ratios.

Garbage collection is a memory management process that automatically collects unused objects created on the heap and deletes them to free up memory.

The UG is the part of the query engine that handles real-time data updates. It runs in cycles (once per second, by default). When UG cycle logging is enabled, the UG will write the cycle's length to the logs (e.g. Update Graph Processor cycleTime: 472ms) every time it completes a cycle.
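
To pull those cycle-time lines out of the Process Event Log, a filter like this sketch works (matching on the message text shown above):

cycle_times = db.live_table("DbInternal", "ProcessEventLog").where(
    ["Date=today()", "LogEntry.contains(`cycleTime`)"]
)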

Plots

UGPCycleTimeline

The UGPCycleTimeline widget plots UG cycle times from the ServerState table:

UG Cycle Timeline

ServerStateTimeLine

The ServerStateTimeLine widget plots UG ratio and memory usage from the ServerState table.

Server State Timeline

Performance queries

Deephaven also supports a tree table view of query performance:

qoptt = PerformanceQueries.queryPerformanceAsTreeTable()
qptt = PerformanceQueries.queryOperationPerformanceAsTreeTable()

Dashboards

This section provides two different examples that construct dashboards containing one or more internal tables, plots, and other widgets that can be used to monitor query performance.

Monitor PQ memory and initialization times

This example creates a dashboard that graphs a specific PQ's initialization times and heap usage over the last seven days, then graphs the heap usage throughout the day for a single run.

Code
Groovy
import io.deephaven.time.calendar.Calendars
import io.deephaven.time.calendar.BusinessCalendar
import io.deephaven.enterprise.database.Database
import io.deephaven.engine.table.Table
import io.deephaven.engine.util.TableTools
import io.deephaven.time.DateTimeUtils

db = (Database) db
final BusinessCalendar cal = Calendars.calendar()
date = DateTimeUtils.today()
previousDate = cal.minusBusinessDays(date, 1)
oldestDate = cal.minusBusinessDays(date, 7)
bytesPerGB = 1024*1024*1024


Table PQC = TableTools.merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2")
        .lastBy("Name")
        .where("EventType != `REMOVED`"),
        db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")
                .lastBy("Name")
                .where("EventType != `REMOVED`")).lastBy("Name")

Table basePQs = db.liveTable("DbInternal", "PersistentQueryStateLog")
        .where("Date=date.toString()")
        .whereIn(PQC, "Name")
        .view("Name", "Owner", "Status", "ServerHost", "DispatcherHost", "WorkerName", "ProcessInfoId")
        .lastBy("Name")

Table heapUsage = merge(
        db.liveTable("DbInternal", "ProcessMetricsLogCoreV2")
                .where("Date >=oldestDate")
                .where("Name = `Memory-Heap.Used`")
                .lastBy("ProcessInfoId")
                .updateView("MaxHeapUsedInGB=Max/bytesPerGB"),
        db.historicalTable("DbInternal", "ProcessMetricsLogCoreV2")
                .where("Date >=oldestDate")
                .where("Name = `Memory-Heap.Used`")
                .lastBy("ProcessInfoId")
                .updateView("MaxHeapUsedInGB=Max/bytesPerGB"))
        .lastBy("ProcessInfoId")

PQs = basePQs
        .naturalJoin(PQC, "Name", "Enabled,HeapSizeInGB,DataBufferPoolToHeapSizeRatio")
        .naturalJoin(heapUsage, "ProcessInfoId=ProcessInfoId", "MaxHeapUsedInGB")
        .sort("Name")

Table nowTable=timeTable("PT00:00:01").renameColumns("Now=Timestamp")

Table runningPQHistory1 = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date >= `2023-03-07`", "ProcessInfoId != null")
        .view("Date", "Owner", "Name", "Timestamp", "Status", "DispatcherHost", "WorkerName", "ProcessInfoId", "SerialNumber", "VersionNumber")
        .aggBy([AggMin("Name", "DispatcherHost", "WorkerName", "SerialNumber", "VersionNumber", "Start=Timestamp"),AggMax("Timestamp"),AggLast("Status")],"ProcessInfoId")

Table runningPQHistory2=runningPQHistory1.snapshotWhen(nowTable)
        .updateView("End=(Status==`Running` ? now() : Timestamp)",
                "Duration_seconds=(End - Start)/1000000000", "Duration_minutes=Duration_seconds/60")
        .dropColumns("Now", "Timestamp")

PQHistory = TableTools.merge(db.historicalTable("DbInternal", "PersistentQueryStateLog").where("Date >= oldestDate", "ProcessInfoId != null")
        .view("Date", "Owner", "Name", "Timestamp", "Status", "DispatcherHost", "WorkerName", "ProcessInfoId", "SerialNumber", "VersionNumber")
        .aggBy([AggMin("Name", "DispatcherHost", "WorkerName", "SerialNumber", "VersionNumber", "Start=Timestamp"),AggMax("End=Timestamp"),AggLast("Status")],"ProcessInfoId")
        .updateView("Duration_seconds=(End - Start)/1000000000", "Duration_minutes=Duration_seconds/60"),
        runningPQHistory2)
        .naturalJoin(heapUsage, "ProcessInfoId", "MaxHeapUsedInGB")
        .moveColumnsUp("Start", "End", "Name")
        .sortDescending("Start")

Table heapOverTime = TableTools.merge(
        db.liveTable("DbInternal", "ProcessMetricsLogCoreV2")
                .where("Date >=oldestDate")
                .where("Name = `Memory-Heap.Used`")
                .view("Timestamp", "ProcessInfoId", "HeapUsed=Last"),
        db.historicalTable("DbInternal", "ProcessMetricsLogCoreV2")
                .where("Date >=oldestDate")
                .where("Name = `Memory-Heap.Used`")
                .view("Timestamp", "ProcessInfoId", "HeapUsed=Last"))
        .lastBy("Timestamp", "ProcessInfoId")
        .updateView("HeapGB=HeapUsed/bytesPerGB")

Table nonHeapOverTime = TableTools.merge(
        db.liveTable("DbInternal", "ProcessMetricsLogCoreV2")
                .where("Date >=oldestDate")
                .where("Name = `Memory-NonHeap.Used`")
                .view("Timestamp", "ProcessInfoId", "HeapUsed=Last"),
        db.historicalTable("DbInternal", "ProcessMetricsLogCoreV2")
                .where("Date >=oldestDate")
                .where("Name = `Memory-NonHeap.Used`")
                .view("Timestamp", "ProcessInfoId", "HeapUsed=Last"))
        .lastBy("Timestamp", "ProcessInfoId")
        .updateView("HeapGB=HeapUsed/bytesPerGB")

Table directOverTime = TableTools.merge(
        db.liveTable("DbInternal", "ProcessMetricsLogCoreV2")
                .where("Date >=oldestDate")
                .where("Name = `Memory-Direct.Used`")
                .view("Timestamp", "ProcessInfoId", "HeapUsed=Last"),
        db.historicalTable("DbInternal", "ProcessMetricsLogCoreV2")
                .where("Date >=oldestDate")
                .where("Name = `Memory-Direct.Used`")
                .view("Timestamp", "ProcessInfoId", "HeapUsed=Last"))
        .lastBy("Timestamp", "ProcessInfoId")
        .updateView("HeapGB=HeapUsed/bytesPerGB")

maxHeapHistoryOC = oneClick(PQHistory.where("MaxHeapUsedInGB > 0"), "Name")
hotOC = oneClick(heapOverTime, "ProcessInfoId")
nhotOC = oneClick(nonHeapOverTime, "ProcessInfoId")
dotOC = oneClick(directOverTime, "ProcessInfoId")

MemoryUsageSingleRun =
        plot("Heap Usage (GB)", hotOC, "Timestamp", "HeapGB")
                .plot("Non-Heap Usage (GB)", nhotOC, "Timestamp", "HeapGB")
                .plot("Direct Usage (GB)", dotOC, "Timestamp", "HeapGB")
                .show()
HeapMaxUsageHistory = catPlot("Heap Usage (GB) By Execution", maxHeapHistoryOC, "Start", "MaxHeapUsedInGB").show()
Python
from deephaven.plot.selectable_dataset import one_click
from deephaven.agg import max_, min_, last
from deephaven.plot.figure import Figure
from deephaven.calendar import calendar
from deephaven.time import dh_today
from deephaven import time_table
from deephaven import merge

cal = calendar()
date = dh_today()
previous_date = cal.minusBusinessDays(date, 1)
oldest_date = cal.minusBusinessDays(date, 7)
bytes_per_gb = 1024 * 1024 * 1024

pqc = merge(
    [
        db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2")
        .last_by("Name")
        .where("EventType != `REMOVED`"),
        db.live_table("DbInternal", "PersistentQueryConfigurationLogV2")
        .last_by("Name")
        .where("EventType != `REMOVED`"),
    ]
).last_by("Name")

base_pqs = (
    db.live_table("DbInternal", "PersistentQueryStateLog")
    .where("Date=date.toString()")
    .where_in(pqc, "Name")
    .view(
        [
            "Name",
            "Owner",
            "Status",
            "ServerHost",
            "DispatcherHost",
            "WorkerName",
            "ProcessInfoId",
        ]
    )
    .last_by("Name")
)

heap_usage = merge(
    [
        db.live_table("DbInternal", "ProcessMetricsLogCoreV2")
        .where("Date>=oldest_date")
        .where("Name = `Memory-Heap.Used`")
        .last_by("ProcessInfoId")
        .update_view("MaxHeapUsedInGB=Max/bytes_per_gb"),
        db.historical_table("DbInternal", "ProcessMetricsLogCoreV2")
        .where("Date>=oldest_date")
        .where("Name=`Memory-Heap.Used`")
        .last_by("ProcessInfoId")
        .update_view("MaxHeapUsedInGB=Max/bytes_per_gb"),
    ]
).last_by("ProcessInfoId")

pqs = (
    base_pqs.natural_join(
        pqc, "Name", ["Enabled", "HeapSizeInGB", "DataBufferPoolToHeapSizeRatio"]
    )
    .natural_join(heap_usage, "ProcessInfoId=ProcessInfoId", "MaxHeapUsedInGB")
    .sort("Name")
)

now_table = time_table("PT1s").rename_columns("Now=Timestamp")

running_pq_history_1 = (
    db.live_table("DbInternal", "PersistentQueryStateLog")
    .where(["Date>=`2023-03-07`", "ProcessInfoId != null"])
    .view(
        [
            "Date",
            "Owner",
            "Name",
            "Timestamp",
            "Status",
            "DispatcherHost",
            "WorkerName",
            "ProcessInfoId",
            "SerialNumber",
            "VersionNumber",
        ]
    )
    .agg_by(
        [
            min_(
                [
                    "Name",
                    "DispatcherHost",
                    "WorkerName",
                    "SerialNumber",
                    "VersionNumber",
                    "Start=Timestamp",
                ]
            ),
            max_("Timestamp"),
            last("Status"),
        ],
        "ProcessInfoId",
    )
)

running_pq_history_2 = (
    running_pq_history_1.snapshot_when(now_table)
    .update_view(
        [
            "End=(Status==`Running` ? now() : Timestamp)",
            "Duration_seconds=(End - Start)/1000000000",
            "Duration_minutes=Duration_seconds/60",
        ]
    )
    .drop_columns(["Now", "Timestamp"])
)

pq_history = (
    merge(
        [
            db.historical_table("DbInternal", "PersistentQueryStateLog")
            .where(["Date >= oldestDate", "ProcessInfoId != null"])
            .view(
                [
                    "Date",
                    "Owner",
                    "Name",
                    "Timestamp",
                    "Status",
                    "DispatcherHost",
                    "WorkerName",
                    "ProcessInfoId",
                    "SerialNumber",
                    "VersionNumber",
                ]
            )
            .agg_by(
                [
                    min_(
                        [
                            "Name",
                            "DispatcherHost",
                            "WorkerName",
                            "SerialNumber",
                            "VersionNumber",
                            "Start=Timestamp",
                        ]
                    ),
                    max_("End=Timestamp"),
                    last("Status"),
                ],
                "ProcessInfoId",
            )
            .update_view(
                [
                    "Duration_seconds=(End - Start)/1000000000",
                    "Duration_minutes=Duration_seconds/60",
                ]
            ),
            running_pq_history_2,
        ]
    )
    .natural_join(heap_usage, "ProcessInfoId", "MaxHeapUsedInGB")
    .move_columns_up(["Start", "End", "Name"])
    .sort_descending("Start")
)

heap_over_time = (
    merge(
        [
            db.live_table("DbInternal", "ProcessMetricsLogCoreV2")
            .where("Date>=oldest_date")
            .where("Name=`Memory-Heap.Used`")
            .view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
            db.historical_table("DbInternal", "ProcessMetricsLogCoreV2")
            .where("Date>=oldest_date")
            .where("Name=`Memory-Heap.Used`")
            .view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
        ]
    )
    .last_by(["Timestamp", "ProcessInfoId"])
    .update_view("HeapGB=HeapUsed/bytes_per_gb")
)

non_heap_over_time = (
    merge(
        [
            db.live_table("DbInternal", "ProcessMetricsLogCoreV2")
            .where("Date>=oldest_date")
            .where("Name=`Memory-NonHeap.Used`")
            .view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
            db.historical_table("DbInternal", "ProcessMetricsLogCoreV2")
            .where("Date>=oldest_date")
            .where("Name=`Memory-NonHeap.Used`")
            .view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
        ]
    )
    .last_by(["Timestamp", "ProcessInfoId"])
    .update_view("HeapGB=HeapUsed/bytes_per_gb")
)

direct_over_time = (
    merge(
        [
            db.live_table("DbInternal", "ProcessMetricsLogCoreV2")
            .where("Date>=oldest_date")
            .where("Name=`Memory-Direct.Used`")
            .view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
            db.historical_table("DbInternal", "ProcessMetricsLogCoreV2")
            .where("Date>=oldest_date")
            .where("Name=`Memory-Direct.Used`")
            .view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
        ]
    )
    .last_by(["Timestamp", "ProcessInfoId"])
    .update_view("HeapGB=HeapUsed/bytes_per_gb")
)

max_heap_history_oc = one_click(pq_history.where("MaxHeapUsedInGB>0"), ["Name"])
hot_oc = one_click(heap_over_time, ["ProcessInfoId"])
non_hot_oc = one_click(non_heap_over_time, ["ProcessInfoId"])
dot_oc = one_click(direct_over_time, ["ProcessInfoId"])

memory_usage_single_run = (
    Figure()
    .plot_xy(series_name="Heap Usage (GB)", t=hot_oc, x="Timestamp", y="HeapGB")
    .plot_xy(series_name="Non-Heap Usage (GB)", t=non_hot_oc, x="Timestamp", y="HeapGB")
    .plot_xy(series_name="Direct Usage (GB)", t=dot_oc, x="Timestamp", y="HeapGB")
    .show()
)

heap_max_usage_history = (
    Figure()
    .plot_cat(
        series_name="Heap Usage (GB) by Execution",
        t=max_heap_history_oc,
        category="Start",
        y="MaxHeapUsedInGB",
    )
    .show()
)

Dashboard 1

The PQHistory and Persistent Query Configurations tables include helpful information such as query status and the amount of memory configured versus the amount currently used. To filter the graphs to a specific query, use a One Click filter or the Linker tool.

Monitor server usage

This example creates a dashboard that shows the heap usage of all PQs, broken down by server. This is particularly useful for load balancing in a Deephaven cluster. It also shows all current workers and their server hosts, heap allocation, and PQ name (if applicable).

Code
Groovy
import io.deephaven.time.calendar.Calendars
import io.deephaven.time.calendar.BusinessCalendar
import io.deephaven.enterprise.database.Database
import io.deephaven.engine.table.Table
import io.deephaven.time.DateTimeUtils
import io.deephaven.engine.util.TableTools

import java.time.Instant
import java.time.temporal.ChronoUnit

db = (Database) db
final BusinessCalendar cal = Calendars.calendar()
date = DateTimeUtils.today()
cutoffDate = cal.minusBusinessDays(date, 7)
cutoffTime = Instant.now().minus(5, ChronoUnit.MINUTES)

// get details about workers that are alive
Table activeWorkers = db.liveTable("DbInternal", "ProcessEventLog")
        .where("Date>=today()")
        .sort("Timestamp")
        .lastBy("ProcessInfoId")
        .where("Timestamp >= cutoffTime")
        .view("ProcessInfoId", "WorkerName=Process", "Host")

// get dispatcher details and remove any recently terminated workers
Table currentWorkers = activeWorkers
        .join(db.liveTable("DbInternal", "AuditEventLog")
                .where("Date>=cutoffDate", "Event=`Starting worker`")
                .update("PPID=Details.split(`:`)[2].trim()")
                ,"ProcessInfoId=PPID"
                ,"Process,ServerPort")
        .updateView("DispatcherName=Process == `db_query_server` ? `default` : `mergedispatcher`")
        .dropColumns("Process")
        .whereNotIn(db.liveTable("DbInternal", "AuditEventLog")
                .where("Date>=cutoffDate", "Event=`SHUTTING_DOWN`"),
                "ProcessInfoId")

Table memoryUsage = TableTools.merge(db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")
        .where(), db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2").where())
        .naturalJoin(
                db.liveTable("DbInternal", "PersistentQueryStateLog")
                        .where("Date>=cutoffDate")
                        .lastBy("SerialNumber", "VersionNumber"),
                "SerialNumber, VersionNumber",
                "WorkerName, ProcessInfoId"
        ).lastBy("ProcessInfoId")

currentWorkerInfo = currentWorkers.naturalJoin(memoryUsage, "ProcessInfoId", "HeapSizeInGB, Name, ConfigurationType")
        .where("!(isNull(HeapSizeInGB))")  //filter out workers which probably stopped ungracefully

workerHeapUsage = currentWorkerInfo
        .update("PurposeAndWorker = ConfigurationType + ` : ` + Name")
        .sortDescending("HeapSizeInGB")
        .rollup([AggSum("HeapSizeInGB")]
                ,"Host"
                ,"DispatcherName"
                ,"PurposeAndWorker")
Python
from deephaven.time import dh_today, to_j_instant
from deephaven.calendar import calendar
from deephaven.agg import sum_
from deephaven import merge

from datetime import datetime, timedelta

cal = calendar()
date = dh_today()
cutoff_date = cal.minusBusinessDays(date, 7)
cutoff_time = to_j_instant(datetime.now() - timedelta(minutes=5))

# Get details about workers that are alive
active_workers = (
    db.live_table("DbInternal", "ProcessEventLog")
    .where("Date>=today()")
    .sort("Timestamp")
    .last_by("ProcessInfoId")
    .where("Timestamp>=cutoff_time")
    .view(["ProcessInfoId", "WorkerName=Process", "Host"])
)

# Get dispatcher details and remove any recently terminated workers
current_workers = (
    active_workers.join(
        db.live_table("DbInternal", "AuditEventLog")
        .where(["Date>=cutoff_date", "Event=`Starting worker`"])
        .update("PPID=Details.split(`:`)[2].trim()"),
        "ProcessInfoId=PPID",
        ["Process", "ServerPort"],
    )
    .update_view(
        "DispatcherName=Process == `db_query_server` ? `default` : `mergedispatcher`"
    )
    .drop_columns("Process")
    .where_not_in(
        db.live_table("DbInternal", "AuditEventLog").where(
            ["Date>=cutoff_date", "Event=`SHUTTING_DOWN`"]
        ),
        "ProcessInfoId",
    )
)

memory_usage = (
    merge(
        [
            db.live_table("DbInternal", "PersistentQueryConfigurationLogV2").where(),
            db.historical_table(
                "DbInternal", "PersistentQueryConfigurationLogV2"
            ).where(),
        ]
    )
    .natural_join(
        db.live_table("DbInternal", "PersistentQueryStateLog")
        .where("Date>=cutoff_date")
        .last_by(["SerialNumber", "VersionNumber"]),
        ["SerialNumber", "VersionNumber"],
        ["WorkerName", "ProcessInfoId"],
    )
    .last_by("ProcessInfoId")
)

# Filter out workers which probably stopped ungracefully
current_worker_info = current_workers.natural_join(
    memory_usage, "ProcessInfoId", ["HeapSizeInGB", "Name", "ConfigurationType"]
).where("!(isNull(HeapSizeInGB))")

worker_heap_usage = (
    current_worker_info.update("PurposeAndWorker = ConfigurationType + ` : ` + Name")
    .sort_descending("HeapSizeInGB")
    .rollup(sum_("HeapSizeInGB"), ["Host", "DispatcherName", "PurposeAndWorker"])
)

Dashboard 2