Query operation errors
Deephaven query operations can fail for many reasons. This guide highlights the most common errors and how to debug them.
Missing columns
Many query operations specify columns to operate on. where, select, update, view, and update_view can use column names inside of a formula; when a column name in a formula is incorrect, you need to debug formula compilation errors. Many other query operations take column names as strings. For example, this last_by operation uses two columns, SerialNumber and Version:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")).lastBy("SerialNumber", "Version")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
).last_by(["SerialNumber", "Version"])
This script results in the following NoSuchColumnException:
Error running script: io.deephaven.engine.table.impl.NoSuchColumnException: aggregation: not all group-by columns [SerialNumber, Version] are present in input table with columns [Date, Timestamp, SerialNumber, VersionNumber, Owner, Name, EventType, Enabled, HeapSizeInGB, AdditionalMemoryInGB, DataBufferPoolToHeapSizeRatio, DetailedGCLoggingEnabled, OmitDefaultGCParameters, DbServerName, RestartUsers, ScriptCode, ScriptPath, ScriptLanguage, ExtraJvmArguments, ExtraEnvironmentVariables, ClassPathAdditions, KubernetesControl, PythonControl, GenericWorkerControl, AdminGroups, ViewerGroups, ConfigurationType, Scheduling, Timeout, TypeSpecificFields, WorkerKind, JVMProfile, LastModifiedByAuthenticated, LastModifiedByEffective, LastModifiedTime, ReplicaCount, SpareCount, AssignmentPolicy, AssignmentPolicyParams]. Missing columns: [Version]
In this case, the column Version used in the last_by is actually named VersionNumber, as shown in the list of columns in the input table (the result of our merge operation). Here is the corrected code:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")).lastBy("SerialNumber", "VersionNumber")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
).last_by(["SerialNumber", "VersionNumber"])
Many other operations can have missing columns. For example, a natural join may have a missing column on the right-hand side:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")).lastBy("SerialNumber", "VersionNumber")
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
withHeapSize = pqsl.naturalJoin(pqcl, "SerialNumber,VersionNumber", "HeapSizeInGb")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
).last_by(["SerialNumber", "VersionNumber"])
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
with_heap_size = pqsl.natural_join(
pqcl, ["SerialNumber", "VersionNumber"], "HeapSizeInGb"
)
Column names are case-sensitive. This query produces the following error because the correct name is HeapSizeInGB rather than HeapSizeInGb:
Error running script: io.deephaven.engine.table.impl.NoSuchColumnException: Unknown column names [HeapSizeInGb], available column names are [SerialNumber, VersionNumber, Date, Timestamp, Owner, Name, EventType, Enabled, HeapSizeInGB, AdditionalMemoryInGB, DataBufferPoolToHeapSizeRatio, DetailedGCLoggingEnabled, OmitDefaultGCParameters, DbServerName, RestartUsers, ScriptCode, ScriptPath, ScriptLanguage, ExtraJvmArguments, ExtraEnvironmentVariables, ClassPathAdditions, KubernetesControl, PythonControl, GenericWorkerControl, AdminGroups, ViewerGroups, ConfigurationType, Scheduling, Timeout, TypeSpecificFields, WorkerKind, JVMProfile, LastModifiedByAuthenticated, LastModifiedByEffective, LastModifiedTime, ReplicaCount, SpareCount, AssignmentPolicy, AssignmentPolicyParams]
The following query functions properly:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")).lastBy("SerialNumber", "VersionNumber")
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
withHeapSize = pqsl.naturalJoin(pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
).last_by(["SerialNumber", "VersionNumber"])
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
with_heap_size = pqsl.natural_join(
pqcl, ["SerialNumber", "VersionNumber"], "HeapSizeInGB"
)
Similarly, changing the natural join to have an additional match column (in this example, ProcessInfoId) that is not present on the right-hand side produces the following error:
Error running script: io.deephaven.engine.table.impl.NoSuchColumnException: Unknown column names [ProcessInfoId], available column names are [SerialNumber, VersionNumber, Date, Timestamp, Owner, Name, EventType, Enabled, HeapSizeInGB, AdditionalMemoryInGB, DataBufferPoolToHeapSizeRatio, DetailedGCLoggingEnabled, OmitDefaultGCParameters, DbServerName, RestartUsers, ScriptCode, ScriptPath, ScriptLanguage, ExtraJvmArguments, ExtraEnvironmentVariables, ClassPathAdditions, KubernetesControl, PythonControl, GenericWorkerControl, AdminGroups, ViewerGroups, ConfigurationType, Scheduling, Timeout, TypeSpecificFields, WorkerKind, JVMProfile, LastModifiedByAuthenticated, LastModifiedByEffective, LastModifiedTime, ReplicaCount, SpareCount, AssignmentPolicy, AssignmentPolicyParams]
Conversely, adding ScriptLanguage, which is not present on the left-hand side, errors as follows:
Error running script: io.deephaven.engine.table.impl.NoSuchColumnException: Unknown column names [ScriptLanguage], available column names are [Date, Owner, Name, Timestamp, Status, ControllerHost, DispatcherHost, ServerHost, WorkerName, ProcessInfoId, WorkerPort, LastAuthenticatedUser, LastEffectiveUser, SerialNumber, VersionNumber, TypeSpecificState, ExceptionMessage, ExceptionStackTrace, EngineVersion, WorkerKind, ScriptLoaderState, DispatcherPort, ReplicaSlot, StatusDetails]
Duplicate columns in joins
The following query attempts to join tables with duplicate columns:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")).lastBy("SerialNumber", "VersionNumber")
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
withHeapSize = pqsl.naturalJoin(pqcl, "SerialNumber,VersionNumber,ScriptLanguage")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
).last_by(["SerialNumber", "VersionNumber"])
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
with_heap_size = pqsl.natural_join(pqcl, "SerialNumber,VersionNumber")
It produces the following message:
java.lang.IllegalStateException: Natural Join found duplicate right key for 1733244619742000074
Specify the columns you want:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")).lastBy("SerialNumber", "VersionNumber")
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
withHeapSize = pqsl.naturalJoin(pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB,Name")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
).last_by(["SerialNumber", "VersionNumber"])
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
with_heap_size = pqsl.natural_join(
pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB,Name"
)
However, this still produces an error:
RuntimeError: java.lang.RuntimeException: Conflicting column names [Name]
Rename the conflicting column:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")).lastBy("SerialNumber", "VersionNumber")
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
withHeapSize = pqsl.naturalJoin(pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB,ConfigName=Name")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
).last_by(["SerialNumber", "VersionNumber"])
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
with_heap_size = pqsl.natural_join(
pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB,ConfigName=Name"
)
Duplicate row keys
Natural join operations require that a unique right-hand side row exists for each key. If multiple rows exist, the Deephaven engine cannot unambiguously match a right-hand side row to the left-hand side row. If multiple output rows per left-hand side row are desired, use a join operation rather than a natural_join.
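The difference is easy to see on a small example. The following sketch uses hypothetical in-memory tables rather than the DbInternal tables from this guide: join emits one output row per matching right-hand side row, while natural_join fails when a key has more than one right-hand side row.
from deephaven import new_table
from deephaven.column import int_col, string_col

# Hypothetical tables: key "A" appears twice on the right-hand side.
left = new_table([string_col("Key", ["A", "B"]), int_col("LeftVal", [1, 2])])
right = new_table(
    [string_col("Key", ["A", "A", "B"]), int_col("RightVal", [10, 11, 20])]
)

# join: "A" matches two right rows, so the result has three rows.
multi = left.join(right, on="Key", joins="RightVal")

# natural_join: raises "Natural Join found duplicate right key" for key "A".
# single = left.natural_join(right, on="Key", joins="RightVal")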
In this example, we use natural join to augment the Persistent Query State Log with the HeapSizeInGB column from the Persistent Query Configuration Log, using the SerialNumber column as a key.
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2"))
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
withHeapSize = pqsl.naturalJoin(pqcl, "SerialNumber", "HeapSizeInGB")
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
)
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
with_heap_size = pqsl.natural_join(pqcl, "SerialNumber", "HeapSizeInGB")
However, the SerialNumber column does not define a unique key for a Persistent Query configuration. In this case, the query with serial number 1733244619742000074 has more than one row in the PersistentQueryConfigurationLogV2 table.
java.lang.IllegalStateException: Natural Join found duplicate right key for 1733244619742000074
To correct this, we must add a second key column, in this case VersionNumber, to uniquely identify the version of the query referenced by the Persistent Query State Log:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2"))
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
withHeapSize = pqsl.naturalJoin(pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
)
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
with_heap_size = pqsl.natural_join(pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB")
This query correctly maps the state to a single version of the Persistent Query configuration, but there are still cases where multiple rows can appear, resulting in a similar error that includes the version number in the duplicate key:
java.lang.IllegalStateException: Natural Join found duplicate right key for [1733244619742000074, 5]
In particular, when the Persistent Query Controller restarts, it logs all active configurations. To correct this problem, take only the last row for each key from the PersistentQueryConfigurationLogV2 table so that only the most recent entry is used:
pqcl = merge(db.historicalTable("DbInternal", "PersistentQueryConfigurationLogV2"), db.liveTable("DbInternal", "PersistentQueryConfigurationLogV2")).lastBy("SerialNumber", "VersionNumber")
pqsl = db.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()")
withHeapSize = pqsl.naturalJoin(pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB")
from deephaven import merge
pqcl = merge(
[
db.historical_table("DbInternal", "PersistentQueryConfigurationLogV2"),
db.live_table("DbInternal", "PersistentQueryConfigurationLogV2"),
]
).last_by(["SerialNumber", "VersionNumber"])
pqsl = db.live_table("DbInternal", "PersistentQueryStateLog").where("Date=today()")
with_heap_size = pqsl.natural_join(pqcl, "SerialNumber,VersionNumber", "HeapSizeInGB")
Every data set and query is different; therefore, adding a last_by aggregation is not a universal solution. Carefully consider your data and the necessary business logic to ensure that you choose the correct rows.
Resource limitations
Sometimes, a query operation exceeds the resources available. Three kinds of resource limitations are memory, key cardinality, and row set address space.
Out of memory
Each Persistent Query or Code Studio is assigned to a JVM that has limited memory available for heap space (where most Java objects are stored) and direct memory (where I/O buffers are often stored). Beyond the JVM's managed memory, when using Python, the Python interpreter also uses memory for its own objects. The JVM can report that the heap is exhausted, which manifests as an OutOfMemoryError.
Using a worker with a 1GB heap, the following snippet creates a 150,000,000-row table and uses the select method to create the column X, whose type is long. Since long values are each 8 bytes, this select operation requires 1.2GB of heap to store the column.
x=emptyTable(150_000_000).select("X=ii")
from deephaven import empty_table
x = empty_table(150_000_000).select("X=ii")
In this particular instance, the Code Studio crashed due to resource exhaustion, so the Exception text must be retrieved from the ProcessEventLog.
Initiating shutdown due to: {SelectColumnLayer: X=ii, layerIndex=0} Error
java.lang.OutOfMemoryError: Java heap space
at io.deephaven.engine.table.impl.sources.immutable.ImmutableLongArraySource.allocateArray(ImmutableLongArraySource.java:73)
at io.deephaven.engine.table.impl.sources.immutable.ImmutableLongArraySource.ensureCapacity(ImmutableLongArraySource.java:112)
at io.deephaven.engine.table.impl.select.analyzers.SelectColumnLayer.doEnsureCapacity(SelectColumnLayer.java:606)
at io.deephaven.engine.table.impl.select.analyzers.SelectColumnLayer.prepareParallelUpdate(SelectColumnLayer.java:263)
at io.deephaven.engine.table.impl.select.analyzers.SelectColumnLayer.lambda$createUpdateHandler$1(SelectColumnLayer.java:238)
at io.deephaven.engine.table.impl.select.analyzers.SelectColumnLayer$$Lambda$1745/0x00007f72b4d6ae70.run(Unknown Source)
at io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler.lambda$submit$0(OperationInitializerJobScheduler.java:40)
at io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler$$Lambda$1746/0x00007f72b4d6b098.run(Unknown Source)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at io.deephaven.engine.context.ExecutionContext.lambda$apply$0(ExecutionContext.java:196)
at io.deephaven.engine.context.ExecutionContext$$Lambda$629/0x00007f72b4713bc0.get(Unknown Source)
at io.deephaven.engine.context.ExecutionContext.apply(ExecutionContext.java:207)
at io.deephaven.engine.context.ExecutionContext.apply(ExecutionContext.java:195)
at io.deephaven.engine.table.impl.OperationInitializationThreadPool$1.lambda$newThread$0(OperationInitializationThreadPool.java:45)
at io.deephaven.engine.table.impl.OperationInitializationThreadPool$1$$Lambda$564/0x00007f72b46d5440.run(Unknown Source)
at java.base/java.lang.Thread.run(Thread.java:840)
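If the worker has already exited, a query like the following, run from a new Code Studio, can retrieve those entries. This is a sketch: the ProcessInfoId value is a placeholder for the crashed worker's ID, and the filters assume the standard DbInternal ProcessEventLog columns.
pel = db.live_table("DbInternal", "ProcessEventLog").where(
    [
        "Date=today()",
        "ProcessInfoId=`00000000-0000-0000-0000-000000000000`",  # placeholder ID
    ]
)
errors = pel.where("Level=`ERROR`")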
A Java heap exception can also be generated from a Groovy console entirely from script code, without using any Deephaven operations. For example:
bigArray = new long[150_000_000]
This produces the following exception:
Initiating shutdown due to: Exception while processing serialExecutor task
java.lang.OutOfMemoryError: Java heap space
at io.deephaven.dynamic.Script_7.run(Script_7.groovy:2)
at groovy.lang.GroovyShell.evaluate(GroovyShell.java:427)
at groovy.lang.GroovyShell.evaluate(GroovyShell.java:461)
at groovy.lang.GroovyShell.evaluate(GroovyShell.java:436)
at io.deephaven.engine.util.GroovyDeephavenSession.lambda$evaluate$0(GroovyDeephavenSession.java:352)
at io.deephaven.engine.util.GroovyDeephavenSession$$Lambda$1722/0x00007f92bcc418d0.run(Unknown Source)
at io.deephaven.util.locks.FunctionalLock.doLockedInterruptibly(FunctionalLock.java:51)
at io.deephaven.engine.util.GroovyDeephavenSession.evaluate(GroovyDeephavenSession.java:352)
at io.deephaven.engine.util.AbstractScriptSession.lambda$evaluateScript$0(AbstractScriptSession.java:165)
at io.deephaven.engine.util.AbstractScriptSession$$Lambda$1080/0x00007f92bc9cb2d8.run(Unknown Source)
at io.deephaven.engine.context.ExecutionContext.lambda$apply$0(ExecutionContext.java:196)
at io.deephaven.engine.context.ExecutionContext$$Lambda$626/0x00007f92bc70eaf8.get(Unknown Source)
at io.deephaven.engine.context.ExecutionContext.apply(ExecutionContext.java:207)
at io.deephaven.engine.context.ExecutionContext.apply(ExecutionContext.java:195)
at io.deephaven.engine.util.AbstractScriptSession.evaluateScript(AbstractScriptSession.java:165)
at io.deephaven.enterprise.dnd.modules.GroovyConsoleSessionWithDatabaseModule$ScriptSessionWrapper.evaluateScript(GroovyConsoleSessionWithDatabaseModule.java:117)
at io.deephaven.engine.util.DelegatingScriptSession.evaluateScript(DelegatingScriptSession.java:72)
at io.deephaven.engine.util.ScriptSession.evaluateScript(ScriptSession.java:75)
at io.deephaven.server.console.ConsoleServiceGrpcImpl.lambda$executeCommand$4(ConsoleServiceGrpcImpl.java:193)
at io.deephaven.server.console.ConsoleServiceGrpcImpl$$Lambda$1984/0x00007f92bcdbfcb0.run(Unknown Source)
at io.deephaven.server.session.SessionState$ExportBuilder.lambda$submit$2(SessionState.java:1537)
at io.deephaven.server.session.SessionState$ExportBuilder$$Lambda$1943/0x00007f92bcd913a8.call(Unknown Source)
at io.deephaven.server.session.SessionState$ExportObject.doExport(SessionState.java:995)
at io.deephaven.server.session.SessionState$ExportObject$$Lambda$1985/0x00007f92bcdba000.run(Unknown Source)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at io.deephaven.server.runner.scheduler.SchedulerModule$ThreadFactory.lambda$newThread$0(SchedulerModule.java:100)
at io.deephaven.server.runner.scheduler.SchedulerModule$ThreadFactory$$Lambda$1758/0x00007f92bcc654d0.run(Unknown Source)
at java.base/java.lang.Thread.run(Thread.java:840)
Python memory allocations are not bound by the JVM heap size, but you may still run into system limits.
For example:
import numpy
x = numpy.zeros(15_000_000_000)
This results in the following Python error:
r-Scheduler-Serial-1 | .c.ConsoleServiceGrpcImpl | Error running script: java.lang.RuntimeException: Error in Python interpreter:
Type: <class 'numpy._core._exceptions._ArrayMemoryError'>
Value: Unable to allocate 112. GiB for an array with shape (15000000000,) and data type float64
Line: 2
Namespace: <module>
File: <string>
Traceback (most recent call last):
File "<string>", line 2, in <module>
at org.jpy.PyLib.executeCode(Native Method)
at org.jpy.PyObject.executeCode(PyObject.java:138)
at io.deephaven.engine.util.PythonEvaluatorJpy.evalScript(PythonEvaluatorJpy.java:73)
at io.deephaven.integrations.python.PythonDeephavenSession.lambda$evaluate$1(PythonDeephavenSession.java:205)
at io.deephaven.util.locks.FunctionalLock.doLockedInterruptibly(FunctionalLock.java:51)
at io.deephaven.integrations.python.PythonDeephavenSession.evaluate(PythonDeephavenSession.java:205)
at io.deephaven.engine.util.AbstractScriptSession.lambda$evaluateScript$0(AbstractScriptSession.java:165)
at io.deephaven.engine.context.ExecutionContext.lambda$apply$0(ExecutionContext.java:196)
at io.deephaven.engine.context.ExecutionContext.apply(ExecutionContext.java:207)
at io.deephaven.engine.context.ExecutionContext.apply(ExecutionContext.java:195)
at io.deephaven.engine.util.AbstractScriptSession.evaluateScript(AbstractScriptSession.java:165)
at io.deephaven.engine.util.DelegatingScriptSession.evaluateScript(DelegatingScriptSession.java:72)
at io.deephaven.engine.util.ScriptSession.evaluateScript(ScriptSession.java:75)
at io.deephaven.server.console.ConsoleServiceGrpcImpl.lambda$executeCommand$4(ConsoleServiceGrpcImpl.java:193)
at io.deephaven.server.session.SessionState$ExportBuilder.lambda$submit$2(SessionState.java:1537)
at io.deephaven.server.session.SessionState$ExportObject.doExport(SessionState.java:995)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at io.deephaven.server.runner.scheduler.SchedulerModule$ThreadFactory.lambda$newThread$0(SchedulerModule.java:100)
at java.base/java.lang.Thread.run(Thread.java:840)
You may not always receive such a clear error message. If the available machine memory is exceeded, the Linux kernel may terminate your worker with an out-of-memory (OOM) kill. On Kubernetes, the runtime terminates pods that exceed their memory limits, and you may need to adjust your additional memory settings.
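Before adjusting memory settings, it can help to confirm how much heap the worker actually has. A quick check from a Python console, sketched here using the standard java.lang.Runtime API through jpy (which Deephaven Python workers already expose):
import jpy

# Ask the JVM for its configured maximum heap size.
runtime = jpy.get_type("java.lang.Runtime").getRuntime()
max_heap_gib = runtime.maxMemory() / (1024**3)
print(f"Configured max heap: {max_heap_gib:.2f} GiB")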
Join or aggregation cardinality
Deephaven uses hash tables to bucket values in joins and aggregations; these hash tables have a maximum size of roughly 750 million rows. When the cardinality of the hash keys exceeds that maximum, an error is thrown. For example, this query has a key cardinality of one billion:
x=emptyTable(1_000_000_000).updateView("X=ii")
y=x.countBy("Count", "X")
from deephaven import empty_table
x = empty_table(1_000_000_000).update_view("X=ii")
y = x.count_by("Count", "X")
This results in the following exception:
r-Scheduler-Serial-1 | .c.ConsoleServiceGrpcImpl | Error running script: java.lang.UnsupportedOperationException: Hash table exceeds maximum size!
at io.deephaven.engine.table.impl.by.OperatorAggregationStateManagerOpenAddressedBase.doRehash(OperatorAggregationStateManagerOpenAddressedBase.java:92)
at io.deephaven.engine.table.impl.by.OperatorAggregationStateManagerOpenAddressedBase.buildTable(OperatorAggregationStateManagerOpenAddressedBase.java:76)
at io.deephaven.engine.table.impl.by.StaticChunkedOperatorAggregationStateManagerOpenAddressedBase.add(StaticChunkedOperatorAggregationStateManagerOpenAddressedBase.java:56)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.initialBucketedKeyAddition(ChunkedOperatorAggregationHelper.java:1877)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.aggregation(ChunkedOperatorAggregationHelper.java:200)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.lambda$aggregation$2(ChunkedOperatorAggregationHelper.java:130)
at io.deephaven.engine.table.impl.BaseTable.initializeWithSnapshot(BaseTable.java:1293)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.lambda$aggregation$3(ChunkedOperatorAggregationHelper.java:127)
at io.deephaven.engine.liveness.LivenessScopeStack.computeEnclosed(LivenessScopeStack.java:179)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.aggregation(ChunkedOperatorAggregationHelper.java:115)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper.aggregation(ChunkedOperatorAggregationHelper.java:70)
at io.deephaven.engine.table.impl.QueryTable.lambda$aggNoMemo$18(QueryTable.java:855)
at io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder.withNugget(QueryPerformanceRecorder.java:369)
at io.deephaven.engine.table.impl.QueryTable.aggNoMemo(QueryTable.java:854)
at io.deephaven.engine.table.impl.QueryTable.lambda$aggBy$17(QueryTable.java:830)
at io.deephaven.engine.table.impl.QueryTable$MemoizedResult.getOrCompute(QueryTable.java:3681)
at io.deephaven.engine.table.impl.QueryTable.memoizeResult(QueryTable.java:3650)
at io.deephaven.engine.table.impl.QueryTable.aggBy(QueryTable.java:830)
at io.deephaven.engine.table.impl.QueryTable.aggBy(QueryTable.java:98)
at io.deephaven.api.TableOperationsDefaults.aggBy(TableOperationsDefaults.java:337)
at io.deephaven.api.TableOperationsDefaults.aggBy(TableOperationsDefaults.java:324)
at io.deephaven.api.TableOperationsDefaults.countBy(TableOperationsDefaults.java:412)
at io.deephaven.api.TableOperationsDefaults.countBy(TableOperationsDefaults.java:406)
You can work around this limitation by using partitioned tables. A partitioned table divides an input table into several sub-tables; in this case, we divide the billion-row table into smaller tables of only 100 million keys each. Often these divisions are made using a hash code and a modulus. After dividing the table, we create a proxy, execute the original count, and then merge the results. When dividing the table, it is essential that all of the rows for a given aggregation or join key fall within the same division.
x=emptyTable(1_000_000_000).updateView("X=ii")
xb=x.updateView("Bucket=(int)(X/100_000_00)").partitionBy("Bucket")
y=xb.proxy().countBy("Count", "X").target.merge()
from deephaven import empty_table
x = empty_table(1_000_000_000).update_view("X=ii")
xb = x.update_view("Bucket=(int)(X/100_000_00)").partition_by("Bucket")
y = xb.proxy().count_by("Count", "X").target.merge()
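The example above buckets on a numeric key directly. For non-numeric keys, the bucket is typically derived from a hash code and a modulus, as in this sketch (the table and column names here are hypothetical):
from deephaven import empty_table

# Every row for a given Sym hashes to the same bucket, so per-bucket counts can
# be merged into a correct overall result.
trades = empty_table(1_000_000).update_view(["Sym=`S` + (ii % 5000)", "Size=ii % 100"])
by_bucket = trades.update_view("Bucket=Math.abs(Sym.hashCode()) % 16").partition_by(
    "Bucket"
)
counts = by_bucket.proxy().count_by("Count", "Sym").target.merge()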
Row set address space
The Deephaven engine represents tables as a set of column sources and a row set that indicates which addresses in each column source are valid. This enables the engine to efficiently handle tables that update. Row keys are signed 64-bit numbers; since they must be non-negative, 63 bits are available for addressing rows. Some Deephaven operations subdivide those 63 bits into regions. For example, a table returned from db.live_table or db.historical_table uses the top 23 bits for the partition and the bottom 40 bits for the row within that partition. This division allows the engine to very quickly identify the correct partition and row when reading data. The ungroup and join operations make similar use of the row set address space to efficiently access rows.
The following example uses a series of group_by and ungroup operations to create a table with a large address space. At each intermediate step, it calls a function that prints the table's current size and its last row key, along with the number of bits that row key consumes.
showLastRowKey = { s, t ->
lastRowKey = t.getRowSet().lastRowKey()
println(String.format("%s: %,d rows, last row key is %,d or 0x%x (%d bits)", s, t.size(), lastRowKey, lastRowKey, 64 - Long.numberOfLeadingZeros(lastRowKey)))
}
source = emptyTable(50_000_000).updateView("Bucket=Integer.highestOneBit(i)", "Value=i")
showLastRowKey("source", source)
with_bucket=source.groupBy("Bucket").update("Len=Value.size()")
showLastRowKey("with_bucket", with_bucket)
by_len=with_bucket.groupBy("Len")
showLastRowKey("by_len", by_len)
ungroup_by_len=by_len.ungroup()
showLastRowKey("ungroup_by_len", ungroup_by_len)
ungrouped = ungroup_by_len.ungroup()
showLastRowKey("ungrouped", ungrouped)
joined = ungrouped.join(ungrouped, "Bucket", "Right=Value")
showLastRowKey("joined", joined)
from deephaven import empty_table
def show_last_row_key(s, t):
    last_row_key = t.j_table.getRowSet().lastRowKey()
    print(
        "{}: {:,d} rows, last row key is {:,d} or 0x{:_x} ({:d} bits)".format(
            s, t.size, last_row_key, last_row_key, last_row_key.bit_length()
        )
    )
source = empty_table(50_000_000).update_view(
["Bucket=Integer.highestOneBit(i)", "Value=i"]
)
show_last_row_key("source", source)
with_bucket = source.group_by("Bucket").update("Len=Value.size()")
show_last_row_key("with_bucket", with_bucket)
by_len = with_bucket.group_by("Len")
show_last_row_key("by_len", by_len)
ungroup_by_len = by_len.ungroup()
show_last_row_key("ungroup_by_len", ungroup_by_len)
ungrouped = ungroup_by_len.ungroup()
show_last_row_key("ungrouped", ungrouped)
joined = ungrouped.join(ungrouped, "Bucket", "Right=Value")
show_last_row_key("joined", joined)
The number of bits in each intermediate table's address space is as follows:
source: 50,000,000 rows, last row key is 49,999,999 or 0x2fa_f07f (26 bits)
with_bucket: 27 rows, last row key is 26 or 0x1a (5 bits)
by_len: 26 rows, last row key is 25 or 0x19 (5 bits)
ungroup_by_len: 27 rows, last row key is 25,600 or 0x6400 (15 bits)
ungrouped: 50,000,000 rows, last row key is 859,009,904,767 or 0xc8_00fa_f07f (40 bits)
The ungrouped table contains only 50,000,000 rows but uses 40 bits of address space. During the execution of the join, there are too many row keys to efficiently represent the result: the left table requires 40 bits and the right table requires 24 bits, which together exceed the 63 bits available. The engine produces the following exception message:
io.deephaven.engine.exceptions.OutOfKeySpaceException: join out of rowSet space (left reqBits + right reqBits > 63): (left table: {size: 50000000 maxRowKey: 859009904767 reqBits: 40}) X (right table: {maxRowKey: 16777215 reqBits: 24}) exceeds Long.MAX_VALUE. Consider flattening left table if possible.
Flatten the left side, as suggested in the exception message:
showLastRowKey = { s, t ->
lastRowKey = t.getRowSet().lastRowKey()
println(String.format("%s: %,d rows, last row key is %,d or 0x%x (%d bits)", s, t.size(), lastRowKey, lastRowKey, 64 - Long.numberOfLeadingZeros(lastRowKey)))
}
source = emptyTable(50_000_000).updateView("Bucket=Integer.highestOneBit(i)", "Value=i")
showLastRowKey("source", source)
with_bucket=source.groupBy("Bucket").update("Len=Value.size()")
showLastRowKey("with_bucket", with_bucket)
by_len=with_bucket.groupBy("Len")
showLastRowKey("by_len", by_len)
ungroup_by_len=by_len.ungroup()
showLastRowKey("ungroup_by_len", ungroup_by_len)
ungrouped = ungroup_by_len.ungroup()
showLastRowKey("ungrouped", ungrouped)
flattened = ungrouped.flatten()
showLastRowKey("flattened", flattened)
joined = flattened.join(flattened, "Bucket", "Right=Value")
showLastRowKey("joined", joined)
from deephaven import empty_table
def show_last_row_key(s, t):
    last_row_key = t.j_table.getRowSet().lastRowKey()
    print(
        "{}: {:,d} rows, last row key is {:,d} or 0x{:_x} ({:d} bits)".format(
            s, t.size, last_row_key, last_row_key, last_row_key.bit_length()
        )
    )
source = empty_table(50_000_000).update_view(
["Bucket=Integer.highestOneBit(i)", "Value=i"]
)
show_last_row_key("source", source)
with_bucket = source.group_by("Bucket").update("Len=Value.size()")
show_last_row_key("with_bucket", with_bucket)
by_len = with_bucket.group_by("Len")
show_last_row_key("by_len", by_len)
ungroup_by_len = by_len.ungroup()
show_last_row_key("ungroup_by_len", ungroup_by_len)
ungrouped = ungroup_by_len.ungroup()
show_last_row_key("ungrouped", ungrouped)
flattened = ungrouped.flatten()
show_last_row_key("flattened", flattened)
joined = flattened.join(flattened, "Bucket", "Right=Value")
show_last_row_key("joined", joined)
Introducing the flatten operation reduces the 40 bits of address space used by the ungrouped table to only 26 bits in the flattened table, so the join operation can efficiently represent the result.
source: 50,000,000 rows, last row key is 49,999,999 or 0x2fa_f07f (26 bits)
with_bucket: 27 rows, last row key is 26 or 0x1a (5 bits)
by_len: 26 rows, last row key is 25 or 0x19 (5 bits)
ungroup_by_len: 27 rows, last row key is 25,600 or 0x6400 (15 bits)
ungrouped: 50,000,000 rows, last row key is 859,009,904,767 or 0xc8_00fa_f07f (40 bits)
flattened: 50,000,000 rows, last row key is 49,999,999 or 0x2fa_f07f (26 bits)
joined: 645,756,675,790,166 rows, last row key is 838,860,799,668,351 or 0x2_faf0_7ffa_f07f (50 bits)
Engine assertions
At some points while executing a query, the Deephaven engine can detect an invalid internal state. In these cases, the engine produces an AssertionFailure exception; for example, an update must not both add and modify the same row key, and the number of rows in an aggregation state can never be less than zero. An assertion failure indicates one of two conditions:
- The query violates the assumptions made by the Deephaven engine. In particular, queries may not introduce changes into the system without the knowledge of the Deephaven Update Graph. Common examples of changes that are unknown to the engine include unstable formulas or using array references in a ticking table. An unstable formula may not return the same value for the same inputs (i.e., it is not a pure function). Never use an unstable function, like now(), in an update_view (see the sketch after this list).
- There is a bug in the Deephaven engine itself.
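As a minimal sketch of the difference (separate from the failing query below): update_view re-evaluates its formula every time a cell is read, so an unstable call like now() can return different values for the same row, while update evaluates the formula once per row and stores the result.
from deephaven import time_table

unstable = time_table("PT1s").update_view("Seen=now()")  # recomputed on each access
stable = time_table("PT1s").update("Seen=now()")  # computed once per row and stored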
The following query incorrectly uses an unstable formula, which causes undefined behavior in Deephaven's engine.
random = new Random(0)
y=timeTable("PT1s").update("JoinKey=i % 10").updateView("Unstable=random.nextInt(10)").lastBy("JoinKey")
x=timeTable("PT1s").updateView("Bucket = (int)(i/10)", "JoinKey=i % 10")
z=x.naturalJoin(y, "JoinKey", "Unstable")
counted=z.countBy("Count", "Unstable")
from deephaven import time_table
import jpy
j_random = jpy.get_type("java.util.Random")
random = j_random(0)
y = (
time_table("PT1s")
.update("JoinKey=i % 10")
.update_view("Unstable=random.nextInt(10)")
.last_by("JoinKey")
)
x = time_table("PT1s").update_view(["Bucket = (int)(i/10)", "JoinKey=i % 10"])
z = x.natural_join(y, "JoinKey", "Unstable")
counted = z.count_by("Count", "Unstable")
After running for some period of time, the query fails with an assertion like the following:
Uncaught exception for entry= by(NORMAL:[Count{column=ColumnName(Count)}], [Unstable]), added.size()=1, modified.size()=1, removed.size()=0, shifted.size()=0, modifiedColumnSet={Unstable}:
java.lang.IllegalStateException: Missing value in probe
at io.deephaven.engine.table.impl.by.typed.incopenagg.gen.IncrementalAggOpenHasherInt.probe(IncrementalAggOpenHasherInt.java:126)
at io.deephaven.engine.table.impl.by.IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.probeTable(IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java:197)
at io.deephaven.engine.table.impl.by.IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.findModifications(IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java:427)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper$KeyedUpdateContext.doInsertsForChunk(ChunkedOperatorAggregationHelper.java:755)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper$KeyedUpdateContext.doInserts(ChunkedOperatorAggregationHelper.java:743)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper$KeyedUpdateContext.computeDownstreamIndicesAndCopyKeys(ChunkedOperatorAggregationHelper.java:613)
at io.deephaven.engine.table.impl.by.ChunkedOperatorAggregationHelper$1.onUpdate(ChunkedOperatorAggregationHelper.java:274)
at io.deephaven.engine.table.impl.InstrumentedTableUpdateListener$Notification.lambda$run$0(InstrumentedTableUpdateListener.java:37)
at io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.doRunInternal(InstrumentedTableListenerBase.java:334)
at io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.doRun(InstrumentedTableListenerBase.java:316)
at io.deephaven.engine.table.impl.InstrumentedTableUpdateListener$Notification.run(InstrumentedTableUpdateListener.java:37)
at io.deephaven.engine.updategraph.impl.BaseUpdateGraph.runNotification(BaseUpdateGraph.java:711)
at io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$ConcurrentNotificationProcessor.processSatisfiedNotifications(PeriodicUpdateGraph.java:836)
at io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$NotificationProcessorThreadFactory.lambda$newThread$0(PeriodicUpdateGraph.java:1109)
at java.base/java.lang.Thread.run(Thread.java:840)
The "Missing value in probe" indicates that while processing an aggregation the engine could not find an expected state.
As is often the case with these kinds of problems, the operation that caused the error (here, the incorrect use of update_view to generate a random number) is not the operation that detected the error (the count_by bucketed on the Unstable column). Indeed, many errors may not be detected at all, or may only be detected by an operation far downstream of the offending one (whether the error is due to the user's query violating engine assumptions or a bug in the Deephaven engine itself). This makes assertion failures among the most difficult errors to debug.
There are some tools available that may help in tracking down the root cause of an assertion failure:
- The TableUpdateValidator is a tool that can narrow down which operation is at fault for a failure. A TableUpdateValidator can be inserted into a sequence of operations; it validates that the update from an operation correctly notifies the downstream operation of all changes. This can be used to detect unstable formulas or engine bugs that result in erroneous previous values.
- Attaching a debugger to the query and setting a breakpoint on an AssertionFailure. This requires that the assertion is reliably reproducible, as well as knowledge of engine internals and Java debugging.
- Generating a heap dump on assertion failures and analyzing the result using JVisualVM or other tools. Analyzing a heap dump requires the same knowledge of engine internals and Java debugging and provides less information, but it can be enabled more easily when the failure recurs only occasionally.
Use a TableUpdateValidator
To narrow down the problem, use a TableUpdateValidator. From Groovy, use the TableUpdateValidator class directly. From Python, use the jpy.get_type method to access the underlying Java class and then re-wrap the result in a deephaven.table.Table.
import io.deephaven.engine.table.impl.TableUpdateValidator;
random = new Random(0)
y=timeTable("PT1s").update("JoinKey=i % 10").updateView("Unstable=random.nextInt(10)")
yv = TableUpdateValidator.make("y", y).getResultTable().lastBy("JoinKey")
x=timeTable("PT1s").updateView("Bucket = (int)(i/10)", "JoinKey=i % 10")
xv = TableUpdateValidator.make("x", x).getResultTable()
z=xv.naturalJoin(yv, "JoinKey", "Unstable")
zv = TableUpdateValidator.make("z", z).getResultTable()
counted=zv.countBy("Count", "Unstable")
from deephaven import time_table
from deephaven.table import Table
import jpy
j_random = jpy.get_type("java.util.Random")
tuv = jpy.get_type("io.deephaven.engine.table.impl.TableUpdateValidator")
random = j_random(0)
y = (
time_table("PT1s")
.update("JoinKey=i % 10")
.update_view("Unstable=random.nextInt(10)")
)
yv = Table(tuv.make("y", y.j_table).getResultTable()).last_by("JoinKey")
x = time_table("PT1s").update_view(["Bucket = (int)(i/10)", "JoinKey=i % 10"])
xv = Table(tuv.make("x", x.j_table).getResultTable())
z = xv.natural_join(yv, "JoinKey", "Unstable")
zv = Table(tuv.make("z", z.j_table).getResultTable())
counted = zv.count_by("Count", "Unstable")
In this case, the update produced the following error:
java.lang.RuntimeException: Table to validate UpdateValidator(z) generated an erroneous update:
- pre-update modified (previous) columnName=Unstable k=0 (from source) expected=8 actual=7 (from chunk) expected=8 actual=5
at io.deephaven.engine.table.impl.TableUpdateValidator.onUpdate(TableUpdateValidator.java:217)
at io.deephaven.engine.table.impl.TableUpdateValidator$1.onUpdate(TableUpdateValidator.java:117)
at io.deephaven.engine.table.impl.InstrumentedTableUpdateListener$Notification.lambda$run$0(InstrumentedTableUpdateListener.java:37)
at io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.doRunInternal(InstrumentedTableListenerBase.java:334)
at io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.doRun(InstrumentedTableListenerBase.java:316)
at io.deephaven.engine.table.impl.InstrumentedTableUpdateListener$Notification.run(InstrumentedTableUpdateListener.java:37)
at io.deephaven.engine.updategraph.impl.BaseUpdateGraph.runNotification(BaseUpdateGraph.java:711)
at io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$ConcurrentNotificationProcessor.processSatisfiedNotifications(PeriodicUpdateGraph.java:836)
at io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$NotificationProcessorThreadFactory.lambda$newThread$0(PeriodicUpdateGraph.java:1109)
at java.base/java.lang.Thread.run(Thread.java:840)
This exonerates the count_by, indicating that the table z is producing inconsistent values for the Unstable column. Although this shows that z is producing incorrect results, y has not yet been identified as the root cause. Setting the JVM argument -DTableUpdateValidator.aggressiveUpdateValidation=true and re-running the query produces the following exception:
aph-updateExecutor-2 | i.d.s.s.SessionService | Internal Error '9a75b9d5-2dec-4ed6-a82c-f7b22aea51fd' java.lang.RuntimeException: Table to validate UpdateValidator(y) generated an erroneous update:
- pre-update (previous) columnName=Unstable k=0 (from source) expected=0 actual=5 (from chunk) expected=0 actual=7
- post-shift unmodified columnName=Unstable k=0 (from source) expected=0 actual=1 (from chunk) expected=0 actual=3
at io.deephaven.engine.table.impl.TableUpdateValidator.onUpdate(TableUpdateValidator.java:217)
at io.deephaven.engine.table.impl.TableUpdateValidator$1.onUpdate(TableUpdateValidator.java:117)
at io.deephaven.engine.table.impl.InstrumentedTableUpdateListener$Notification.lambda$run$0(InstrumentedTableUpdateListener.java:37)
at io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.doRunInternal(InstrumentedTableListenerBase.java:334)
at io.deephaven.engine.table.impl.InstrumentedTableListenerBase$NotificationBase.doRun(InstrumentedTableListenerBase.java:316)
at io.deephaven.engine.table.impl.InstrumentedTableUpdateListener$Notification.run(InstrumentedTableUpdateListener.java:37)
at io.deephaven.engine.updategraph.impl.BaseUpdateGraph.runNotification(BaseUpdateGraph.java:711)
at io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$ConcurrentNotificationProcessor.processSatisfiedNotifications(PeriodicUpdateGraph.java:836)
at io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph$NotificationProcessorThreadFactory.lambda$newThread$0(PeriodicUpdateGraph.java:1109)
at java.base/java.lang.Thread.run(Thread.java:840)
It is now clear that the update_view operation in y produces incorrect results for the Unstable column. Table Update Validators are useful tools, but they can greatly affect a query's performance and memory usage. A validator records all values from the table that it is validating in memory and then compares those values with updated results. This impacts both the query's heap utilization and the update cycle's CPU utilization, so a Table Update Validator should be used judiciously.