Class PeriodicUpdateGraph
java.lang.Object
io.deephaven.engine.updategraph.impl.BaseUpdateGraph
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph
- All Implemented Interfaces:
LogOutputAppendable, NotificationQueue, NotificationQueue.Dependency, UpdateGraph, UpdateSourceRegistrar
This class uses a thread (or pool of threads) to periodically update a set of monitored update sources at a specified target cycle interval. The target cycle interval can be configured to reduce or increase the run rate of the monitored sources.
This class can be configured via the following Configuration property:
- "PeriodicUpdateGraph.targetCycleDurationMillis" (optional) - The default target cycle time in ms (1000 if not defined)
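The fallback behaviour of this property can be illustrated with a minimal sketch. It assumes the value is read from a plain java.util.Properties object rather than from Deephaven's own Configuration class, and the class and method names are hypothetical.

```java
import java.util.Properties;

class TargetCycleConfigSketch {
    // Property name as documented above.
    static final String PROP = "PeriodicUpdateGraph.targetCycleDurationMillis";
    // Documented fallback when the property is not defined.
    static final long DEFAULT_MILLIS = 1000L;

    /** Returns the configured target cycle time in ms, or 1000 if the property is absent. */
    static long targetCycleDurationMillis(Properties props) {
        String raw = props.getProperty(PROP);
        return raw == null ? DEFAULT_MILLIS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(targetCycleDurationMillis(props)); // property not defined
        props.setProperty(PROP, "250");
        System.out.println(targetCycleDurationMillis(props)); // property defined
    }
}
```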
-
Nested Class Summary
Nested classes/interfaces inherited from class io.deephaven.engine.updategraph.impl.BaseUpdateGraph
BaseUpdateGraph.AccumulatedCycleStats
Nested classes/interfaces inherited from interface io.deephaven.engine.updategraph.NotificationQueue
NotificationQueue.Dependency, NotificationQueue.ErrorNotification, NotificationQueue.Notification
-
Field Summary
Fields inherited from class io.deephaven.engine.updategraph.impl.BaseUpdateGraph
accumulatedCycleStats, DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS, DEFAULT_UPDATE_GRAPH_NAME, MINIMUM_CYCLE_DURATION_TO_LOG_MILLIS_PROP
-
Constructor Summary
Constructor - Description
PeriodicUpdateGraph(String name, boolean allowUnitTestMode, long targetCycleDurationMillis, long minimumCycleDurationToLogNanos, int numUpdateThreads, ThreadInitializationFactory threadInitializationFactory, OperationInitializer operationInitializer)
-
Method Summary
Modifier and Type, Method - Description
void addNotification(@NotNull NotificationQueue.Notification notification) - Enqueue a notification to be flushed according to its priority.
void addSource(Runnable updateSource) - Add a table to the list of tables to run and mark it as refreshing if it was a DynamicNode.
void completeCycleForUnitTests() - Do the second half of the update cycle, including flushing notifications, and completing the LogicalClock update cycle.
void enableUnitTestMode() - Enable unit test mode.
void flushAllNormalNotificationsForUnitTests() - Flush all the normal notifications from the UpdateGraph queue.
Runnable flushAllNormalNotificationsForUnitTests(@NotNull BooleanSupplier done, long timeoutMillis) - Flush all the normal notifications from the UpdateGraph queue, continuing until done returns true.
boolean flushOneNotificationForUnitTests() - Flush a single notification from the UpdateGraph queue.
boolean flushOneNotificationForUnitTests(boolean expectOnlyUnsatisfiedNotifications) - Flush a single notification from the UpdateGraph queue.
static PeriodicUpdateGraph getInstance(String name)
long getTargetCycleDurationMillis() - Get the target duration of an update cycle, including the updating phase and the idle phase.
int getWatchDogMillis() - Get the current watchdog timeout value.
boolean isCycleOnBudget(long cycleTimeNanos) - Is the provided cycle time on budget?
boolean isUnitTestModeAllowed()
void markSourcesRefreshedForUnitTests() - Record that sources have been satisfied within a unit test cycle.
boolean maybeAddNotification(@NotNull NotificationQueue.Notification notification, long deliveryStep) - Add a notification for this NotificationQueue to deliver (by invoking its run() method), iff the delivery step is the current step and the update cycle for that step is still in process.
static PeriodicUpdateGraph.Builder newBuilder(String name)
int parallelismFactor() - Retrieve the number of update threads.
void refreshUpdateSourceForUnitTests(@NotNull Runnable updateSource) - Refresh an update source on a simulated UpdateGraph run thread, rather than this thread.
void requestRefresh() - Request that the next update cycle begin as soon as practicable.
void resetForUnitTests(boolean after) - Clear all monitored tables and enqueued notifications to support unit-tests.
void resetForUnitTests(boolean after, boolean randomizedNotifications, int seed, int maxRandomizedThreadCount, int notificationStartDelay, int notificationAdditionDelay) - Clear all monitored tables and enqueued notifications to support unit-tests.
void resetTargetCycleDuration() - Resets the run cycle time to the default target configured via the PeriodicUpdateGraph.Builder setting.
<T extends Exception> void runWithinUnitTestCycle(@NotNull ThrowingRunnable<T> runnable) - Execute the given runnable wrapped with startCycleForUnitTests() and completeCycleForUnitTests().
<T extends Exception> void runWithinUnitTestCycle(@NotNull ThrowingRunnable<T> runnable, boolean sourcesSatisfied) - Execute the given runnable wrapped with startCycleForUnitTests() and completeCycleForUnitTests().
void setTargetCycleDurationMillis(long targetCycleDurationMillis) - Set the target duration of an update cycle, including the updating phase and the idle phase.
void setWatchDogMillis(int watchDogMillis) - Enable the loop watchdog with the specified timeout.
void setWatchDogTimeoutProcedure(LongConsumer procedure) - Set the procedure to be called when the watchdog times out.
void start() - Install a real NotificationProcessor and start the primary refresh thread.
void startCycleForUnitTests() - Begin the next update cycle while in unit-test mode.
void startCycleForUnitTests(boolean sourcesSatisfied) - Begin the next update cycle while in unit-test mode.
void stop() - Begins the process to stop all processing threads and forces ReferenceCounted sources to a reference count of zero.
void wakeRefreshThreadForUnitTests() - If the run thread is waiting in flushNormalNotificationsAndCompleteCycle() or flushAllNormalNotificationsForUnitTests(BooleanSupplier, long), wake it up.
Methods inherited from class io.deephaven.engine.updategraph.impl.BaseUpdateGraph
addNotifications, buildOrThrow, clock, createUpdatePerformanceEntry, currentThreadProcessesUpdates, exclusiveLock, existingOrBuild, getName, getUpdateGraph, logDependencies, removeInstance, removeSource, removeSources, resetNextFlushTime, satisfied, serialTableOperationsSafe, setSerialTableOperationsSafe, sharedLock, sourceCount, supportsRefreshing, takeAccumulatedCycleStats, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface io.deephaven.engine.updategraph.NotificationQueue.Dependency
getUpdateGraph
Methods inherited from interface io.deephaven.engine.updategraph.UpdateGraph
allowSerialTableOperations, allowSerialTableOperations, cast, checkInitiateSerialTableOperation, requestSignal, runWhenIdle
-
Field Details
-
NUM_THREADS_DEFAULT_UPDATE_GRAPH
public static final int NUM_THREADS_DEFAULT_UPDATE_GRAPH
-
ALLOW_UNIT_TEST_MODE_PROP
-
DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP
-
-
Constructor Details
-
PeriodicUpdateGraph
public PeriodicUpdateGraph(String name, boolean allowUnitTestMode, long targetCycleDurationMillis, long minimumCycleDurationToLogNanos, int numUpdateThreads, ThreadInitializationFactory threadInitializationFactory, OperationInitializer operationInitializer)
-
-
Method Details
-
newBuilder
-
append
-
parallelismFactor
public int parallelismFactor()
Retrieve the number of update threads.
The PeriodicUpdateGraph has a configurable number of update processing threads. The thread count is exposed so that callers can partition a query based on it.
- Returns:
- the number of update threads configured.
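One way a caller might use the thread count to partition work is sketched below. The partitioning scheme (contiguous, near-equal row ranges) and all names are assumptions chosen for illustration, and the parallelism value is passed in as a plain int rather than obtained from a live PeriodicUpdateGraph.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSketch {
    /** Splits [0, size) into at most `parallelism` contiguous [start, end) ranges of near-equal length. */
    static List<long[]> partition(long size, int parallelism) {
        List<long[]> ranges = new ArrayList<>();
        // Never create more partitions than rows, and always create at least one.
        int parts = (int) Math.max(1, Math.min(parallelism, size));
        long base = size / parts;
        long remainder = size % parts;
        long start = 0;
        for (int i = 0; i < parts; i++) {
            // The first `remainder` partitions take one extra row each.
            long length = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {start, start + length});
            start += length;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 4 stands in for the value returned by parallelismFactor().
        for (long[] r : partition(10, 4)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```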
-
setTargetCycleDurationMillis
public void setTargetCycleDurationMillis(long targetCycleDurationMillis)
Set the target duration of an update cycle, including the updating phase and the idle phase. This is also the target interval between the start of one cycle and the start of the next.
Can be reset to default via resetTargetCycleDuration().
- Parameters:
targetCycleDurationMillis - The target duration for update cycles in milliseconds
- ImplNote:
- Any target cycle duration < 0 will be clamped to 0.
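The relationship between the target duration, the updating phase, and the idle phase can be sketched as simple arithmetic. This is an illustration only: the helper names are hypothetical, and the idle-time formula is an assumption that follows from the description above rather than the class's actual scheduling code.

```java
class CycleTimingSketch {
    /** Negative targets are clamped to 0, as the ImplNote describes. */
    static long clampTargetMillis(long requestedMillis) {
        return Math.max(0L, requestedMillis);
    }

    /**
     * Idle time remaining in a cycle after the updating phase.
     * Zero when the updating phase has already used up the whole target.
     */
    static long idleMillis(long targetMillis, long updatingPhaseMillis) {
        return Math.max(0L, clampTargetMillis(targetMillis) - updatingPhaseMillis);
    }

    public static void main(String[] args) {
        System.out.println(clampTargetMillis(-5));   // clamped target
        System.out.println(idleMillis(1000, 300));   // idle time when the cycle finishes early
        System.out.println(idleMillis(1000, 1500));  // no idle time when the cycle overruns
    }
}
```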
-
getTargetCycleDurationMillis
public long getTargetCycleDurationMillis()
Get the target duration of an update cycle, including the updating phase and the idle phase. This is also the target interval between the start of one cycle and the start of the next.
- Returns:
- The current target cycle duration
-
isCycleOnBudget
public boolean isCycleOnBudget(long cycleTimeNanos)
Description copied from class: BaseUpdateGraph
Is the provided cycle time on budget?
- Overrides:
isCycleOnBudget in class BaseUpdateGraph
- Parameters:
cycleTimeNanos - the cycle time, in nanoseconds
- Returns:
- true if the cycle time is within the desired budget
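Note the unit mismatch a caller has to keep in mind: the cycle time is given in nanoseconds, while the target cycle duration is set in milliseconds. The sketch below assumes the budget is simply the target cycle duration. The documentation here does not say how the budget is defined, so treat that as an assumption, and the class name is hypothetical.

```java
import java.util.concurrent.TimeUnit;

class CycleBudgetSketch {
    /**
     * Assumed budget check: a cycle is on budget when it took no longer than
     * the target cycle duration, after converting the target to nanoseconds.
     */
    static boolean isCycleOnBudget(long cycleTimeNanos, long targetCycleDurationMillis) {
        return cycleTimeNanos <= TimeUnit.MILLISECONDS.toNanos(targetCycleDurationMillis);
    }

    public static void main(String[] args) {
        long targetMillis = 1000;
        System.out.println(isCycleOnBudget(900_000_000L, targetMillis));   // 0.9 s cycle
        System.out.println(isCycleOnBudget(1_200_000_000L, targetMillis)); // 1.2 s cycle
    }
}
```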
-
resetTargetCycleDuration
public void resetTargetCycleDuration()
Resets the run cycle time to the default target configured via the PeriodicUpdateGraph.Builder setting.
- ImplNote:
- If the PeriodicUpdateGraph.Builder.targetCycleDurationMillis(long) property is not set, this value defaults to DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, which defaults to 1000ms.
-
enableUnitTestMode
public void enableUnitTestMode()
Enable unit test mode.
In this mode calls to addSource(Runnable) will only mark tables as refreshing. Additionally start() may not be called.
-
isUnitTestModeAllowed
public boolean isUnitTestModeAllowed()
- Returns:
- whether unit test mode is allowed
-
setWatchDogMillis
public void setWatchDogMillis(int watchDogMillis)
Enable the loop watchdog with the specified timeout. A value of 0 disables the watchdog.
- Parameters:
watchDogMillis - The time in milliseconds to set the watchdog, or 0 to disable.
- ImplNote:
- Any timeout less than 0 will be clamped to 0.
-
getWatchDogMillis
public int getWatchDogMillis()
Get the current watchdog timeout value.
- Returns:
- The current timeout for the watchdog, 0 for disabled
-
setWatchDogTimeoutProcedure
Set the procedure to be called when the watchdog times out.
- Parameters:
procedure - The procedure to call
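How the three watchdog methods fit together can be sketched with a small stand-alone class. This is not the PeriodicUpdateGraph implementation: the sketch checks elapsed time on demand instead of using a timer, and the value passed to the LongConsumer (here, the elapsed milliseconds) is an assumption, since the documentation does not state what the argument represents.

```java
import java.util.function.LongConsumer;

class WatchdogSketch {
    private int watchDogMillis = 0;                      // 0 means the watchdog is disabled
    private LongConsumer timeoutProcedure = millis -> { };

    void setWatchDogMillis(int millis) {
        // Negative timeouts are clamped to 0, which disables the watchdog.
        watchDogMillis = Math.max(0, millis);
    }

    int getWatchDogMillis() {
        return watchDogMillis;
    }

    void setWatchDogTimeoutProcedure(LongConsumer procedure) {
        timeoutProcedure = procedure;
    }

    /** Calls the timeout procedure if the watchdog is enabled and the cycle ran too long. */
    boolean check(long elapsedMillis) {
        if (watchDogMillis > 0 && elapsedMillis > watchDogMillis) {
            timeoutProcedure.accept(elapsedMillis); // assumed argument: elapsed time in ms
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        WatchdogSketch watchdog = new WatchdogSketch();
        watchdog.setWatchDogTimeoutProcedure(ms -> System.out.println("cycle exceeded watchdog after " + ms + " ms"));
        watchdog.setWatchDogMillis(100);
        watchdog.check(50);   // within the timeout, nothing happens
        watchdog.check(150);  // exceeds the timeout, procedure is called
    }
}
```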
-
start
public void start()
Install a real NotificationProcessor and start the primary refresh thread.
- ImplNote:
- Must not be in unit test mode.
-
stop
public void stop()
Begins the process to stop all processing threads and forces ReferenceCounted sources to a reference count of zero.
-
addSource
Add a table to the list of tables to run and mark it as refreshing if it was a DynamicNode.
- Specified by:
addSource in interface UpdateSourceRegistrar
- Overrides:
addSource in class BaseUpdateGraph
- Parameters:
updateSource - The table to be added to the run list
- ImplNote:
- This will do nothing in unit test mode other than mark the table as refreshing.
-
addNotification
Enqueue a notification to be flushed according to its priority. Non-terminal notifications should only be enqueued during the updating phase of a cycle. That is, they should be enqueued from an update source or subsequent notification delivery.
- Specified by:
addNotification in interface NotificationQueue
- Overrides:
addNotification in class BaseUpdateGraph
- Parameters:
notification - The notification to enqueue
-
maybeAddNotification
public boolean maybeAddNotification(@NotNull NotificationQueue.Notification notification, long deliveryStep)
Description copied from interface: NotificationQueue
Add a notification for this NotificationQueue to deliver (by invoking its run() method), iff the delivery step is the current step and the update cycle for that step is still in process. This is only supported for non-terminal notifications.
- Specified by:
maybeAddNotification in interface NotificationQueue
- Overrides:
maybeAddNotification in class BaseUpdateGraph
- Parameters:
notification - The notification to add
deliveryStep - The step to deliver this notification on
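The "iff" condition above can be made concrete with a toy, single-threaded queue. Everything in this sketch is hypothetical: it uses plain Runnable in place of NotificationQueue.Notification, tracks the step with a simple counter, and ignores priorities and locking, all of which the real class has to handle.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class StepGatedQueueSketch {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private long currentStep = 0;
    private boolean updating = false;

    /** Begin a new cycle: advance the step and enter the updating phase. */
    void startCycle() {
        currentStep++;
        updating = true;
    }

    /** Deliver everything that was enqueued, then leave the updating phase. */
    void completeCycle() {
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
        }
        updating = false;
    }

    long currentStep() {
        return currentStep;
    }

    /** Accepts the notification only for the current step while that step's cycle is in process. */
    boolean maybeAddNotification(Runnable notification, long deliveryStep) {
        if (!updating || deliveryStep != currentStep) {
            return false;
        }
        pending.add(notification);
        return true;
    }

    public static void main(String[] args) {
        StepGatedQueueSketch queue = new StepGatedQueueSketch();
        queue.startCycle();
        boolean accepted = queue.maybeAddNotification(() -> System.out.println("delivered"), queue.currentStep());
        boolean rejected = queue.maybeAddNotification(() -> System.out.println("never runs"), queue.currentStep() + 1);
        System.out.println("accepted=" + accepted + " rejected=" + !rejected);
        queue.completeCycle();
    }
}
```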
-
requestRefresh
public void requestRefresh()
Request that the next update cycle begin as soon as practicable. This "hurry-up" cycle happens through normal means using the refresh thread and its workers.
-
resetForUnitTests
Clear all monitored tables and enqueued notifications to support unit-tests.
- Parameters:
after - Whether this is *after* a unit test completed. If true, held locks should result in an exception and the LivenessScopeStack will be cleared.
-
resetForUnitTests
@TestUseOnly public void resetForUnitTests(boolean after, boolean randomizedNotifications, int seed, int maxRandomizedThreadCount, int notificationStartDelay, int notificationAdditionDelay)
Clear all monitored tables and enqueued notifications to support unit-tests.
- Parameters:
after - Whether this is *after* a unit test completed. If true, held locks should result in an exception and the LivenessScopeStack will be cleared.
randomizedNotifications - Whether the notification processor should randomize the order of delivery
seed - Seed for randomized notification delivery order and delays
maxRandomizedThreadCount - Maximum number of threads handling randomized notification delivery
notificationStartDelay - Maximum randomized notification start delay
notificationAdditionDelay - Maximum randomized notification addition delay
-
startCycleForUnitTests
Begin the next update cycle while in unit-test mode. Note that this happens on a simulated UpdateGraph run thread, rather than this thread. This overload is the same as startCycleForUnitTests(true).
-
startCycleForUnitTests
Begin the next update cycle while in unit-test mode. Note that this happens on a simulated UpdateGraph run thread, rather than this thread.
- Parameters:
sourcesSatisfied - Whether sources should be marked as satisfied by this invocation; if false, the caller must control source satisfaction using markSourcesRefreshedForUnitTests().
-
markSourcesRefreshedForUnitTests
Record that sources have been satisfied within a unit test cycle. -
completeCycleForUnitTests
Do the second half of the update cycle, including flushing notifications, and completing the LogicalClock update cycle. Note that this happens on a simulated UpdateGraph run thread, rather than this thread.
-
runWithinUnitTestCycle
@TestUseOnly public <T extends Exception> void runWithinUnitTestCycle(@NotNull ThrowingRunnable<T> runnable) throws T
Execute the given runnable wrapped with startCycleForUnitTests() and completeCycleForUnitTests(). Note that the runnable is run on the current thread. This is equivalent to runWithinUnitTestCycle(runnable, true).
- Parameters:
runnable - The runnable to execute
- Throws:
T extends Exception
-
runWithinUnitTestCycle
@TestUseOnly public <T extends Exception> void runWithinUnitTestCycle(@NotNull ThrowingRunnable<T> runnable, boolean sourcesSatisfied) throws T
Execute the given runnable wrapped with startCycleForUnitTests() and completeCycleForUnitTests(). Note that the runnable is run on the current thread.
- Parameters:
runnable - The runnable to execute
sourcesSatisfied - Whether sources should be marked as satisfied by this invocation; if false, the caller must control source satisfaction using markSourcesRefreshedForUnitTests().
- Throws:
T extends Exception
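The "wrapped with start and complete" pattern, together with the generic `throws T` signature, can be sketched in isolation. The ThrowingRunnable interface below is a local stand-in declared for this example, the start and complete steps only record events, and the use of try/finally is an assumption about how such a wrapper would typically be written, not a statement about the actual implementation.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class UnitTestCycleSketch {
    /** Local stand-in for a runnable that may throw a checked exception of type T. */
    @FunctionalInterface
    interface ThrowingRunnable<T extends Exception> {
        void run() throws T;
    }

    final List<String> events = new ArrayList<>();

    void startCycle() {
        events.add("start");
    }

    void completeCycle() {
        events.add("complete");
    }

    /** Runs the body between startCycle() and completeCycle(), completing even if the body throws. */
    <T extends Exception> void runWithinCycle(ThrowingRunnable<T> runnable) throws T {
        startCycle();
        try {
            runnable.run();
        } finally {
            completeCycle();
        }
    }

    public static void main(String[] args) {
        UnitTestCycleSketch sketch = new UnitTestCycleSketch();
        sketch.runWithinCycle(() -> sketch.events.add("body"));
        try {
            // The checked exception type is inferred as IOException and propagates to the caller.
            sketch.runWithinCycle(() -> {
                throw new IOException("boom");
            });
        } catch (IOException e) {
            sketch.events.add("caught");
        }
        System.out.println(sketch.events);
    }
}
```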
-
refreshUpdateSourceForUnitTests
Refresh an update source on a simulated UpdateGraph run thread, rather than this thread.
- Parameters:
updateSource - The update source to run
-
flushOneNotificationForUnitTests
Flush a single notification from the UpdateGraph queue. Note that this happens on a simulated UpdateGraph run thread, rather than this thread.
- Returns:
- whether a notification was found in the queue
-
flushOneNotificationForUnitTests
@TestUseOnly public boolean flushOneNotificationForUnitTests(boolean expectOnlyUnsatisfiedNotifications)
Flush a single notification from the UpdateGraph queue. Note that this happens on a simulated UpdateGraph run thread, rather than this thread.
- Parameters:
expectOnlyUnsatisfiedNotifications - Whether we expect there to be only unsatisfied notifications pending
- Returns:
- whether a notification was found in the queue
-
flushAllNormalNotificationsForUnitTests
Flush all the normal notifications from the UpdateGraph queue. Note that the flushing happens on a simulated UpdateGraph run thread, rather than this thread. -
flushAllNormalNotificationsForUnitTests
@TestUseOnly public Runnable flushAllNormalNotificationsForUnitTests(@NotNull BooleanSupplier done, long timeoutMillis)
Flush all the normal notifications from the UpdateGraph queue, continuing until done returns true. Note that the flushing happens on a simulated UpdateGraph run thread, rather than this thread.
- Parameters:
done - Function to determine when we can stop waiting for new notifications
- Returns:
- A Runnable that may be used to wait for the concurrent flush job to complete
-
wakeRefreshThreadForUnitTests
If the run thread is waiting in flushNormalNotificationsAndCompleteCycle() or flushAllNormalNotificationsForUnitTests(BooleanSupplier, long), wake it up.
-
getInstance
-