Skip to main content
Version: Python

Multithreading: Synchronization, locks, and snapshots

Deephaven is often run on multiprocessor systems. Synchronized access to changing data from multiple threads is implemented in the API with a shared lock for reading data, an exclusive lock for writing data and propagating changes, and an optimistic snapshotting mechanism to avoid locking while retrieving data and running table operations.

note

While the synchronization primitives presented in this guide are universal to Deephaven, the examples below use Deephaven's Java API instead of Python.

Query engine locks

Synchronization of access to Deephaven data is managed by the Update Graph's shared lock and exclusive lock.

Locks should be acquired only when necessary and held for the shortest duration possible, as the Update Graph Processor cannot start an update cycle until the locks have been released by all non-UG threads.

As of Deephaven version 0.17.0, the shared and exclusive locks are backed by a ReentrantReadWriteLock, though this is subject to change.

Shared lock

The shared lock is typically held while running table operations (.where(), .naturalJoin(), etc.) from independent threads, outside the Deephaven console and app mode startup scripts. It can also be used when reading copious amounts of data from tables, but it is usually preferable to use snapshotting in these cases to avoid the need to acquire the shared lock.

Any threads waiting for shared lock while the Periodic Update Graph is running will acquire it once the UG completes its cycle and releases the exclusive lock. Any thread can immediately acquire the shared lock while the Update Graph Processor is not running. However, the Periodic Update Graph will not be able to start a new refresh cycle while any thread holds the shared lock.

Exclusive lock

The exclusive lock is held by the query engine while processing updates. Only one thread at a time can hold the exclusive lock, and no threads will hold the shared lock while the exclusive lock is locked. The exclusive lock is also always held automatically when running commands in the Deephaven console or executing an app mode startup script.

Waiting for changes from the Periodic Update Graph

Users do not typically interact with this lock directly, but in certain cases it is used when waiting for changes from the Periodic Update Graph.

Most commonly, when waiting for table updates (using awaitUpdate()), the exclusive lock is acquired automatically. This is required if a Periodic Update Graph cycle must occur in the middle of another operation — for example, if data is flushed to a DynamicTableWriter in the middle of query initialization, it will not be reflected in the output table until the Periodic Update Graph runs a refresh cycle.

note

Upgrading from the shared lock to the exclusive lock is not supported and will throw an exception. Accordingly, awaitUpdate() cannot be used while shared lock is held.

In advanced cases where awaitUpdate() is insufficient, the exclusive lock can also be held in order to wait on conditions from the UG. This is helpful in situations where an independent (non-UG) thread must wait for a change or notification propagated by the Periodic Update Graph, often in conjunction with a custom listener. In this situation, a condition can be obtained using ExecutionContext.getContext().getUpdateGraph().exclusiveLock().newCondition(), the independent thread can await() the condition, and the can condition can be signaled by calling requestSignal() from the Periodic Update Graph's refresh thread. A simple example of this pattern that can be run in Groovy is provided below:

import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter;
import io.deephaven.engine.context.ExecutionContext;

import java.util.concurrent.locks.Condition;

final Condition myCondition = ExecutionContext.getContext().getUpdateGraph().exclusiveLock().newCondition();

Thread independentThread = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " - " + currentTime() + " - Acquiring update graph exclusive lock...");
ExecutionContext.getContext().getUpdateGraph().exclusiveLock().doLocked(() -> {
System.out.println(Thread.currentThread().getName() + " - " + currentTime() + " - Waiting for condition...");
myCondition.await();
System.out.println(Thread.currentThread().getName() + " - " + currentTime() + " - Condition was signaled!");
});
System.out.println(Thread.currentThread().getName() + " - " + currentTime() + " - update graph exclusive lock released.")

}, "myIndependentThread");
independentThread.start();

// Create a time table that ticks every 5 seconds
myTable = timeTable("00:00:05");

myListener = new InstrumentedTableUpdateListenerAdapter(myTable, false) {
@Override
public void onUpdate(TableUpdate upstream) {
// Request that the UpdateGraph signal the condition, so that anything
// await()ing the condition is notified.
System.out.println(Thread.currentThread().getName() + " - " + currentTime() + " - Signaling condition...")
ExecutionContext.getContext().getUpdateGraph().requestSignal(myCondition);
System.out.println(Thread.currentThread().getName() + " - " + currentTime() + " - Condition signaled.")
}
};

myTable.addUpdateListener(myListener);

