Source code for deephaven.ugp

#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
"""This module provides access to the Update Graph Processor(UGP)'s locks that must be acquired to perform certain
table operations. When working with refreshing tables, UGP locks must be held in order to have a consistent view of
the data between table operations.
"""

import contextlib
from collections import abc
from functools import wraps
from typing import Callable

import jpy

from deephaven import DHError

_JUpdateGraphProcessor = jpy.get_type("io.deephaven.engine.updategraph.UpdateGraphProcessor")
_j_exclusive_lock = _JUpdateGraphProcessor.DEFAULT.exclusiveLock()
_j_shared_lock = _JUpdateGraphProcessor.DEFAULT.sharedLock()

auto_locking = True
"""Whether to automatically acquire the Update Graph Processor(UGP) shared lock for an unsafe operation on a refreshing 
table when the current thread doesn't own either the UGP shared or the UGP exclusive lock. The newly obtained lock will 
be released after the table operation finishes. Auto locking is turned on by default."""


[docs]def has_exclusive_lock() -> bool: """Checks if the current thread is holding the Update Graph Processor(UGP) exclusive lock.""" return _j_exclusive_lock.isHeldByCurrentThread()
[docs]def has_shared_lock() -> bool: """Checks if the current thread is holding the Update Graph Processor(UGP) shared lock.""" return _j_shared_lock.isHeldByCurrentThread()
[docs]@contextlib.contextmanager def exclusive_lock(): """Context manager for running a block of code under a Update Graph Processor(UGP) exclusive lock.""" _j_exclusive_lock.lock() try: yield except Exception as e: raise DHError(e, "exception raised in the enclosed code block.") from e finally: _j_exclusive_lock.unlock()
[docs]@contextlib.contextmanager def shared_lock(): """Context manager for running a block of code under a Update Graph Processor(UGP) shared lock.""" _j_shared_lock.lock() try: yield except Exception as e: raise DHError(e, "exception raised in the enclosed code block.") from e finally: _j_shared_lock.unlock()
[docs]def exclusive_locked(f: Callable) -> Callable: """A decorator that ensures the decorated function be called under the Update Graph Processor(UGP) exclusive lock. The lock is released after the function returns regardless of what happens inside the function.""" @wraps(f) def do_locked(*arg, **kwargs): with exclusive_lock(): return f(*arg, **kwargs) return do_locked
[docs]def shared_locked(f: Callable) -> Callable: """A decorator that ensures the decorated function be called under the Update Graph Processor(UGP) shared lock. The lock is released after the function returns regardless of what happens inside the function.""" @wraps(f) def do_locked(*arg, **kwargs): with shared_lock(): return f(*arg, **kwargs) return do_locked
def _is_arg_refreshing(arg): if isinstance(arg, list) or isinstance(arg, tuple): for e in arg: if _is_arg_refreshing(e): return True elif getattr(arg, "is_refreshing", False): return True return False def _has_refreshing_tables(*args, **kwargs): for arg in args: if _is_arg_refreshing(arg): return True for k, v in kwargs.items(): if _is_arg_refreshing(v): return True return False
[docs]def auto_locking_op(f: Callable) -> Callable: """A decorator for annotating unsafe Table operations. It ensures that the decorated function runs under the UGP shared lock if ugp.auto_locking is True, the target table-like object or any table-like arguments are refreshing, and the current thread doesn't own any UGP locks.""" @wraps(f) def do_locked(*args, **kwargs): if (not _has_refreshing_tables(*args, **kwargs) or not auto_locking or has_shared_lock() or has_exclusive_lock()): return f(*args, **kwargs) with shared_lock(): return f(*args, **kwargs) return do_locked
[docs]@contextlib.contextmanager def auto_locking_ctx(*args, **kwargs): """An auto-locking aware context manager. It ensures that the enclosed code block runs under the UGP shared lock if ugp.auto_locking is True, the target table-like object or any table-like arguments are refreshing, and the current thread doesn't own any UGP locks.""" if (not _has_refreshing_tables(*args, **kwargs) or not auto_locking or has_shared_lock() or has_exclusive_lock()): yield else: with shared_lock(): yield