Class PeriodicUpdateGraph

java.lang.Object
io.deephaven.engine.updategraph.impl.BaseUpdateGraph
io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph
All Implemented Interfaces:
LogOutputAppendable, NotificationQueue, NotificationQueue.Dependency, UpdateGraph, UpdateSourceRegistrar

public class PeriodicUpdateGraph extends BaseUpdateGraph

This class uses a thread (or pool of threads) to periodically update a set of monitored update sources at a specified target cycle interval. The target cycle interval can be configured to reduce or increase the run rate of the monitored sources.

This class can be configured via the following Configuration property

  • Field Details

    • NUM_THREADS_DEFAULT_UPDATE_GRAPH

      public static final int NUM_THREADS_DEFAULT_UPDATE_GRAPH
    • ALLOW_UNIT_TEST_MODE_PROP

      public static final String ALLOW_UNIT_TEST_MODE_PROP
      See Also:
    • DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP

      public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP
      See Also:
  • Constructor Details

    • PeriodicUpdateGraph

      public PeriodicUpdateGraph(String name, boolean allowUnitTestMode, long targetCycleDurationMillis, long minimumCycleDurationToLogNanos, int numUpdateThreads, ThreadInitializationFactory threadInitializationFactory, OperationInitializer operationInitializer)
  • Method Details

    • newBuilder

      public static PeriodicUpdateGraph.Builder newBuilder(String name)
    • append

      public LogOutput append(@NotNull @NotNull LogOutput logOutput)
    • parallelismFactor

      public int parallelismFactor()
      Retrieve the number of update threads.

      The PeriodicUpdateGraph has a configurable number of update processing threads. The number of threads is exposed in your method to enable you to partition a query based on the number of threads.

      Returns:
      the number of update threads configured.
    • setTargetCycleDurationMillis

      public void setTargetCycleDurationMillis(long targetCycleDurationMillis)
      Set the target duration of an update cycle, including the updating phase and the idle phase. This is also the target interval between the start of one cycle and the start of the next.

      Can be reset to default via resetTargetCycleDuration().

      Parameters:
      targetCycleDurationMillis - The target duration for update cycles in milliseconds
      ImplNote:
      Any target cycle duration < 0 will be clamped to 0.
    • getTargetCycleDurationMillis

      public long getTargetCycleDurationMillis()
      Get the target duration of an update cycle, including the updating phase and the idle phase. This is also the target interval between the start of one cycle and the start of the next.
      Returns:
      The current target cycle duration
    • isCycleOnBudget

      public boolean isCycleOnBudget(long cycleTimeNanos)
      Description copied from class: BaseUpdateGraph
      Is the provided cycle time on budget?
      Overrides:
      isCycleOnBudget in class BaseUpdateGraph
      Parameters:
      cycleTimeNanos - the cycle time, in nanoseconds
      Returns:
      true if the cycle time is within the desired budget
    • resetTargetCycleDuration

      public void resetTargetCycleDuration()
      Resets the run cycle time to the default target configured via the PeriodicUpdateGraph.Builder setting.
      ImplNote:
      If the PeriodicUpdateGraph.Builder.targetCycleDurationMillis(long) property is not set, this value defaults to DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP which defaults to 1000ms.
    • enableUnitTestMode

      public void enableUnitTestMode()

      Enable unit test mode.

      In this mode calls to addSource(Runnable) will only mark tables as refreshing. Additionally start() may not be called.

    • isUnitTestModeAllowed

      public boolean isUnitTestModeAllowed()
      Returns:
      whether unit test mode is allowed
    • setWatchDogMillis

      public void setWatchDogMillis(int watchDogMillis)
      Enable the loop watchdog with the specified timeout. A value of 0 disables the watchdog.
      Parameters:
      watchDogMillis - The time in milliseconds to set the watchdog, or 0 to disable.
      ImplNote:
      Any timeout less than 0 will be clamped to 0.
    • getWatchDogMillis

      public int getWatchDogMillis()
      Get the current watchdog timeout value.
      Returns:
      The current timeout for the watchdog, 0 for disabled
    • setWatchDogTimeoutProcedure

      public void setWatchDogTimeoutProcedure(LongConsumer procedure)
      Set the procedure to be called when the watchdog times out.
      Parameters:
      procedure - The procedure to call
    • start

      public void start()
      Install a real NotificationProcessor and start the primary refresh thread.
      ImplNote:
      Must not be in unit test mode.
    • stop

      public void stop()
      Begins the process to stop all processing threads and forces ReferenceCounted sources to a reference count of zero.
    • addSource

      public void addSource(@NotNull @NotNull Runnable updateSource)
      Add a table to the list of tables to run and mark it as refreshing if it was a DynamicNode.
      Specified by:
      addSource in interface UpdateSourceRegistrar
      Overrides:
      addSource in class BaseUpdateGraph
      Parameters:
      updateSource - The table to be added to the run list
      ImplNote:
      This will do nothing in unit test mode other than mark the table as refreshing.
    • addNotification

      public void addNotification(@NotNull @NotNull NotificationQueue.Notification notification)
      Enqueue a notification to be flushed according to its priority. Non-terminal notifications should only be enqueued during the updating phase of a cycle. That is, they should be enqueued from an update source or subsequent notification delivery.
      Specified by:
      addNotification in interface NotificationQueue
      Overrides:
      addNotification in class BaseUpdateGraph
      Parameters:
      notification - The notification to enqueue
      See Also:
    • maybeAddNotification

      public boolean maybeAddNotification(@NotNull @NotNull NotificationQueue.Notification notification, long deliveryStep)
      Description copied from interface: NotificationQueue
      Add a notification for this NotificationQueue to deliver (by invoking its run() method), iff the delivery step is the current step and the update cycle for that step is still in process. This is only supported for non-terminal notifications.
      Specified by:
      maybeAddNotification in interface NotificationQueue
      Overrides:
      maybeAddNotification in class BaseUpdateGraph
      Parameters:
      notification - The notification to add
      deliveryStep - The step to deliver this notification on
    • requestRefresh

      public void requestRefresh()
      Request that the next update cycle begin as soon as practicable. This "hurry-up" cycle happens through normal means using the refresh thread and its workers.
    • resetForUnitTests

      @TestUseOnly public void resetForUnitTests(boolean after)
      Clear all monitored tables and enqueued notifications to support unit-tests.
      Parameters:
      after - Whether this is *after* a unit test completed. If true, held locks should result in an exception and the LivenessScopeStack will be cleared.
    • resetForUnitTests

      @TestUseOnly public void resetForUnitTests(boolean after, boolean randomizedNotifications, int seed, int maxRandomizedThreadCount, int notificationStartDelay, int notificationAdditionDelay)
      Clear all monitored tables and enqueued notifications to support unit-tests.
      Parameters:
      after - Whether this is *after* a unit test completed. If true, held locks should result in an exception and the LivenessScopeStack will be cleared.
      randomizedNotifications - Whether the notification processor should randomize the order of delivery
      seed - Seed for randomized notification delivery order and delays
      maxRandomizedThreadCount - Maximum number of threads handling randomized notification delivery
      notificationStartDelay - Maximum randomized notification start delay
      notificationAdditionDelay - Maximum randomized notification addition delay
    • startCycleForUnitTests

      @TestUseOnly public void startCycleForUnitTests()
      Begin the next update cycle while in unit-test mode. Note that this happens on a simulated UpdateGraph run thread, rather than this thread. This overload is the same as startCycleForUnitTests(true).
    • startCycleForUnitTests

      @TestUseOnly public void startCycleForUnitTests(boolean sourcesSatisfied)
      Begin the next update cycle while in unit-test mode. Note that this happens on a simulated UpdateGraph run thread, rather than this thread.
      Parameters:
      sourcesSatisfied - Whether sources should be marked as satisfied by this invocation; if false, the caller must control source satisfaction using markSourcesRefreshedForUnitTests().
    • markSourcesRefreshedForUnitTests

      @TestUseOnly public void markSourcesRefreshedForUnitTests()
      Record that sources have been satisfied within a unit test cycle.
    • completeCycleForUnitTests

      @TestUseOnly public void completeCycleForUnitTests()
      Do the second half of the update cycle, including flushing notifications, and completing the LogicalClock update cycle. Note that this happens on a simulated UpdateGraph run thread, rather than this thread.
    • runWithinUnitTestCycle

      @TestUseOnly public <T extends Exception> void runWithinUnitTestCycle(@NotNull @NotNull ThrowingRunnable<T> runnable) throws T
      Execute the given runnable wrapped with startCycleForUnitTests() and completeCycleForUnitTests(). Note that the runnable is run on the current thread. This is equivalent to runWithinUnitTestCycle(runnable, true).
      Parameters:
      runnable - The runnable to execute
      Throws:
      T extends Exception
    • runWithinUnitTestCycle

      @TestUseOnly public <T extends Exception> void runWithinUnitTestCycle(@NotNull @NotNull ThrowingRunnable<T> runnable, boolean sourcesSatisfied) throws T
      Execute the given runnable wrapped with startCycleForUnitTests() and completeCycleForUnitTests(). Note that the runnable is run on the current thread.
      Parameters:
      runnable - The runnable to execute
      sourcesSatisfied - Whether sources should be marked as satisfied by this invocation; if false, the caller must control source satisfaction using markSourcesRefreshedForUnitTests().
      Throws:
      T extends Exception
    • refreshUpdateSourceForUnitTests

      @TestUseOnly public void refreshUpdateSourceForUnitTests(@NotNull @NotNull Runnable updateSource)
      Refresh an update source on a simulated UpdateGraph run thread, rather than this thread.
      Parameters:
      updateSource - The update source to run
    • flushOneNotificationForUnitTests

      @TestUseOnly public boolean flushOneNotificationForUnitTests()
      Flush a single notification from the UpdateGraph queue. Note that this happens on a simulated UpdateGraph run thread, rather than this thread.
      Returns:
      whether a notification was found in the queue
    • flushOneNotificationForUnitTests

      @TestUseOnly public boolean flushOneNotificationForUnitTests(boolean expectOnlyUnsatisfiedNotifications)
      Flush a single notification from the UpdateGraph queue. Note that this happens on a simulated UpdateGraph run thread, rather than this thread.
      Parameters:
      expectOnlyUnsatisfiedNotifications - Whether we expect there to be only unsatisfied notifications pending
      Returns:
      whether a notification was found in the queue
    • flushAllNormalNotificationsForUnitTests

      @TestUseOnly public void flushAllNormalNotificationsForUnitTests()
      Flush all the normal notifications from the UpdateGraph queue. Note that the flushing happens on a simulated UpdateGraph run thread, rather than this thread.
    • flushAllNormalNotificationsForUnitTests

      @TestUseOnly public Runnable flushAllNormalNotificationsForUnitTests(@NotNull @NotNull BooleanSupplier done, long timeoutMillis)
      Flush all the normal notifications from the UpdateGraph queue, continuing until done returns true. Note that the flushing happens on a simulated UpdateGraph run thread, rather than this thread.
      Parameters:
      done - Function to determine when we can stop waiting for new notifications
      Returns:
      A Runnable that may be used to wait for the concurrent flush job to complete
    • wakeRefreshThreadForUnitTests

      @TestUseOnly public void wakeRefreshThreadForUnitTests()
      If the run thread is waiting in flushNormalNotificationsAndCompleteCycle() or flushAllNormalNotificationsForUnitTests(BooleanSupplier, long), wake it up.
    • getInstance

      public static PeriodicUpdateGraph getInstance(String name)