The output to demonstrates the progress of the two threads. First, myIndependentThread acquires the lock and waits for the condition. Shortly afterward, the Periodic Update Graph processes a new timeTable tick on its five-second interval — triggering the listener's onUpdate() method, which signals the condition. After the condition has been signaled and the Periodic Update Graph finishes its cycle, myIndependentThread returns from myCondition.await() and resumes execution. The Periodic Update Graph will continue to run the listener until the listener is either removed from the table or goes out of scope and is removed by JVM garbage collection.

myIndependentThread - 2022-10-12T12:36:06.890861000 NY - Acquiring update graph exclusive lock...
myIndependentThread - 2022-10-12T12:36:06.998700000 NY - Waiting for condition...
PeriodicUpdateGraph.DEFAULT.refreshThread - 2022-10-12T12:36:07.142583000 NY - Signaling condition...
PeriodicUpdateGraph.DEFAULT.refreshThread - 2022-10-12T12:36:07.150394000 NY - Condition signaled.
myIndependentThread - 2022-10-12T12:36:07.153525000 NY - Condition was signaled!
myIndependentThread - 2022-10-12T12:36:07.154153000 NY - Update graph exclusive lock released.
PeriodicUpdateGraph.DEFAULT.refreshThread - 2022-10-12T12:36:10.117031000 NY - Signaling condition...
PeriodicUpdateGraph.DEFAULT.refreshThread - 2022-10-12T12:36:10.117912000 NY - Condition signaled.
PeriodicUpdateGraph.DEFAULT.refreshThread - 2022-10-12T12:36:15.115583000 NY - Signaling condition...
PeriodicUpdateGraph.DEFAULT.refreshThread - 2022-10-12T12:36:15.116385000 NY - Condition signaled.

Using locks

The preferred way to use either lock is as a functional lock, where the operation to execute (the SnapshotFunction) is passed as an argument (typically a lambda expression). This simplifies the standard try/finally locking pattern (demonstrated under Explicit locking and unlocking)

The doLocked() method allows for arbitrary code to be executed while holding the lock:

Table myTable = getMyTable();
ExecutionContext.getContext().getUpdateGraph().sharedLock().doLocked( () -> {
// Retrieve the data from "MyCol" as an array and print it:
String[] myColAsArray = myTable.getColumn("MyCol").getDirect()
System.out.println(Arrays.toString(col1array));
});

It is also possible to compute a value while holding the lock and return it directly to the caller, using computeLocked():

Table myTable = getMyTable();
String[] myColAsArray = ExecutionContext.getContext().getUpdateGraph().sharedLock().computeLocked( () -> {
// Retrieve the data from "MyCol" as an array and return it:
return myTable.getColumn("MyCol").getDirect();
});

Both doLocked() and computeLocked() allow the supplier to throw checked exceptions (e.g. IOException, TimeoutException, or anything else that is not a subclass of RuntimeException). In these cases, the caller must handle the potential exception.

try {
ExecutionContext.getContext().getUpdateGraph().sharedLock().computeLocked(() -> { throw new IOException(); });
} catch (IOException e) {
throw new UncheckedIOException("IOException occurred during computeLocked()", e);
}
Explicit locking and unlocking

Although it is not recommended, it is also possible to acquire and release a lock explicitly using the lock() and unlock() methods. When doing so, you must take care to ensure that the lock is always released in a finally block, so that it is guaranteed to be released even when exceptions or errors occur. (The doLocked() and computeLocked() methods handle this automatically.)

Below is an example of the correct way to use the shared lock with explicit locking.

// Acquire the shared lock
ExecutionContext.getContext().getUpdateGraph().sharedLock().lock();
try {
// code to run under the lock
} finally {
ExecutionContext.getContext().getUpdateGraph().sharedLock().unlock();
}

Snapshotting

A snapshot is a guaranteed-consistent result derived from data in Deephaven without necessarily acquiring a lock. Deephaven's snapshotting mechanism is a form of optimistic concurrency that helps to retrieve data safely, quickly, and without blocking the Periodic Update Graph.

Snapshots are performed using ConstructSnapshot.callDataSnapshotFunction(), which will attempt the operation without acquiring a lock, then verify that the Update Graph did not change any data while the operation was running. If data may have changed while the operation was running, the snapshotting mechanism will retry the operation.

If the snapshot cannot complete successfully after a limited number of attempts, or a limited duration of wall-clock time, the snapshotting mechanism will automatically acquire the shared lock in order to reliably perform the operation. The maximum number and duration of these attempts before acquiring the lock is controlled by the following properties.

