Monitoring Queries
This guide covers best practices for monitoring Persistent Queries (PQs) in Deephaven. Regular monitoring of queries is critical to identifying, diagnosing, and resolving issues related to performance, resource usage, correctness, and stability in a timely fashion. The methods presented combine the use of Deephaven's internal tables and dashboards.
Deephaven's internal tables contain information about the state of the system, including query status. These tables can be queried individually or together to diagnose slow and/or unresponsive queries. Combining these tables with additional widgets and graphical tools helps continuous monitoring efforts and provides a comprehensive view of the system's health.
The techniques presented are intended to help you troubleshoot queries independently. However, some issues may require assistance from Deephaven support. When serious, unexpected, or esoteric issues arise, it's recommended to send Console logs, Exception messages, and any other pertinent information to the support team.
Internal tables
All Deephaven installations include many tables that contain specific information for the system's internal use. These tables are updated by various processes. They contain details about processes, workers, PQ states, and all query operations. These tables are stored in the DbInternal namespace.
Every internal table contains a ProcessInfoId column with a unique identifier for a particular worker, which can be used to correlate values across multiple tables.
Query Operation Performance Log
The Query Operation Performance Log (QOPL) contains information about performance details on query operations such as duration and memory usage. This table can be queried in a Core+ worker with the following code:
qopl = db.liveTable("DbInternal", "QueryOperationPerformanceLogCoreV2").where("Date=today()")
qopl = db.live_table("DbInternal", "QueryOperationPerformanceLogCoreV2").where(
"Date=today()"
)
See Query Operation Performance Log for an explanation of each column in the table.
Update Performance Log
The Update Performance Log describes the time and resources used by the Update Graph. It contains aggregated performance details on incremental update operations performed in update graphs. This table can be queried in a Core+ worker with the following code:
upl = db.liveTable("DbInternal", "UpdatePerformanceLogCoreV2").where("Date=today()")
upl = db.live_table("DbInternal", "UpdatePerformanceLogCoreV2").where("Date=today()")
See Update Performance Log for an explanation of each column in the table.
Persistent Query State Log
The Persistent Query State Log contains information about the state of PQs. Any time a worker's state changes, whether it be from starting, stopping, or anything else, that information gets logged to this table. It can be queried in a Core+ worker with the following code:
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
Statuses of queries that have not failed in this table can be one of:
Connected
Acquiring Worker
Initializing
Completed
Running
Stopped
See Persistent Query State Log for an explanation of each column in the table.
Statuses of queries that have failed in this table can be one of Failed, Error, or Disconnected. Each is described below.
Failed
This means that the PQ failed before or during initialization. Refer to the ExceptionMessage column in the Persistent Query State Log for specific details.
- The most common failure is an error in a script. Expanding the cell shows the full text of an error message. As an example, leaving a closing quotation mark off a string will result in a ScriptException and the line number of the error.
- A query also fails if it exceeds the query server's maximum heap size. In cases where this happens, increase the memory (heap) usage value in the Query Monitor.
Error
This means that an error occurred after the query was initialized (e.g., while processing incremental updates). The ExceptionStackTrace column may offer more information in this case. To review this information, expand the relevant cell to open a window with the full text, e.g., by right-clicking the entry and selecting View Cell Contents.
Disconnected
This means the worker process disconnected from the dispatcher. Query workers typically disconnect from the dispatcher as a result of an exceptional error or excessive workload that prevents the worker from sending a heartbeat. Common reasons for a disconnection include:
- The server runs out of heap and an OutOfMemoryError occurs, killing the JVM.
- The JVM is stuck in Garbage Collection for so long that it is unable to respond to dispatcher heartbeats for 60 seconds.
- A deployment issue results in a NoSuchMethodError.
- An internal JVM error or a native code error occurs (also known as a JVM HotSpot error).
The following query shows all the times a PQ entered a failure state on the current date:
pqslAllFailedStates = db.liveTable("DbInternal", "PersistentQueryStateLog")
.where("Date=today()", "Status in `Failed`, `Error`, `Disconnected`")
pqs_all_failed_states = db.live_table("DbInternal", "PersistentQueryStateLog").where(
["Date=today()", "Status in `Failed`, `Error`, `Disconnected`"]
)
Getting the last row by serial number shows the most recent state of any PQ that attempted to start on the current date:
pqslAllCurrentStates = db.liveTable("DbInternal", "PersistentQueryStateLog")
.where("Date=today()").lastBy("SerialNumber")
pqs_all_current_states = (
db.live_table("DbInternal", "PersistentQueryStateLog")
.where("Date=today()")
.last_by("SerialNumber")
)
Combining the above queries indicates which PQs are currently in a failure state:
pqslCurrentFailedStates = db.liveTable("DbInternal", "PersistentQueryStateLog")
.where("Date=today()")
.lastBy("SerialNumber")
.where("Status in `Failed`, `Error`, `Disconnected`")
pqs_current_failed_states = (
db.live_table("DbInternal", "PersistentQueryStateLog")
.where("Date=today()")
.last_by("SerialNumber")
.where(["Status in `Failed`, `Error`, `Disconnected`"])
)
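The order of operations matters in the combined query: taking the last row per SerialNumber first and filtering second reports only PQs whose most recent state is a failure. The following plain-Python sketch mirrors that logic outside of Deephaven. The sample rows and the helper name current_failed_states are hypothetical and exist only for illustration.

```python
FAILURE_STATUSES = {"Failed", "Error", "Disconnected"}


def current_failed_states(rows):
    """Return the latest row per SerialNumber, keeping only failure states.

    `rows` is assumed to be in log order (oldest first).
    """
    latest = {}
    for row in rows:
        # Later rows overwrite earlier ones, mimicking a last-by on SerialNumber.
        latest[row["SerialNumber"]] = row
    # Filter only after the last-by, so recovered PQs are not reported.
    return [r for r in latest.values() if r["Status"] in FAILURE_STATUSES]


# Hypothetical sample rows; real state log rows have many more columns.
sample_rows = [
    {"SerialNumber": 1, "Status": "Failed"},
    {"SerialNumber": 1, "Status": "Running"},
    {"SerialNumber": 2, "Status": "Running"},
    {"SerialNumber": 2, "Status": "Disconnected"},
]

failed_now = current_failed_states(sample_rows)
print(failed_now)
```

In this sample, the PQ with serial number 1 failed earlier but is running again, so only serial number 2 is reported.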
Alternatively, the Query Monitor provides details about the PQs that you are authorized to see. Information about each query is stored in columns, including the owner, query name, whether it's enabled, and its status. If a PQ has failed, the ExceptionDetails column includes the same raw data as the Persistent Query State Log, which may contain pertinent information.
To read the ExceptionDetails, expand the relevant cell. This opens a window that displays the full content. Alternatively, you can click the row of the relevant PQ, which presents its ExceptionDetails in its configuration editor summary.
Process Event Log
The Process Event Log contains all log messages for Deephaven workers and query/merge servers. These log messages include the output from workers. It can be queried in a Core+ worker with the following code:
pel = db.liveTable("DbInternal", "ProcessEventLog").where("Date=today()")
pel = db.live_table("DbInternal", "ProcessEventLog").where("Date=today()")
A common usage pattern for this table is to find all FATAL level messages for the current day:
pel = db.liveTable("DbInternal", "ProcessEventLog").where("Date=today()", "Level=`FATAL`")
pel = db.live_table("DbInternal", "ProcessEventLog").where(
["Date=today()", "Level=`FATAL`"]
)
You can also find specific error messages by using the Process Info ID to query the log. This information can be found by hovering over the Info button in the console.
Expand the cell for your query in the LogEntry column to view a full exception message and stack trace, or right-click the cell and select View Cell Contents to open the text in its own dialog window.
To search the LogEntry column, right-click it and select Go To. Alternatively, the keyboard shortcut cmd+G (Mac) or ctrl+G (Windows) opens a search box.
See Process Event Log for an explanation of each column in the table.
Monitor queries with internal tables
The following sections describe how to use the internal tables described above to monitor queries.
Query Operation Performance Log and Update Performance Log
The Description or EntryDescription columns of these two tables include human-readable information that describes the type of operation. The image below shows a snippet of the Description column from the Query Operation Performance Log, where you can see logs related to PQs that open the StockTrades and StockQuotes tables from the LearnDeephaven namespace:
The majority of entries in this column indicate each operation performed on a table (e.g., join, sort, where, etc.). Looking at other columns in a given row (StartTime, EndTime, WasInterrupted, etc.) may help determine which specific operations are slow and offer hints as to why. While none of these log descriptions is cause for concern on its own, any action that takes a long time could contribute to performance issues.
- Aggregated Small Updates (Update Performance Log only): A summary description that indicates a group of updates, each of which is less than the configurable threshold for logging an update. For many workloads, the vast majority of updates will not materially affect performance. Aggregating the updates saves significant disk space and bandwidth. The threshold for update logging may be configured with the QueryPerformance.minimumUpdateLogDurationNanos property. See Filter small entries below.
- coalesce(): Describes producing a QueryTable that can be used for query operations from an uncoalesced table (typically a SourceTable). The SourceTable is a placeholder that represents your data and defers most work until a query operation is required, at which point it must be coalesced.
- initializeAvailableLocations() and initializeLocationSizes(): These actions require file system access to read directories (locations) and then many table.size files. This can have a substantial impact on a historical query.
- Uninstrumented Code: This is code that is not enclosed in a measurement block, which means it cannot be monitored or traced. It could indicate a bug, or a complex operation that, because it is outside of our framework, has no details available.
Filter small entries
The Update Performance Log is extremely comprehensive. It contains many small entries (Aggregated Small Updates) that are not always of interest when debugging performance issues. You can set a minimum threshold to filter small entries, which frees up disk space and makes it easier to analyze the data.
Note
These properties are set in the Core properties configuration, not iris-environment.prop.
- QueryPerformance.minimumUpdateLogDurationNanos=<nanoseconds>: Sets a minimum duration in nanoseconds for recorded entries in the Update Performance Log. For example, setting this property to 1000000 discards entries in the UpdatePerformanceLogCoreV2 table with a duration of less than one millisecond. When set to 0, all values are logged.
- QueryPerformance.minimumLogDurationNanos: Sets a minimum duration in nanoseconds for recorded entries in the QueryOperationPerformanceLogCoreV2 table. When set to 0, all values are logged.
- QueryPerformance.minimumUninstrumentedLogDurationNanos: Sets a minimum duration in nanoseconds of uninstrumented code to be recorded in the Query Operation Performance Log. When set to 0, all values are logged.
The following query creates three tables with specific totals and usage information to see just how many small entries exist in your system:
smallEntries = db.liveTable("DbInternal", "UpdatePerformanceLogCoreV2")
.where("Date=today()")
.updateView("Bucket=Long.highestOneBit(UsageNanos)")
.view("Count=1", "Bucket", "UsageNanos")
.sumBy("Bucket")
.sort("Bucket")
.groupBy()
.updateView("CumCount=cumsum(Count)", "CumUsage=cumsum(UsageNanos)")
.ungroup()
total = smallEntries
.view("Total=Count", "TotalUsage=UsageNanos")
.sumBy()
fraction = smallEntries
.naturalJoin(total, "")
.updateView("FracEntries=CumCount/Total", "FracTime=CumUsage/TotalUsage")
small_entries = (
db.live_table("DbInternal", "UpdatePerformanceLogCoreV2")
.where("Date=today()")
.update_view("Bucket=Long.highestOneBit(UsageNanos)")
.view(["Count=1", "Bucket", "UsageNanos"])
.sum_by("Bucket")
.sort("Bucket")
.group_by()
.update_view(["CumCount=cumsum(Count)", "CumUsage=cumsum(UsageNanos)"])
.ungroup()
)
total = small_entries.view(["Total=Count", "TotalUsage=UsageNanos"]).sum_by()
fraction = small_entries.natural_join(total, "").update_view(
["FracEntries=CumCount/Total", "FracTime=CumUsage/TotalUsage"]
)
Here's what the first table looks like:
The total table shows a ticking total of the number of entries and their usage:
The fraction table contains the fraction of entries and time for each bucket of usage:
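To make the bucketing concrete, here is a plain-Python sketch of the same idea: each duration is placed in a bucket named after its highest set bit (a power of two), and cumulative fractions of entries and time are computed per bucket. It does not use the Deephaven API. The sample durations and the helper names highest_one_bit and bucket_fractions are hypothetical.

```python
from collections import defaultdict
from itertools import accumulate


def highest_one_bit(n):
    """Largest power of two <= n for positive n, and 0 for n <= 0.

    Matches Java's Long.highestOneBit for non-negative values only;
    negative durations are not expected here.
    """
    return 1 << (n.bit_length() - 1) if n > 0 else 0


def bucket_fractions(usage_nanos):
    """Return (bucket, cumulative entry fraction, cumulative time fraction) rows."""
    counts = defaultdict(int)
    usage = defaultdict(int)
    for value in usage_nanos:
        bucket = highest_one_bit(value)
        counts[bucket] += 1
        usage[bucket] += value
    buckets = sorted(counts)
    if not buckets:
        return []
    cum_counts = list(accumulate(counts[b] for b in buckets))
    cum_usage = list(accumulate(usage[b] for b in buckets))
    total_count, total_usage = cum_counts[-1], cum_usage[-1]
    return [
        (b, c / total_count, u / total_usage)
        for b, c, u in zip(buckets, cum_counts, cum_usage)
    ]


# Hypothetical durations in nanoseconds: four tiny updates and one 2 ms update.
sample = [900, 1_000, 1_500, 3_000, 2_000_000]
rows = bucket_fractions(sample)
for bucket, frac_entries, frac_time in rows:
    print(bucket, round(frac_entries, 3), round(frac_time, 5))
```

In this sample, the four sub-millisecond entries make up 80% of the rows but well under 1% of the total time, which is the kind of pattern that suggests a minimum logging threshold would remove little useful information.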
Performance overview
Deephaven enables you to get a performance overview for a query by its Process Info ID, worker name, or PQ name. The following code block does all three:
// Get perf overview by Process Info ID
performanceOverviewByPiid("52e806dd-af75-412c-a286-ec29aa5571d2")
// Get perf overview by worker name
performanceOverviewByWorkerName("worker_12")
// Get perf overview by PQ name
performanceOverviewByPqName("PqName")
# Get perf overview by Process Info ID
performance_overview("52e806dd-af75-412c-a286-ec29aa5571d2")
# Get perf overview by worker name
performance_overview(worker_name="worker_12")
# Get perf overview by PQ name
performance_overview(pq_name="PqName")
If the same PQ ran multiple times during the day, provide a date-time string representing an as-of time, which will narrow the search to the latest PQ run at or before the specified time:
performanceOverviewByPqName("PqName", "2023-04-28T15:57:45 ET")
performance_overview(pq_name="PqName", as_of_time_string="2023-04-28T15:57:45 ET")
In Python, performance_overview uses named arguments to specify the function's behavior. The following parameters may be used:
- process_info_id: The Process Info ID of the worker to get performance information for.
- worker_name: The name of the worker to get performance information for.
- pq_name: The name of the PQ to get performance information for.
- host_name: The name of the host running the worker.
- as_of_time: A date-time string representing an as-of time.
- as_of_time_string: A string representation of a date-time representing an as-of time.
- owner: The PQ owner. This is only relevant if pq_name is used.
- date: The date to get information for. If not given, the current date is used.
- is_intraday: Determines if the table retrieval is intraday or historical.
- is_live: Determines whether an intraday table is live or static. This is only relevant if is_intraday is used.
In Groovy, performance overview is split into three functions.
- For Process Info ID:
performanceOverviewByPiid = { final String processInfoId, final String date = null, final boolean isIntraday = true, final boolean isLive = true -> ...
- For worker name:
performanceOverviewByWorkerName = { final String workerName, final String hostName = null, final String date = null, final boolean isIntraday = true, final boolean isLive = true -> ...
- For PQ name:
performanceOverviewByPqName = { final String pqName, final String asOfTimeString = null, final String owner = null, final String date = null, final boolean isIntraday = true, final boolean isLive = true -> ...
These methods open several tables and two plots (if applicable):
Tables:
QueryPerformance
QueryOperationPerformance
QueryUpdatePerformance
UpdateWorst
UpdateMostRecent
UpdateAggregate
UpdateSummaryStats
ProcessEventLog
ServerState
Plots:
UGPCycleTimeline
ServerStateTimeLine
The QueryOperationPerformance table, in particular the DurationNanos column, shows you the longest steps of your query's initialization process. For queries using intraday data, initialization is only part of the picture. Queries using live data also have to update results as new data flows in. If you have an intraday query that has trouble keeping up with real-time data, you should look at the UpdateWorst table.
In the UpdateWorst table, the column to focus on is Ratio. The Ratio value shows what percentage of a query's data refresh cycle is consumed by each operation. If the sum of the Ratio column for one update interval approaches N, where N is the number of update threads, then the query will be unable to keep up with incoming data. The number of update threads is determined by the property PeriodicUpdateGraph.updateThreads. By default, it equals the number of available processors. Because multithreaded updates must respect dependencies between operations, analysis may be more difficult: the system might not be able to reach a total ratio of N due to unsatisfied dependencies or long chains of serial operations. The sum of the Ratio column for each interval, as a percentage, is available in the UpdateAggregate table.
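The check described above can be sketched in plain Python, outside of Deephaven. This sketch assumes each Ratio is expressed as a fraction of one thread (so 1.0 means one fully busy update thread), which is an interpretation rather than something the column documentation states. The headroom factor of 0.9 and the helper name overloaded_intervals are hypothetical.

```python
from collections import defaultdict


def overloaded_intervals(rows, update_threads, headroom=0.9):
    """Return the interval keys whose summed Ratio is close to the thread count.

    `rows` is an iterable of (interval_start, ratio) pairs, one per operation.
    `headroom` is a hypothetical safety factor: an interval is flagged once
    its total reaches headroom * update_threads.
    """
    totals = defaultdict(float)
    for interval_start, ratio in rows:
        totals[interval_start] += ratio
    limit = headroom * update_threads
    return sorted(key for key, total in totals.items() if total >= limit)


# Hypothetical per-operation ratios for two one-minute intervals.
sample_rows = [
    ("09:30", 0.5),
    ("09:30", 0.6),
    ("09:30", 0.8),
    ("09:31", 0.2),
    ("09:31", 0.1),
]

flagged = overloaded_intervals(sample_rows, update_threads=2)
print(flagged)
```

With two update threads, the first interval sums to roughly 1.9 and is flagged, while the second sums to roughly 0.3 and is not.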
You should also carefully monitor the IntervalDurationNanos value to ensure it does not exceed UpdatePerformanceTracker.reportIntervalMillis (defaults to 60 seconds) plus PeriodicUpdateGraph.targetCycleDurationMillis (defaults to 1 second). If it does, it's a sign that performance issues are causing longer cycle times than expected.
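As a minimal sketch of that comparison, the helper below converts the two millisecond properties to nanoseconds and checks an interval duration against their sum. The defaults mirror the values stated above (60 seconds and 1 second). The function name interval_overran is hypothetical.

```python
NANOS_PER_MILLI = 1_000_000


def interval_overran(
    interval_duration_nanos,
    report_interval_millis=60_000,
    target_cycle_millis=1_000,
):
    """Return True if the interval lasted longer than report interval + target cycle."""
    limit_nanos = (report_interval_millis + target_cycle_millis) * NANOS_PER_MILLI
    return interval_duration_nanos > limit_nanos


# With the defaults, the limit is 61 seconds expressed in nanoseconds.
print(interval_overran(60_500_000_000))  # within the limit
print(interval_overran(75_000_000_000))  # longer than the limit
```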
Below is a description of each table that the performance overview methods produce, broken up by category.
Query initialization
QueryPerformance
This table contains top-level performance information for each query executed by the given worker. It contains data on how long each query takes to run.
A query, in this context, is the code that is run:
- Whenever you type a new command in the console and press Return. Each Return where you have to wait for an answer from the worker is a new query.
- When a Persistent Query initializes.
- As a result of a sort or filter in the GUI.
The most significant columns in this table are:
- TimeSecs: How long this query ran for, in seconds.
- QueryMemUsed: Memory in use by the query (only includes active heap memory).
- QueryMemUsedPct: Memory usage as a percentage of the max heap size (-Xmx).
- QueryMemFree: Remaining memory until the query runs into the max heap size.
Note
The combined heap of all workers running on a server at once can never exceed the maximum total heap, and a query server will refuse to start a new worker if doing so would exceed the maximum total heap size.
QueryOperationPerformance
This table contains performance information for each operation run by a query. Every call to the standard table operations (e.g., select, update, view, etc.) constitutes a distinct operation.
The most significant columns in this table are:
- StartTime: The time at which this operation started.
- EndTime: The time at which this operation ended.
- OperationNumber: Monotonically increasing sequence numbers for each operation of a query.
- TimeSecs: The time (in seconds) that the operation took.
- NetMemoryChange: The change in memory usage while this operation was occurring. Memory usage is affected by factors besides the currently running operation; as a result, NetMemoryChange is only an approximation of the memory impact of each operation. NetMemoryChange = TotalMemoryChange - FreeMemoryChange, where "total memory" is memory reserved by the JVM for heap, and "free memory" is how much has not been used yet. The NetMemoryChange value will be negative when memory allocation is high, and its value will be positive when there is a large amount of GC. Sorting on NetMemoryChange should provide a useful overview of which operations are taking the most memory.
Real-Time Updates
For queries operating on ticking data (e.g., intraday data or Input Tables), there are additional logs detailing the time consumed when updating tables with the new data.
QueryUpdatePerformance
This table describes what the query spent time on during its data refresh cycle. This data is written in predefined intervals; the default is one minute, but it can be configured per query. At the end of each performance monitoring interval, a query logs every operation whose results were updated, and how much time it spent performing that update throughout the interval.
The most significant columns are:
- IntervalStartTime: The start time of the interval.
- IntervalEndTime: The end time of the interval.
- IntervalDurationNanos: The duration of the performance monitoring interval, in nanoseconds.
- Ratio: The percentage of the performance monitoring interval that was consumed updating each operation. This is an approximation of how much of the available CPU was used by a given operation.
- QueryMemoryUsed: The total amount of memory used by the worker.
- QueryMemUsedPct: Memory usage as a percentage of the max heap size (-Xmx).
- QueryMemoryFree: The total amount of free memory remaining. This is the difference between WorkerHeapSize and QueryMemoryUsed. If this approaches zero, a query is likely to experience poor performance or crash due to an OutOfMemoryError.
- WorkerHeapSize: The maximum heap size for the query worker.
- NRows: Total number of changed rows.
- RowsPerSec: Average rate at which data is ticking.
- RowsPerCPUSec: Approximation of how fast the CPU handles row changes.
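The relationship between the memory columns can be written out as simple arithmetic. The sketch below assumes all values share one unit (bytes here, which is an assumption) and derives the free memory and used percentage from the heap size and memory used. The helper name memory_headroom is hypothetical.

```python
def memory_headroom(worker_heap_size, query_memory_used):
    """Return (free memory, used percentage) for a worker.

    Free memory is the heap size minus the memory used; the percentage is
    memory used relative to the maximum heap size.
    """
    free = worker_heap_size - query_memory_used
    used_pct = 100.0 * query_memory_used / worker_heap_size
    return free, used_pct


GIB = 1024 ** 3

# Hypothetical worker with an 8 GiB heap, 6 GiB of which is in use.
free, used_pct = memory_headroom(8 * GIB, 6 * GIB)
print(free // GIB, used_pct)
```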
UpdateWorst
This table is the QueryUpdatePerformance table sorted to show the slowest-updating operations, out of all intervals since the query initialized, at the beginning of the table (i.e., the operations with the greatest Ratio).
UpdateMostRecent
This table is the QueryUpdatePerformance table sorted to show the most recent updates first. Operations are still sorted with the greatest Ratio at the beginning of the table.
UpdateAggregate
This table shows the QueryUpdatePerformance table aggregated for each performance recording interval. The Ratio in this table represents the total CPU usage of the individual operations displayed in the other tables. If the Ratio in this table regularly approaches N (the number of update threads, which defaults to the number of available processors), it indicates that the query may be unable to process all data updates within the target cycle time (the set target time for one UG cycle to complete). This can result in delayed data updates in the UI.
UpdateSummaryStats
This table takes the data in UpdateAggregate and aggregates it into a single row view of the 99th, 90th, 75th, and 50th percentiles of Ratio and QueryMemUsedPct over all intervals, which makes the spread of resource consumption easier to view.
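As an illustration of what a percentile summary like this conveys, the sketch below computes the same four percentiles over a list of per-interval values in plain Python. It uses the nearest-rank method, which is an assumption: the method Deephaven uses for this table is not stated here. The helper names and sample values are hypothetical.

```python
def nearest_rank_percentile(values, pct):
    """Return the nearest-rank percentile of a non-empty list (pct is an integer 1-100)."""
    ordered = sorted(values)
    # Integer ceiling of pct * n / 100 avoids floating-point rounding issues.
    rank = max(1, (pct * len(ordered) + 99) // 100)
    return ordered[rank - 1]


def summary_stats(values, percentiles=(99, 90, 75, 50)):
    """Return a single-row summary mapping each percentile to its value."""
    return {p: nearest_rank_percentile(values, p) for p in percentiles}


# Hypothetical per-interval memory usage percentages.
sample_pct = list(range(1, 101))
stats = summary_stats(sample_pct)
print(stats)
```

A wide gap between the 50th and 99th percentiles suggests occasional spikes rather than sustained load.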
Other Tables
ProcessEventLog
This table is the Process Event Log for the specified worker.
ServerState
This table shows details about memory usage, Garbage Collection (GC), and Update Graph (UG) cycle counts, durations, and ratios.
Garbage collection is a memory management process that automatically collects unused objects created on the heap and deletes them to free up memory.
The UG is the part of the query engine that handles real-time data updates. It runs in cycles (once per second, by default). When UG cycle logging is enabled, the UG will write the cycle's length to the logs (e.g., Update Graph Processor cycleTime: 472ms) every time it completes a cycle.
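If you export such log lines for offline analysis, the cycle times can be extracted with a regular expression. The sketch below assumes every line follows the exact format of the example above; real log entries may carry extra prefixes or differ between versions. The helper names and the 1,000 ms threshold (the default cycle length mentioned above) are illustrative.

```python
import re

# Pattern based on the example log line; treat the exact format as an assumption.
CYCLE_TIME_PATTERN = re.compile(r"Update Graph Processor cycleTime: (\d+)ms")


def cycle_times_ms(log_entries):
    """Extract UG cycle times, in milliseconds, from an iterable of log lines."""
    times = []
    for entry in log_entries:
        match = CYCLE_TIME_PATTERN.search(entry)
        if match:
            times.append(int(match.group(1)))
    return times


def slow_cycles(log_entries, target_cycle_millis=1_000):
    """Return the cycle times that exceeded the target cycle duration."""
    return [t for t in cycle_times_ms(log_entries) if t > target_cycle_millis]


# Hypothetical log lines.
sample_entries = [
    "Update Graph Processor cycleTime: 472ms",
    "Some unrelated message",
    "Update Graph Processor cycleTime: 1530ms",
]

all_times = cycle_times_ms(sample_entries)
slow = slow_cycles(sample_entries)
print(all_times, slow)
```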
Plots
UGPCycleTimeline
The UGPCycleTimeline widget plots UG cycle times from the ServerState table:
ServerStateTimeLine
The ServerStateTimeLine widget plots UG ratio and memory usage from the ServerState table.
Performance queries
Deephaven also supports a tree table view of query performance:
qptt = PerformanceQueries.queryPerformanceAsTreeTable()
qoptt = PerformanceQueries.queryOperationPerformanceAsTreeTable()
Dashboards
This section provides two different examples that construct dashboards containing one or more internal tables, plots, and other widgets that can be used to monitor query performance.
Monitor PQ memory and initialization times
This example creates a dashboard that includes graphs of initialization times as well as heap usage over the last seven days of a specific PQ, then graphs the heap usage throughout the day of a single run.
Code
import io.deephaven.time.calendar.Calendars
import io.deephaven.time.calendar.BusinessCalendar
import io.deephaven.enterprise.database.Database
import io.deephaven.engine.table.Table
import io.deephaven.engine.util.TableTools
import io.deephaven.time.DateTimeUtils
db = (Database) db
final BusinessCalendar cal = Calendars.calendar()
date = DateTimeUtils.today()
previousDate = cal.minusBusinessDays(date, 1)
oldestDate = cal.minusBusinessDays(date, 7)
bytesPerGB = 1024*1024*1024
Table PQC = TableTools.merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2")
.lastBy("Name")
.where("EventType != `REMOVED`"),
db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")
.lastBy("Name")
.where("EventType != `REMOVED`")).lastBy("Name")
Table basePQs = db.liveTable("DbInternal", "PersistentQueryStateLog")
.where("Date=date.toString()")
.whereIn(PQC, "Name")
.view("Name", "Owner", "Status", "ServerHost", "DispatcherHost", "WorkerName", "ProcessInfoId")
.lastBy("Name")
Table heapUsage = merge(
db.liveTable("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date >=oldestDate")
.where("Name = `Memory-Heap.Used`")
.lastBy("ProcessInfoId")
.updateView("MaxHeapUsedInGB=Max/bytesPerGB"),
db.historicalTable("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date >=oldestDate")
.where("Name = `Memory-Heap.Used`")
.lastBy("ProcessInfoId")
.updateView("MaxHeapUsedInGB=Max/bytesPerGB"))
.lastBy("ProcessInfoId")
PQs = basePQs
.naturalJoin(PQC, "Name", "Enabled,HeapSizeInGB,DataBufferPoolToHeapSizeRatio")
.naturalJoin(heapUsage, "ProcessInfoId=ProcessInfoId", "MaxHeapUsedInGB")
.sort("Name")
Table nowTable=timeTable("PT00:00:01").renameColumns("Now=Timestamp")
Table runningPQHistory1 = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date >= `2023-03-07`", "ProcessInfoId != null")
.view("Date", "Owner", "Name", "Timestamp", "Status", "DispatcherHost", "WorkerName", "ProcessInfoId", "SerialNumber", "VersionNumber")
.aggBy([AggMin("Name", "DispatcherHost", "WorkerName", "SerialNumber", "VersionNumber", "Start=Timestamp"),AggMax("Timestamp"),AggLast("Status")],"ProcessInfoId")
Table runningPQHistory2=nowTable.snapshotWhen(runningPQHistory1)
.updateView("End=(Status==`Running` ? now() : Timestamp)",
"Duration_seconds=(End - Start)/1000000000", "Duration_minutes=Duration_seconds/60")
.dropColumns("Now", "Timestamp")
PQHistory = TableTools.merge(db.historicalTable("DbInternal", "PersistentQueryStateLog").where("Date >= oldestDate", "ProcessInfoId != null")
.view("Date", "Owner", "Name", "Timestamp", "Status", "DispatcherHost", "WorkerName", "ProcessInfoId", "SerialNumber", "VersionNumber")
.aggBy([AggMin("Name", "DispatcherHost", "WorkerName", "SerialNumber", "VersionNumber", "Start=Timestamp"),AggMax("End=Timestamp"),AggLast("Status")],"ProcessInfoId")
.updateView("Duration_seconds=(End - Start)/1000000000", "Duration_minutes=Duration_seconds/60"),
runningPQHistory2)
.naturalJoin(heapUsage, "ProcessInfoId", "MaxHeapUsedInGB")
.moveColumnsUp("Start", "End", "Name")
.sortDescending("Start")
Table heapOverTime = TableTools.merge(
db.liveTable("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date >=oldestDate")
.where("Name = `Memory-Heap.Used`")
.view("Timestamp", "ProcessInfoId", "HeapUsed=Last"),
db.historicalTable("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date >=oldestDate")
.where("Name = `Memory-Heap.Used`")
.view("Timestamp", "ProcessInfoId", "HeapUsed=Last"))
.lastBy("Timestamp", "ProcessInfoId")
.updateView("HeapGB=HeapUsed/bytesPerGB")
Table nonHeapOverTime = TableTools.merge(
db.liveTable("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date >=oldestDate")
.where("Name = `Memory-NonHeap.Used`")
.view("Timestamp", "ProcessInfoId", "HeapUsed=Last"),
db.historicalTable("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date >=oldestDate")
.where("Name = `Memory-NonHeap.Used`")
.view("Timestamp", "ProcessInfoId", "HeapUsed=Last"))
.lastBy("Timestamp", "ProcessInfoId")
.updateView("HeapGB=HeapUsed/bytesPerGB")
Table directOverTime = TableTools.merge(
db.liveTable("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date >=oldestDate")
.where("Name = `Memory-Direct.Used`")
.view("Timestamp", "ProcessInfoId", "HeapUsed=Last"),
db.historicalTable("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date >=oldestDate")
.where("Name = `Memory-Direct.Used`")
.view("Timestamp", "ProcessInfoId", "HeapUsed=Last"))
.lastBy("Timestamp", "ProcessInfoId")
.updateView("HeapGB=HeapUsed/bytesPerGB")
maxHeapHistoryOC = oneClick(PQHistory.where("MaxHeapUsedInGB > 0"), "Name")
hotOC = oneClick(heapOverTime, "ProcessInfoId")
nhotOC = oneClick(nonHeapOverTime, "ProcessInfoId")
dotOC = oneClick(directOverTime, "ProcessInfoId")
MemoryUsageSingleRun =
plot("Heap Usage (GB)", hotOC, "Timestamp", "HeapGB")
.plot("Non-Heap Usage (GB)", nhotOC, "Timestamp", "HeapGB")
.plot("Direct Usage (GB)", dotOC, "Timestamp", "HeapGB")
.show()
HeapMaxUsageHistory = catPlot("Heap Usage (GB) By Execution", maxHeapHistoryOC, "Start", "MaxHeapUsedInGB").show()
from deephaven.plot.selectable_dataset import one_click
from deephaven.agg import max_, min_, last
from deephaven.plot.figure import Figure
from deephaven.calendar import calendar
from deephaven.time import dh_today
from deephaven import time_table
from deephaven import merge
cal = calendar()
date = dh_today()
previous_date = cal.minusBusinessDays(date, 1)
oldest_date = cal.minusBusinessDays(date, 7)
bytes_per_gb = 1024 * 1024 * 1024
pqc = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2")
.last_by("Name")
.where("EventType != `REMOVED`"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2")
.last_by("Name")
.where("EventType != `REMOVED`"),
]
).last_by("Name")
base_pqs = (
db.live_table("DbInternal", "PersistentQueryStateLog")
.where("Date=date.toString()")
.where_in(pqc, "Name")
.view(
[
"Name",
"Owner",
"Status",
"ServerHost",
"DispatcherHost",
"WorkerName",
"ProcessInfoId",
]
)
.last_by("Name")
)
heap_usage = merge(
[
db.live_table("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date>=oldest_date")
.where("Name = `Memory-Heap.Used`")
.last_by("ProcessInfoId")
.update_view("MaxHeapUsedInGB=Max/bytes_per_gb"),
db.historical_table("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date>=oldest_date")
.where("Name=`Memory-Heap.Used`")
.last_by("ProcessInfoId")
.update_view("MaxHeapUsedInGB=Max/bytes_per_gb"),
]
).last_by("ProcessInfoId")
pqs = (
base_pqs.natural_join(
pqc, "Name", ["Enabled", "HeapSizeInGB", "DataBufferPoolToHeapSizeRatio"]
)
.natural_join(heap_usage, "ProcessInfoId=ProcessInfoId", "MaxHeapUsedInGB")
.sort("Name")
)
now_table = time_table("PT1s").rename_columns("Now=Timestamp")
running_pq_history_1 = (
db.live_table("DbInternal", "PersistentQueryStateLog")
.where(["Date>=`2023-03-07`", "ProcessInfoId != null"])
.view(
[
"Date",
"Owner",
"Name",
"Timestamp",
"Status",
"DispatcherHost",
"WorkerName",
"ProcessInfoId",
"SerialNumber",
"VersionNumber",
]
)
.agg_by(
[
min_(
[
"Name",
"DispatcherHost",
"WorkerName",
"SerialNumber",
"VersionNumber",
"Start=Timestamp",
]
),
max_("Timestamp"),
last("Status"),
],
"ProcessInfoId",
)
)
running_pq_history_2 = (
now_table.snapshot_when(running_pq_history_1)
.update_view(
[
"End=(Status==`Running` ? now() : Timestamp)",
"Duration_seconds=(End - Start)/1000000000",
"Duration_minutes=Duration_seconds/60",
]
)
.drop_columns(["Now", "Timestamp"])
)
pq_history = (
merge(
[
db.historical_table("DbInternal", "PersistentQueryStateLog")
.where(["Date >= oldest_date", "ProcessInfoId != null"])
.view(
[
"Date",
"Owner",
"Name",
"Timestamp",
"Status",
"DispatcherHost",
"WorkerName",
"ProcessInfoId",
"SerialNumber",
"VersionNumber",
]
)
.agg_by(
[
min_(
[
"Name",
"DispatcherHost",
"WorkerName",
"SerialNumber",
"VersionNumber",
"Start=Timestamp",
]
),
max_("End=Timestamp"),
last("Status"),
],
"ProcessInfoId",
)
.update_view(
[
"Duration_seconds=(End - Start)/1000000000",
"Duration_minutes=Duration_seconds/60",
]
),
running_pq_history_2,
]
)
.natural_join(heap_usage, "ProcessInfoId", "MaxHeapUsedInGB")
.move_columns_up(["Start", "End", "Name"])
.sort_descending("Start")
)
heap_over_time = (
merge(
[
db.live_table("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date>=oldest_date")
.where("Name=`Memory-Heap.Used`")
.view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
db.historical_table("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date>=oldest_date")
.where("Name=`Memory-Heap.Used`")
.view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
]
)
.last_by(["Timestamp", "ProcessInfoId"])
.update_view("HeapGB=HeapUsed/bytes_per_gb")
)
non_heap_over_time = (
merge(
[
db.live_table("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date>=oldest_date")
.where("Name=`Memory-NonHeap.Used`")
.view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
db.historical_table("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date>=oldest_date")
.where("Name=`Memory-NonHeap.Used`")
.view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
]
)
.last_by(["Timestamp", "ProcessInfoId"])
.update_view("HeapGB=HeapUsed/bytes_per_gb")
)
direct_over_time = (
merge(
[
db.live_table("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date>=oldest_date")
.where("Name=`Memory-Direct.Used`")
.view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
db.historical_table("DbInternal", "ProcessMetricsLogCoreV2")
.where("Date>=oldest_date")
.where("Name=`Memory-Direct.Used`")
.view(["Timestamp", "ProcessInfoId", "HeapUsed=Last"]),
]
)
.last_by(["Timestamp", "ProcessInfoId"])
.update_view("HeapGB=HeapUsed/bytes_per_gb")
)
max_heap_history_oc = one_click(pq_history.where("MaxHeapUsedInGB>0"), ["Name"])
hot_oc = one_click(heap_over_time, ["ProcessInfoId"])
non_hot_oc = one_click(non_heap_over_time, ["ProcessInfoId"])
dot_oc = one_click(direct_over_time, ["ProcessInfoId"])
memory_usage_single_run = (
Figure()
.plot_xy(series_name="Heap Usage (GB)", t=hot_oc, x="Timestamp", y="HeapGB")
.plot_xy(series_name="Non-Heap Usage (GB)", t=non_hot_oc, x="Timestamp", y="HeapGB")
.plot_xy(series_name="Direct Usage (GB)", t=dot_oc, x="Timestamp", y="HeapGB")
.show()
)
heap_max_usage_history = (
Figure()
.plot_cat(
series_name="Heap Usage (GB) by Execution",
t=max_heap_history_oc,
category="Start",
y="MaxHeapUsedInGB",
)
.show()
)
The PQHistory and Persistent Query Configurations tables include helpful information such as query status and the amount of memory configured vs the amount of memory currently used. To filter the graphs to a specific query, use a One Click filter or the Linker tool.
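To make the configured-versus-used comparison concrete, the following is a minimal sketch in plain Python rather than Deephaven table operations. It assumes rows shaped like the pqs table above, with Name, HeapSizeInGB, and MaxHeapUsedInGB columns. The heap_utilization and flag_high_usage helpers, the sample rows, and the 0.8 threshold are hypothetical and only illustrate the arithmetic.

```python
# Illustrative only: plain Python, not the Deephaven API.
# The sample rows and the 0.8 threshold are hypothetical.


def heap_utilization(heap_size_gb, max_heap_used_gb):
    """Return used/configured heap as a fraction, or None if it cannot be computed."""
    if heap_size_gb is None or max_heap_used_gb is None or heap_size_gb <= 0:
        return None
    return max_heap_used_gb / heap_size_gb


def flag_high_usage(rows, threshold=0.8):
    """Return the names of queries whose peak heap use meets or exceeds the threshold."""
    flagged = []
    for row in rows:
        ratio = heap_utilization(row["HeapSizeInGB"], row["MaxHeapUsedInGB"])
        if ratio is not None and ratio >= threshold:
            flagged.append(row["Name"])
    return flagged


sample_rows = [
    {"Name": "QueryA", "HeapSizeInGB": 8.0, "MaxHeapUsedInGB": 7.0},
    {"Name": "QueryB", "HeapSizeInGB": 4.0, "MaxHeapUsedInGB": 1.0},
    # A query with no recorded heap metric is skipped rather than flagged.
    {"Name": "QueryC", "HeapSizeInGB": 4.0, "MaxHeapUsedInGB": None},
]

print(flag_high_usage(sample_rows))
```

Inside a worker, the same ratio could likely be expressed as a formula column with update_view, in the same way MaxHeapUsedInGB is derived above. That form is not shown here because it depends on the column types in your installation.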
Monitor server usage
This example creates a dashboard that shows the heap usage of all PQs, broken down by server. This is particularly useful for load balancing in a Deephaven cluster. It also shows all current workers along with their server hosts, heap allocation, and PQ name (if applicable).
Code
import io.deephaven.time.calendar.Calendars
import io.deephaven.time.calendar.BusinessCalendar
import io.deephaven.enterprise.database.Database
import io.deephaven.engine.table.Table
import io.deephaven.time.DateTimeUtils
import io.deephaven.engine.util.TableTools
import java.time.Instant
import java.time.temporal.ChronoUnit
db = (Database) db
final BusinessCalendar cal = Calendars.calendar()
date = DateTimeUtils.today()
cutoffDate = cal.minusBusinessDays(date, 7)
cutoffTime = Instant.now().minus(5, ChronoUnit.MINUTES)
// get details about workers that are alive
Table activeWorkers = db.liveTable("DbInternal", "ProcessEventLog")
.where("Date>=today()")
.sort("Timestamp")
.lastBy("ProcessInfoId")
.where("Timestamp >= cutoffTime")
.view("ProcessInfoId", "WorkerName=Process", "Host")
// get dispatcher details and remove any recently terminated workers
Table currentWorkers = activeWorkers
.join(db.liveTable("DbInternal", "AuditEventLog")
.where("Date>=cutoffDate", "Event=`Starting worker`")
.update("PPID=Details.split(`:`)[2].trim()")
,"ProcessInfoId=PPID"
,"Process,ServerPort")
.updateView("DispatcherName=Process == `db_query_server` ? `default` : `mergedispatcher`")
.dropColumns("Process")
.whereNotIn(db.liveTable("DbInternal", "AuditEventLog")
.where("Date>=cutoffDate", "Event=`SHUTTING_DOWN`"),
"ProcessInfoId")
Table memoryUsage = TableTools.merge(db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")
.where(), db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2").where())
.naturalJoin(
db.liveTable("DbInternal", "PersistentQueryStateLog")
.where("Date>=cutoffDate")
.lastBy("SerialNumber", "VersionNumber"),
"SerialNumber, VersionNumber",
"WorkerName, ProcessInfoId"
).lastBy("ProcessInfoId")
currentWorkerInfo = currentWorkers.naturalJoin(memoryUsage, "ProcessInfoId", "HeapSizeInGB, Name, ConfigurationType")
.where("!(isNull(HeapSizeInGB))") //filter out workers which probably stopped ungracefully
workerHeapUsage = currentWorkerInfo
.update("PurposeAndWorker = ConfigurationType + ` : ` + Name")
.sortDescending("HeapSizeInGB")
.rollup([AggSum("HeapSizeInGB")]
,"Host"
,"DispatcherName"
,"PurposeAndWorker")
from deephaven.time import dh_today, to_j_instant
from deephaven.calendar import calendar
from deephaven.agg import sum_
from deephaven import merge
from datetime import datetime, timedelta
cal = calendar()
date = dh_today()
cutoff_date = cal.minusBusinessDays(date, 7)
cutoff_time = to_j_instant(datetime.now() - timedelta(minutes=5))
# Get details about workers that are alive
active_workers = (
db.live_table("DbInternal", "ProcessEventLog")
.where("Date>=today()")
.sort("Timestamp")
.last_by("ProcessInfoId")
.where("Timestamp>=cutoff_time")
.view(["ProcessInfoId", "WorkerName=Process", "Host"])
)
# Get dispatcher details and remove any recently terminated workers
current_workers = (
active_workers.join(
db.live_table("DbInternal", "AuditEventLog")
.where(["Date>=cutoff_date", "Event=`Starting worker`"])
.update("PPID=Details.split(`:`)[2].trim()"),
"ProcessInfoId=PPID",
["Process", "ServerPort"],
)
.update_view(
"DispatcherName=Process == `db_query_server` ? `default` : `mergedispatcher`"
)
.drop_columns("Process")
.where_not_in(
db.live_table("DbInternal", "AuditEventLog").where(
["Date>=cutoff_date", "Event=`SHUTTING_DOWN`"]
),
"ProcessInfoId",
)
)
memory_usage = (
merge(
[
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2").where(),
db.historical_table(
"DbInternal", "PersistentQueryConfigurationLogV2"
).where(),
]
)
.natural_join(
db.live_table("DbInternal", "PersistentQueryStateLog")
.where("Date>=cutoff_date")
.last_by(["SerialNumber", "VersionNumber"]),
["SerialNumber", "VersionNumber"],
["WorkerName", "ProcessInfoId"],
)
.last_by("ProcessInfoId")
)
# Filter out workers which probably stopped ungracefully
current_worker_info = current_workers.natural_join(
memory_usage, "ProcessInfoId", ["HeapSizeInGB", "Name", "ConfigurationType"]
).where("!(isNull(HeapSizeInGB))")
worker_heap_usage = (
current_worker_info.update("PurposeAndWorker = ConfigurationType + ` : ` + Name")
.sort_descending("HeapSizeInGB")
.rollup(sum_("HeapSizeInGB"), ["Host", "DispatcherName", "PurposeAndWorker"])
)
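As a rough illustration of what the rollup in the last step produces, the sketch below sums HeapSizeInGB at each level of a grouping hierarchy in plain Python. It is not the Deephaven rollup implementation. The rollup_sum helper, the host names, and the heap sizes are hypothetical.

```python
# Illustrative only: plain Python, not the Deephaven rollup implementation.
# Host names and heap sizes below are hypothetical sample values.
from collections import defaultdict


def rollup_sum(rows, by, value):
    """Sum `value` for every prefix of the `by` columns.

    The empty tuple is the grand total, a one-element tuple is a per-host
    total, a two-element tuple is a per-host, per-dispatcher total, and so on.
    """
    totals = defaultdict(float)
    for row in rows:
        for depth in range(len(by) + 1):
            key = tuple(row[col] for col in by[:depth])
            totals[key] += row[value]
    return dict(totals)


workers = [
    {"Host": "host-1", "DispatcherName": "default", "HeapSizeInGB": 4.0},
    {"Host": "host-1", "DispatcherName": "mergedispatcher", "HeapSizeInGB": 2.0},
    {"Host": "host-2", "DispatcherName": "default", "HeapSizeInGB": 8.0},
]

totals = rollup_sum(workers, ["Host", "DispatcherName"], "HeapSizeInGB")

# Print each level of the hierarchy, indented by depth.
for key in sorted(totals):
    label = " / ".join(key) if key else "All hosts"
    print("  " * len(key) + f"{label}: {totals[key]} GB")
```

Reading the per-host totals first and expanding only the hosts that look overloaded mirrors how the rolled-up table is typically explored in the dashboard.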