Enum UpdateGraphProcessor
- All Implemented Interfaces:
LogOutputAppendable, NotificationQueue, NotificationQueue.Dependency, UpdateSourceRegistrar, Serializable, Comparable<UpdateGraphProcessor>, java.lang.constant.Constable
public enum UpdateGraphProcessor extends Enum<UpdateGraphProcessor> implements UpdateSourceRegistrar, NotificationQueue, NotificationQueue.Dependency
This class uses a thread (or pool of threads) to periodically update a set of monitored update sources at a specified target cycle interval. The target cycle interval can be configured to reduce or increase the run rate of the monitored sources.
This class can be configured via the following Configuration property:
- "UpdateGraphProcessor.targetCycleDurationMillis" (optional) - The default target cycle time in ms (1000 if not defined)
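The default-when-absent behavior of this property can be sketched as follows. This is only an illustration: java.util.Properties stands in for the Configuration lookup, and the class name TargetCycleProperty and its resolve method are hypothetical, not part of the library.

```java
import java.util.Properties;

// Hypothetical stand-in: java.util.Properties is used for illustration only
// and is not the Configuration class referenced above.
final class TargetCycleProperty {
    static final String PROP = "UpdateGraphProcessor.targetCycleDurationMillis";
    static final long DEFAULT_MILLIS = 1000L; // documented default when the property is not defined

    // Return the configured target cycle time in ms, or the default when the property is absent.
    static long resolve(Properties props) {
        String raw = props.getProperty(PROP);
        return raw == null ? DEFAULT_MILLIS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolve(props)); // property not set
        props.setProperty(PROP, "250");
        System.out.println(resolve(props)); // property set
    }
}
```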
-
Nested Class Summary
Nested Classes
static class UpdateGraphProcessor.AccumulatedCycleStats
Nested classes/interfaces inherited from class java.lang.Enum
Enum.EnumDesc<E extends Enum<E>>
Nested classes/interfaces inherited from interface io.deephaven.engine.updategraph.NotificationQueue
NotificationQueue.Dependency, NotificationQueue.ErrorNotification, NotificationQueue.Notification
-
Enum Constant Summary
Enum Constants
DEFAULT
-
Field Summary
Fields
UpdateGraphProcessor.AccumulatedCycleStats accumulatedCycleStats
static String ALLOW_UNIT_TEST_MODE_PROP
-
Method Summary
Modifier and Type, Method - Description
void addNotification(NotificationQueue.Notification notification) - Enqueue a notification to be flushed according to its priority.
void addNotifications(Collection<NotificationQueue.Notification> notifications) - Enqueue a collection of notifications to be flushed.
void addSource(Runnable updateSource) - Add a table to the list of tables to run and mark it as refreshing if it was a DynamicNode.
LogOutput append(LogOutput logOutput)
void checkInitiateTableOperation() - If we are establishing a new table operation on a refreshing table without the UpdateGraphProcessor lock, then we are likely committing a grievous error, but one that will only occasionally result in us getting the wrong answer or, if we are lucky, an assertion.
void completeCycleForUnitTests() - Do the second half of the update cycle, including flushing notifications, and completing the LogicalClock update cycle.
void doUnchecked(Runnable runnable) - Execute the supplied code while table operations are unchecked.
<T> T doUnchecked(Supplier<T> supplier) - Execute the supplied code while table operations are unchecked.
void enableUnitTestMode() - Enable unit test mode.
AwareFunctionalLock exclusiveLock() - Get the exclusive lock for this UpdateGraphProcessor.
void flushAllNormalNotificationsForUnitTests() - Flush all the normal notifications from the UGP queue.
Runnable flushAllNormalNotificationsForUnitTests(BooleanSupplier done, long timeoutMillis) - Flush all the normal notifications from the UGP queue, continuing until done returns true.
boolean flushOneNotificationForUnitTests() - Flush a single notification from the UGP queue.
boolean flushOneNotificationForUnitTestsInternal()
boolean getCheckTableOperations() - Should this thread check table operations for safety with respect to the update lock?
long getTargetCycleDurationMillis() - Get the target duration of an update cycle, including the updating phase and the idle phase.
int getUpdateThreads() - Retrieve the number of update threads.
int getWatchDogMillis() - Get the current watchdog timeout value.
boolean isRefreshThread() - Test if this thread is part of our run thread executor service.
LogEntry logDependencies()
boolean maybeAddNotification(NotificationQueue.Notification notification, long deliveryStep) - Add a notification for this NotificationQueue to deliver (by invoking its run() method), iff the delivery step is the current step and the update cycle for that step is still in process.
void refreshUpdateSourceForUnitTests(Runnable updateSource) - Refresh an update source on a simulated UGP run thread, rather than this thread.
void removeSource(Runnable updateSource) - Remove a source from this registrar.
void removeSources(Collection<Runnable> sourcesToRemove) - Remove a collection of sources from the list of refreshing sources.
void requestRefresh() - Request that the next update cycle begin as soon as practicable.
void requestSignal(Condition updateGraphProcessorCondition)
void resetCycleDuration() - Resets the run cycle time to the default target configured via the "UpdateGraphProcessor.targetCycleDurationMillis" property.
void resetForUnitTests(boolean after) - Clear all monitored tables and enqueued notifications to support unit-tests.
void resetForUnitTests(boolean after, boolean randomizedNotifications, int seed, int maxRandomizedThreadCount, int notificationStartDelay, int notificationAdditionDelay) - Clear all monitored tables and enqueued notifications to support unit-tests.
<T extends Exception> void runWithinUnitTestCycle(FunctionalInterfaces.ThrowingRunnable<T> runnable) - Execute the given runnable wrapped with startCycleForUnitTests() and completeCycleForUnitTests().
boolean satisfied(long step) - Is this ancestor satisfied? Note that this method must be safe to call on any thread.
boolean setCheckTableOperations(boolean value) - If you know that the table operations you are performing are indeed safe, then call this method with false to disable table operation checking.
void setTargetCycleDurationMillis(long targetCycleDurationMillis) - Set the target duration of an update cycle, including the updating phase and the idle phase.
void setWatchDogMillis(int watchDogMillis) - Enable the loop watchdog with the specified timeout.
void setWatchDogTimeoutProcedure(LongConsumer procedure) - Set the procedure to be called when the watchdog times out.
AwareFunctionalLock sharedLock() - Get the shared lock for this UpdateGraphProcessor.
int sourceCount() - Return the number of valid sources.
void start() - Start the table run thread.
void startCycleForUnitTests() - Begin the next update cycle while in unit-test mode.
String toString()
static UpdateGraphProcessor valueOf(String name) - Returns the enum constant of this type with the specified name.
static UpdateGraphProcessor[] values() - Returns an array containing the constants of this enum type, in the order they are declared.
void wakeRefreshThreadForUnitTests() - If the run thread is waiting in flushNormalNotificationsAndCompleteCycle() or flushAllNormalNotificationsForUnitTests(BooleanSupplier, long), wake it up.
-
Enum Constant Details
-
DEFAULT
-
-
Field Details
-
ALLOW_UNIT_TEST_MODE_PROP
- See Also:
- Constant Field Values
-
accumulatedCycleStats
-
-
Method Details
-
values
Returns an array containing the constants of this enum type, in the order they are declared.
- Returns:
- an array containing the constants of this enum type, in the order they are declared
-
valueOf
Returns the enum constant of this type with the specified name. The string must match exactly an identifier used to declare an enum constant in this type. (Extraneous whitespace characters are not permitted.)
- Parameters:
name - the name of the enum constant to be returned.
- Returns:
- the enum constant with the specified name
- Throws:
IllegalArgumentException - if this enum type has no constant with the specified name
NullPointerException - if the argument is null
-
append
- Specified by:
append in interface LogOutputAppendable
-
toString
- Overrides:
toString in class Enum<UpdateGraphProcessor>
-
getUpdateThreads
public int getUpdateThreads()
Retrieve the number of update threads.
The UpdateGraphProcessor has a configurable number of update processing threads. The number of threads is exposed through this method so that you can partition a query based on the number of threads.
- Returns:
- the number of update threads configured.
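One way to partition work by thread count is sketched below. It is a minimal illustration under assumptions: the class ThreadPartitioner and its partition method are hypothetical, and the updateThreads argument stands in for a value that would come from getUpdateThreads().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the library.
final class ThreadPartitioner {
    // Split the half-open row range [0, rowCount) into at most updateThreads contiguous chunks.
    // Each element of the result is {startInclusive, endExclusive}.
    static List<long[]> partition(long rowCount, int updateThreads) {
        List<long[]> ranges = new ArrayList<>();
        if (rowCount <= 0 || updateThreads <= 0) {
            return ranges; // nothing to partition
        }
        long chunk = (rowCount + updateThreads - 1) / updateThreads; // ceiling division
        for (long start = 0; start < rowCount; start += chunk) {
            ranges.add(new long[] {start, Math.min(rowCount, start + chunk)});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // The thread count is passed in; in real code it would come from getUpdateThreads().
        for (long[] r : partition(10, 4)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```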
-
exclusiveLock
Get the exclusive lock for this UpdateGraphProcessor.
Using this lock will prevent run or read-only processing from proceeding concurrently.
The exclusive lock implementation is expected to support reentrance.
Note that using the exclusive lock while the shared lock is held by the current thread will result in exceptions, as lock upgrade is not supported.
This lock does support Lock.newCondition().
- Returns:
- The exclusive lock for this UpdateGraphProcessor
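The three properties described above (reentrance, no lock upgrade, condition support) have a close analog in the JDK's ReentrantReadWriteLock, sketched here. This is an analogy only, not the UpdateGraphProcessor's lock implementation: the JDK lock reports a failed upgrade by returning false from tryLock() (and would block forever on lock()), whereas the text above says the UGP lock raises exceptions. The class name LockUpgradeDemo is hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// JDK analogy only; this is not the UpdateGraphProcessor lock.
final class LockUpgradeDemo {
    // Try to take the write (exclusive) lock while this thread holds the read (shared) lock.
    static boolean canUpgrade(ReentrantReadWriteLock lock) {
        lock.readLock().lock();
        try {
            boolean acquired = lock.writeLock().tryLock();
            if (acquired) {
                lock.writeLock().unlock();
            }
            return acquired;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Acquire the write lock twice on the same thread and report the hold count.
    static int reentrantHoldCount(ReentrantReadWriteLock lock) {
        lock.writeLock().lock();
        lock.writeLock().lock();
        try {
            return lock.writeLock().getHoldCount();
        } finally {
            lock.writeLock().unlock();
            lock.writeLock().unlock();
        }
    }

    // Report whether the write lock can create a Condition.
    static boolean writeLockSupportsCondition(ReentrantReadWriteLock lock) {
        try {
            lock.writeLock().newCondition();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        System.out.println("upgrade possible: " + canUpgrade(lock));
        System.out.println("hold count: " + reentrantHoldCount(lock));
        System.out.println("condition supported: " + writeLockSupportsCondition(lock));
    }
}
```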
-
isRefreshThread
public boolean isRefreshThread()
Test if this thread is part of our run thread executor service.
- Returns:
- whether this is one of our run threads.
-
checkInitiateTableOperation
public void checkInitiateTableOperation()
If we are establishing a new table operation on a refreshing table without the UpdateGraphProcessor lock, then we are likely committing a grievous error, but one that will only occasionally result in us getting the wrong answer or, if we are lucky, an assertion. This method is called from various query operations that should not be established without the UGP lock.
The run thread pool threads are allowed to instantiate operations even though they do not hold the lock, because they are protected by the main run thread and dependency tracking.
If you are sure that you know what you are doing better than the query engine, you may call setCheckTableOperations(boolean) to set a thread local variable bypassing this check.
-
setCheckTableOperations
public boolean setCheckTableOperations(boolean value)
If you know that the table operations you are performing are indeed safe, then call this method with false to disable table operation checking. Conversely, if you want to enforce checking even if the configuration disagrees, call it with true.
- Parameters:
value - the new value of check table operations
- Returns:
- the old value of check table operations
-
doUnchecked
Execute the supplied code while table operations are unchecked.
- Parameters:
supplier - the function to run
- Returns:
- the result of supplier
-
doUnchecked
Execute the supplied code while table operations are unchecked.
- Parameters:
runnable - the function to run
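The relationship between setCheckTableOperations, getCheckTableOperations, and doUnchecked can be pictured as a thread-local flag that is saved, cleared, and restored. The sketch below is a stand-in under assumptions, not the library's code: the class CheckedOperations is hypothetical, and the initial value of true is assumed here (the text above indicates the real default comes from configuration).

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a per-thread "check table operations" flag.
final class CheckedOperations {
    // Assumption: checking starts enabled; the real default is configuration-driven.
    private static final ThreadLocal<Boolean> CHECK = ThreadLocal.withInitial(() -> true);

    static boolean getCheck() {
        return CHECK.get();
    }

    // Set the flag for the current thread and return the previous value.
    static boolean setCheck(boolean value) {
        boolean old = CHECK.get();
        CHECK.set(value);
        return old;
    }

    // Run the supplier with checking disabled, then restore the previous value.
    static <T> T doUnchecked(Supplier<T> supplier) {
        boolean old = setCheck(false);
        try {
            return supplier.get();
        } finally {
            setCheck(old);
        }
    }

    public static void main(String[] args) {
        System.out.println("before: " + getCheck());
        System.out.println("inside: " + doUnchecked(CheckedOperations::getCheck));
        System.out.println("after: " + getCheck());
    }
}
```

Restoring the old value in a finally block keeps the flag consistent even when the supplied code throws.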
-
getCheckTableOperations
public boolean getCheckTableOperations()
Should this thread check table operations for safety with respect to the update lock?
- Returns:
- if we should check table operations.
-
setTargetCycleDurationMillis
public void setTargetCycleDurationMillis(long targetCycleDurationMillis)
Set the target duration of an update cycle, including the updating phase and the idle phase. This is also the target interval between the start of one cycle and the start of the next.
Can be reset to default via resetCycleDuration().
- Parameters:
targetCycleDurationMillis - The target duration for update cycles in milliseconds
- ImplNote:
- Any target cycle duration < 0 will be clamped to 0.
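The clamping rule and the split of a cycle into an updating phase and an idle phase can be illustrated with a small model. The class CycleTiming is hypothetical, and the idle-time formula (target minus time spent updating, never negative) is an assumption made for illustration rather than a statement of how the scheduler is implemented.

```java
// Hypothetical model of the target cycle duration; not the library's scheduler.
final class CycleTiming {
    private long targetCycleDurationMillis;

    CycleTiming(long initialMillis) {
        setTargetCycleDurationMillis(initialMillis);
    }

    // Values below 0 are clamped to 0, as the ImplNote above describes.
    void setTargetCycleDurationMillis(long millis) {
        targetCycleDurationMillis = Math.max(0L, millis);
    }

    long getTargetCycleDurationMillis() {
        return targetCycleDurationMillis;
    }

    // Assumed idle time: whatever remains of the target after the updating phase, never negative.
    long idleMillis(long updatingMillis) {
        return Math.max(0L, targetCycleDurationMillis - updatingMillis);
    }

    public static void main(String[] args) {
        CycleTiming timing = new CycleTiming(1000);
        System.out.println(timing.idleMillis(300));  // updating phase shorter than the target
        System.out.println(timing.idleMillis(1500)); // updating phase longer than the target
    }
}
```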
-
getTargetCycleDurationMillis
public long getTargetCycleDurationMillis()
Get the target duration of an update cycle, including the updating phase and the idle phase. This is also the target interval between the start of one cycle and the start of the next.
- Returns:
- The current target cycle duration
-
resetCycleDuration
public void resetCycleDuration()
Resets the run cycle time to the default target configured via the "UpdateGraphProcessor.targetCycleDurationMillis" property.
- ImplNote:
- If the "UpdateGraphProcessor.targetCycleDurationMillis" property is not set, this value defaults to 1000ms.
-
enableUnitTestMode
public void enableUnitTestMode()
Enable unit test mode.
In this mode calls to addSource(Runnable) will only mark tables as refreshing. Additionally, start() may not be called.
-
setWatchDogMillis
public void setWatchDogMillis(int watchDogMillis)
Enable the loop watchdog with the specified timeout. A value of 0 disables the watchdog.
- Parameters:
watchDogMillis - The time in milliseconds to set the watchdog, or 0 to disable.
- ImplNote:
- Any timeout less than 0 will be clamped to 0.
-
getWatchDogMillis
public int getWatchDogMillis()
Get the current watchdog timeout value.
- Returns:
- The current timeout for the watchdog, 0 for disabled
-
setWatchDogTimeoutProcedure
Set the procedure to be called when the watchdog times out.
- Parameters:
procedure - The procedure to call
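The watchdog settings above (a timeout where 0 means disabled, negative values clamped to 0, and a LongConsumer invoked on timeout) can be modeled as below. The class LoopWatchdog and its check method are hypothetical, and passing the elapsed milliseconds to the procedure is an assumption; the text above does not say what value the real procedure receives.

```java
import java.util.function.LongConsumer;

// Hypothetical model of a loop watchdog; not the library's implementation.
final class LoopWatchdog {
    private int watchDogMillis; // 0 means the watchdog is disabled
    private LongConsumer timeoutProcedure = elapsed -> {};

    // Negative timeouts are clamped to 0, which disables the watchdog.
    void setWatchDogMillis(int millis) {
        watchDogMillis = Math.max(0, millis);
    }

    int getWatchDogMillis() {
        return watchDogMillis;
    }

    void setWatchDogTimeoutProcedure(LongConsumer procedure) {
        timeoutProcedure = procedure;
    }

    // Invoke the procedure if the watchdog is enabled and the elapsed time exceeds the timeout.
    // Assumption: the procedure is given the elapsed time in milliseconds.
    boolean check(long elapsedMillis) {
        if (watchDogMillis == 0 || elapsedMillis <= watchDogMillis) {
            return false;
        }
        timeoutProcedure.accept(elapsedMillis);
        return true;
    }

    public static void main(String[] args) {
        LoopWatchdog watchdog = new LoopWatchdog();
        watchdog.setWatchDogTimeoutProcedure(ms -> System.out.println("timed out after " + ms + " ms"));
        watchdog.setWatchDogMillis(100);
        watchdog.check(150);
    }
}
```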
-
requestSignal
-
start
public void start()
Start the table run thread.
- ImplNote:
- Must not be in unit test mode.
-
addSource
Add a table to the list of tables to run and mark it as refreshing if it was a DynamicNode.
- Specified by:
addSource in interface UpdateSourceRegistrar
- Parameters:
updateSource - The table to be added to the run list
- ImplNote:
- This will do nothing in unit test mode other than mark the table as refreshing.
-
removeSource
Description copied from interface: UpdateSourceRegistrar
Remove a source from this registrar.
- Specified by:
removeSource in interface UpdateSourceRegistrar
- Parameters:
updateSource - The table to remove
-
removeSources
Remove a collection of sources from the list of refreshing sources.
- Parameters:
sourcesToRemove - The sources to remove from the list of refreshing sources
- ImplNote:
- This will not set the sources as non-refreshing.
-
sourceCount
public int sourceCount()
Return the number of valid sources.
- Returns:
- the number of valid sources
-
addNotification
Enqueue a notification to be flushed according to its priority. Non-terminal notifications should only be enqueued during the updating phase of a cycle. That is, they should be enqueued from an update source or subsequent notification delivery.
- Specified by:
addNotification in interface NotificationQueue
- Parameters:
notification - The notification to enqueue
- See Also:
NotificationQueue.Notification.isTerminal(), LogicalClock.State
-
maybeAddNotification
public boolean maybeAddNotification(@NotNull NotificationQueue.Notification notification, long deliveryStep)
Description copied from interface: NotificationQueue
Add a notification for this NotificationQueue to deliver (by invoking its run() method), iff the delivery step is the current step and the update cycle for that step is still in process. This is only supported for non-terminal notifications.
- Specified by:
maybeAddNotification in interface NotificationQueue
- Parameters:
notification - The notification to add
deliveryStep - The step to deliver this notification on
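The "iff the delivery step is the current step and the cycle is still in process" condition can be illustrated with a toy queue. Everything here is hypothetical (the class StepGatedQueue, its fields, and the use of Runnable for notifications); it models the gating rule only and says nothing about priorities or threading in the real queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of step-gated enqueueing; not the library's NotificationQueue.
final class StepGatedQueue {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private long currentStep;
    private boolean updating;

    // Begin a new cycle: advance the step and open the queue.
    void startCycle() {
        currentStep++;
        updating = true;
    }

    // Deliver everything that was accepted, then close the cycle.
    void completeCycle() {
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
        updating = false;
    }

    long currentStep() {
        return currentStep;
    }

    // Accept the notification only if a cycle is in process and deliveryStep is the current step.
    boolean maybeAdd(Runnable notification, long deliveryStep) {
        if (!updating || deliveryStep != currentStep) {
            return false;
        }
        pending.add(notification);
        return true;
    }

    public static void main(String[] args) {
        StepGatedQueue queue = new StepGatedQueue();
        queue.startCycle();
        boolean accepted = queue.maybeAdd(() -> System.out.println("delivered"), queue.currentStep());
        System.out.println("accepted: " + accepted);
        queue.completeCycle();
    }
}
```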
-
satisfied
public boolean satisfied(long step)
Description copied from interface: NotificationQueue.Dependency
Is this ancestor satisfied? Note that this method must be safe to call on any thread.
- Specified by:
satisfied in interface NotificationQueue.Dependency
- Parameters:
step - The step for which we are testing satisfaction
- Returns:
- Whether the dependency is satisfied on step (and will not fire subsequent notifications)
-
addNotifications
Enqueue a collection of notifications to be flushed.
- Parameters:
notifications - The notifications to enqueue
- See Also:
addNotification(Notification)
-
requestRefresh
public void requestRefresh()
Request that the next update cycle begin as soon as practicable. This "hurry-up" cycle happens through normal means using the refresh thread and its workers.
- Specified by:
requestRefresh in interface UpdateSourceRegistrar
-
resetForUnitTests
Clear all monitored tables and enqueued notifications to support unit-tests.
- Parameters:
after - Whether this is *after* a unit test completed. If true, held locks should result in an exception and the LivenessScopeStack will be cleared.
-
resetForUnitTests
public void resetForUnitTests(boolean after, boolean randomizedNotifications, int seed, int maxRandomizedThreadCount, int notificationStartDelay, int notificationAdditionDelay)
Clear all monitored tables and enqueued notifications to support unit-tests.
- Parameters:
after - Whether this is *after* a unit test completed. If true, held locks should result in an exception and the LivenessScopeStack will be cleared.
randomizedNotifications - Whether the notification processor should randomize the order of delivery
seed - Seed for randomized notification delivery order and delays
maxRandomizedThreadCount - Maximum number of threads handling randomized notification delivery
notificationStartDelay - Maximum randomized notification start delay
notificationAdditionDelay - Maximum randomized notification addition delay
-
startCycleForUnitTests
Begin the next update cycle while in unit-test mode. Note that this happens on a simulated UGP run thread, rather than this thread.
-
completeCycleForUnitTests
Do the second half of the update cycle, including flushing notifications, and completing the LogicalClock update cycle. Note that this happens on a simulated UGP run thread, rather than this thread.
-
runWithinUnitTestCycle
@TestUseOnly public <T extends Exception> void runWithinUnitTestCycle(FunctionalInterfaces.ThrowingRunnable<T> runnable) throws T
Execute the given runnable wrapped with startCycleForUnitTests() and completeCycleForUnitTests(). Note that the runnable is run on the current thread.
- Parameters:
runnable - the runnable to execute.
- Throws:
T
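The wrapping described above can be pictured with the following sketch. It is not the library's code: the class UnitTestCycle, its nested ThrowingRunnable interface, and the event list are hypothetical, and completing the cycle in a finally block (so that it also runs when the runnable throws) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of running a body between a cycle start and a cycle completion.
final class UnitTestCycle {
    // Stand-in for FunctionalInterfaces.ThrowingRunnable<T>.
    interface ThrowingRunnable<T extends Exception> {
        void run() throws T;
    }

    // Records the order of events so the wrapping is observable.
    final List<String> events = new ArrayList<>();

    void startCycle() {
        events.add("start");
    }

    void completeCycle() {
        events.add("complete");
    }

    // Assumption: the cycle is completed even if the runnable throws.
    <T extends Exception> void runWithinCycle(ThrowingRunnable<T> runnable) throws T {
        startCycle();
        try {
            runnable.run();
        } finally {
            completeCycle();
        }
    }

    public static void main(String[] args) {
        UnitTestCycle cycle = new UnitTestCycle();
        cycle.<RuntimeException>runWithinCycle(() -> cycle.events.add("body"));
        System.out.println(cycle.events);
    }
}
```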
-
refreshUpdateSourceForUnitTests
Refresh an update source on a simulated UGP run thread, rather than this thread.
- Parameters:
updateSource - The update source to run
-
flushOneNotificationForUnitTests
Flush a single notification from the UGP queue. Note that this happens on a simulated UGP run thread, rather than this thread.
- Returns:
- whether a notification was found in the queue
-
flushOneNotificationForUnitTestsInternal
-
flushAllNormalNotificationsForUnitTests
Flush all the normal notifications from the UGP queue. Note that the flushing happens on a simulated UGP run thread, rather than this thread.
-
flushAllNormalNotificationsForUnitTests
@TestUseOnly public Runnable flushAllNormalNotificationsForUnitTests(@NotNull BooleanSupplier done, long timeoutMillis)
Flush all the normal notifications from the UGP queue, continuing until done returns true. Note that the flushing happens on a simulated UGP run thread, rather than this thread.
- Parameters:
done - Function to determine when we can stop waiting for new notifications
- Returns:
- A Runnable that may be used to wait for the concurrent flush job to complete
-
wakeRefreshThreadForUnitTests
If the run thread is waiting in flushNormalNotificationsAndCompleteCycle() or flushAllNormalNotificationsForUnitTests(BooleanSupplier, long), wake it up.
-
logDependencies
-