PropertyDefault ValueDescription
ConstructSnapshot.maxConcurrentAttempts2The number of attempts at complete
ConstructSnapshot.maxConcurrentAttemptDurationMillis5000The maximum amount of time to try snapshotting

Obtaining snapshots

There are two steps to obtaining a snapshot of tables (or other data updated by the Periodic Update Graph):

  1. Create a SnapshotControl object to control snapshot validation.
  2. Run a SnapshotFunction that encapsulates the logic that must be run on consistent data.

Creating the SnapshotControl

To obtain snapshots, first use ConstructSnapshot.makeSnapshotControl() to create a SnapshotControl object, which tells the query engine how to verify that the snapshot result is correct. The makeSnapshotControl() parameters isNotificationAware, isRefreshing and sources control this verification process.

Parameter NameDescription
isNotificationAwareWhether the snapshot control. In most cases this should be false. If the snapshot is used to initialize a component that is subsequently updated in response to additional notifications (e.g. by a listener), then this should be true.
isRefreshingWhether the snapshot control needs to handle refreshing data. This must be true if any of the sources is refreshing.
sourcesA varargs parameter that should contain all sources (including tables) that will be read by the snapshot function.

Running a snapshot

To run a snapshot, pass a log prefix, the snapshot control object, and the snapshot function itself (typically a lambda) to callDataSnapshotFunction() to begin the snapshot process. The snapshot function may be called repeatedly, with or without the lock, depending on the configured limits on concurrent attempts. The usePrev parameter informs the snapshot function of whether it should use values from the current cycle or the previous cycle — if the UpdateGraph is running when the snapshot function is called, usePrev will be true; otherwise it is false.

Typically, a snapshot function will retrieve data from several ColumnSources for one or more index keys. Consider the example below, which retrieves data at index key 0 from two tables:

import io.deephaven.engine.table.impl.remote.ConstructSnapshot;

// Get tables to retrieve data from during the SnapshotFunction:
Table myTable = getMyTable();
Table myTable2 = getMyOtherTable();

// Obtain references to the desired ColumnSources from each table
ColumnSource<String> firstTableStrColSource = myTable.getColumnSource("MyStrCol", String.class);
ColumnSource<Integer> firstTableIntColSource = myTable.getColumnSource("MyIntCol", Integer.class);

ColumnSource<Double> secondTableDoubleColSource = myTable.getColumnSource("MyIntCol", Double.class);


// Create the SnapshotControl object:

// Most snapshots do not need to be 'notification aware'. A snapshot
// The SnapshotControl needs to be 'notification aware' if the result of the snapshot is used to
// build state that is intended to be updated in response to subsequent notifications (e.g. from a
// listener).
boolean isNotificationAware = false;

// Determine whether the SnapshotControl needs to handle refreshing data:
boolean isRefreshing = myTable.isRefreshing() || myTable2.isRefreshing();

// It is *essential* that all tables referenced by the snapshot function (e.g. myTable and myTable2)
// are included in the 'sources' parameter of `makeSnapshotControl()`.
ConstructSnapshot.SnapshotControl snapshotControl = ConstructSnapshot.makeSnapshotControl(
isNotificationAware,
isRefreshing,
myTable, myTable2
)

// Create a class to hold the results:
class DataHolder {
final String myStr;
final int myInt;
final double myDouble;

DataHolder(String myStr, int myInt, double myDouble) {
this.myStr = myStr;
this.myInt = myInt;
this.myDouble = myDouble;
}
}

// Create a reference in which the snapshot function can store its results:
AtomicReference<DataHolder> snapshotResult = new AtomicReference<>();
ConstructSnapshot.callDataSnapshotFunction("my_snapshot", snapshotControl, (boolean usePrev, long beforeClockValue) -> {
// For this example, we'll retrieve data from index key 0.
final long idxKey = 0L;

// Get the 'previous value' if usePrev is true. This occurs when the UpdateGraph is running to update
// the 'current value' with the latest data.
String myStringFromTable = usePrev ? firstTableStrColSource.getPrev(idxKey) : firstTableStrColSource.get(idxKey);
int myIntFromTable = usePrev ? firstTableIntColSource.getPrevInt(idxKey) : firstTableIntColSource.getInt(idxKey);
double myDoubleFromTable = usePrev ? secondTableDoubleColSource.getPrevDouble(idxKey) : secondTableIntColSource.getDouble(idxKey);

// Create the result value and store it in the reference.
// Since the snapshotResult reference is only used by one thread, it is better to use .setPlain() instead
// of .set() to avoid the small additional cost of a volatile write.
snapshotResult.setPlain(new DataHolder(myStringFromTable, myIntFromTable, myDoubleFromTable));

// return true, indicating that the snapshot has succeeded
return true;
});

