#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module provides access to the Update Graph (UG)'s locks that must be acquired to perform certain
table operations. When working with refreshing tables, UG locks must be held in order to have a consistent view of
the data between table operations.
"""
import contextlib
from functools import wraps
from typing import Callable, Union, Optional
import jpy
from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
_JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.UpdateGraph")
auto_locking = True
"""Whether to automatically acquire the Update Graph (UG) shared lock for an unsafe operation on a refreshing
table when the current thread doesn't own either the UG shared or the UG exclusive lock. The newly obtained lock will
be released after the table operation finishes. Auto locking is turned on by default."""
[docs]
class UpdateGraph(JObjectWrapper):
"""An Update Graph handles table update propagation within the Deephaven query engine. It provides access to
various control knobs and tools for ensuring consistency or blocking update processing.
"""
j_object_type = _JUpdateGraph
@property
def j_object(self) -> jpy.JType:
return self.j_update_graph
def __init__(self, j_update_graph: jpy.JType):
self.j_update_graph = j_update_graph
[docs]
def has_exclusive_lock(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> bool:
"""Checks if the current thread is holding the provided Update Graph's (UG) exclusive lock.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
Returns:
True if the current thread is holding the Update Graph (UG) exclusive lock, False otherwise.
"""
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph
return ug.j_update_graph.exclusiveLock().isHeldByCurrentThread()
[docs]
def has_shared_lock(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> bool:
"""Checks if the current thread is holding the provided Update Graph's (UG) shared lock.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
Returns:
True if the current thread is holding the Update Graph (UG) shared lock, False otherwise.
"""
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph
return ug.j_update_graph.sharedLock().isHeldByCurrentThread()
[docs]
@contextlib.contextmanager
def exclusive_lock(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]):
"""Context manager for running a block of code under an Update Graph (UG) exclusive lock.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
"""
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph
lock = ug.j_update_graph.exclusiveLock()
lock.lock()
try:
yield
except Exception as e:
raise DHError(e, "exception raised in the enclosed code block.") from e
finally:
lock.unlock()
[docs]
@contextlib.contextmanager
def shared_lock(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]):
"""Context manager for running a block of code under an Update Graph (UG) shared lock.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
"""
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph
lock = ug.j_update_graph.sharedLock()
lock.lock()
try:
yield
except Exception as e:
raise DHError(e, "exception raised in the enclosed code block.") from e
finally:
lock.unlock()
[docs]
def exclusive_locked(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> Callable:
"""A decorator that ensures the decorated function be called under the Update Graph (UG) exclusive
lock. The lock is released after the function returns regardless of what happens inside the function.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
"""
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph
def inner_wrapper(f: Callable) -> Callable:
@wraps(f)
def do_locked(*arg, **kwargs):
with exclusive_lock(ug):
return f(*arg, **kwargs)
return do_locked
return inner_wrapper
[docs]
def shared_locked(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> Callable:
"""A decorator that ensures the decorated function be called under the Update Graph (UG) shared lock.
The lock is released after the function returns regardless of what happens inside the function.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
"""
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph
def inner_wrapper(f: Callable) -> Callable:
@wraps(f)
def do_locked(*arg, **kwargs):
with shared_lock(ug):
return f(*arg, **kwargs)
return do_locked
return inner_wrapper
def _is_arg_refreshing(arg) -> bool:
if isinstance(arg, list) or isinstance(arg, tuple):
for e in arg:
if _is_arg_refreshing(e):
return True
elif getattr(arg, "is_refreshing", False):
return True
return False
def _first_refreshing_table(*args, **kwargs) -> Optional["Table"]:
for arg in args:
if _is_arg_refreshing(arg):
return arg
for k, v in kwargs.items():
if _is_arg_refreshing(v):
return v
return None
def _serial_table_operations_safe(
ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> bool:
"""Checks if the current thread is marked as being able to safely call serial operations according to the provided
Update Graph (UG) without locking.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
Returns:
True if the current thread is marked as being able to safely call serial operations according to the provided
Update Graph (UG), False otherwise.
"""
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph
return ug.j_update_graph.serialTableOperationsSafe()
def _current_thread_processes_updates(
ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> bool:
"""Checks if the current thread processes updates for the provided Update Graph (UG).
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
Returns:
True if the current thread processes updates for the provided Update Graph (UG), False otherwise.
"""
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph
return ug.j_update_graph.currentThreadProcessesUpdates()
[docs]
def auto_locking_op(f: Callable) -> Callable:
"""A decorator for annotating unsafe Table operations. It ensures that the decorated function runs under the UG
shared lock if ugp.auto_locking is True, the target table-like object or any table-like arguments are refreshing,
the current thread doesn't own any UG lock, and the current thread is not part of the update graph."""
@wraps(f)
def do_locked(*args, **kwargs):
arg = _first_refreshing_table(*args, **kwargs)
if (not arg
or not auto_locking
or has_shared_lock(arg)
or has_exclusive_lock(arg)
or _serial_table_operations_safe(arg)
or _current_thread_processes_updates(arg)):
return f(*args, **kwargs)
with shared_lock(arg.update_graph):
return f(*args, **kwargs)
return do_locked
[docs]
@contextlib.contextmanager
def auto_locking_ctx(*args, **kwargs):
"""An auto-locking aware context manager. It ensures that the enclosed code block runs under the UG shared lock if
ugp.auto_locking is True, the target table-like object or any table-like arguments are refreshing, the current
thread doesn't own any UG lock, and the current thread is not part of the update graph."""
arg = _first_refreshing_table(*args, **kwargs)
if (not arg
or not auto_locking
or has_shared_lock(arg)
or has_exclusive_lock(arg)
or _serial_table_operations_safe(arg)
or _current_thread_processes_updates(arg)):
yield
else:
with shared_lock(arg.update_graph):
yield