// Since callDataSnapshotFunction() has returned and did not throw an exception,
// proceed with reading the snapshot result:
System.out.println("String from idx 0 of table: " + snapshotResult.getPlain());

If the snapshot function returns false, it is assumed that the data was inconsistent and the snapshot function is rerun. If the snapshot function completes by returning true or throwing an exception, the SnapshotControl will be used to validate that the data used by the snapshot is consistent — if not, the snapshot function is rerun. (The exceptions are only propagated by callDataSnapshotFunction() when validation succeeds, since it is expected that exceptions may occur when the data is inconsistent.) If the snapshot function throws an exception or returns false after callDataSnapshotFunction() has acquired the shared lock, then callDataSnapshotFunction() will throw an exception.

Early stopping of inconsistent snapshots

For snapshot functions comprising multiple operations, it is possible to check mid-operation whether the data has become inconsistent and determine early that the snapshot will not be reliable and should be aborted. This can be done by calling ConstructSnapshot.concurrentAttemptInconsistent() to verify that UpdateGraph's logical clock value has not changed since the snapshot function was called. If the clock has ticked, the snapshot function can return false to immediately abort and retry.

Consider the example below, in which data is retrieved from hundreds of columns.

// Get the source tables:
final Table myTable = getMyTable();
final Table myTable2 = getMyOtherTable();

// Retrieve column sources:
final ColumnSource<String> table1colSource1 = myTable.getColumnSource("Col1");
final ColumnSource<String> table1colSource2 = myTable.getColumnSource("Col2");
// . . .
final ColumnSource<String> table1colSource100 = myTable.getColumnSource("Col100");

final ColumnSource<String> table2colSource1 = myTable2.getColumnSource("Col1");
final ColumnSource<String> table2colSource2 = myTable2.getColumnSource("Col2");
// . . .
final ColumnSource<String> table2colSource100 = myTable2.getColumnSource("Col100");

// Create the snapshotControl:
ConstructSnapshot.SnapshotControl snapshotControl = ConstructSnapshot.makeSnapshotControl(
false,
true,
myTable, myTable2
);

// Create the snapshot result reference:
final String[] snapshotResults = new String[200];

// Run the snapshot:
ConstructSnapshot.callDataSnapshotFunction("my_snapshot", snapshotControl, (boolean usePrev, long beforeClockValue) -> {

snapshotResults[0] = usePrev ? table1colSource1.getPrev(idxKey) : table1colSource1.get(idxKey);
snapshotResults[1] = usePrev ? table1colSource2.getPrev(idxKey) : table1colSource2.get(idxKey);
// . . .
snapshotResults[99] = usePrev ? table1colSource100.getPrev(idxKey) : table1colSource100.get(idxKey);

// Verify that the snapshot is consistent so far before retrieving the next batch of data.
if (ConstructSnapshot.concurrentAttemptInconsistent()) {
// Return false if the snapshot is already inconsistent
return false;
}

snapshotResults[100] usePrev ? table2colSource1.getPrev(idxKey) : table2colSource1.get(idxKey);
snapshotResults[101] usePrev ? table2colSource2.getPrev(idxKey) : table2colSource2.get(idxKey);
// . . .
snapshotResults[199] = usePrev ? table2colSource100.getPrev(idxKey) : table2colSource100.get(idxKey);


// Return true; let the SnapshotControl verify the snapshot
return true;
})

Table snapshotResult = snapshotResultRef.getPlain();

Choosing between snapshots and locks

While the right choice among snapshots and locks depends heavily on the operation to be run and the context it is run from, the following guidelines can be helpful in making the choice:

  • When running code in the Deephaven console, it's rarely necessary to worry about any of these mechanisms — the exclusive lock is automatically held when running code in the console.
  • Use snapshots for filters and sorts in service of a user interface. (The Deephaven UI itself uses snapshotting to maximize responsiveness.) Snapshots can be processed even while the Update Graph is updating, and in some cases they return considerably more quickly than a lock could be acquired.
  • Use the shared lock when running table operations from a context that does not already hold a lock (e.g. arbitrary threads). Most table operations (beyond filters and sorts) cannot be initialized within a snapshot and must be run under a lock, and the shared lock is preferable to the exclusive lock because it allows multiple threads to run concurrently.
  • Use the exclusive lock in advanced cases that must wait on processing performed by the Periodic Update Graph's refresh thread and are not served by Table.awaitUpdate().