#
# Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
#
"""This module implements various filters that can be used in deephaven table's filter operations."""
from __future__ import annotations
from collections.abc import Sequence
from enum import Enum
from typing import Callable, Union
import jpy
from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.concurrency_control import Barrier, ConcurrencyControl
from deephaven.jcompat import to_sequence
# Java types resolved once at import time through jpy; the Python wrappers in this
# module delegate to these types.

# Filter API types used to build, negate and compare filters.
_JFilter = jpy.get_type("io.deephaven.api.filter.Filter")
_JFilterNot = jpy.get_type("io.deephaven.api.filter.FilterNot")
_JFilterIn = jpy.get_type("io.deephaven.api.filter.FilterIn")
_JFilterComparison = jpy.get_type("io.deephaven.api.filter.FilterComparison")
# Operand types: literal values and column references.
_JLiteral = jpy.get_type("io.deephaven.api.literal.Literal")
_JColumnName = jpy.get_type("io.deephaven.api.ColumnName")
# Regex pattern filter, its mode enum (a nested class, hence the "$"), and the JDK Pattern class.
_JFilterPattern = jpy.get_type("io.deephaven.api.filter.FilterPattern")
_JPatternMode = jpy.get_type("io.deephaven.api.filter.FilterPattern$Mode")
_JPattern = jpy.get_type("java.util.regex.Pattern")
# Engine-side filter type instantiated by incremental_release().
_JIncrementalReleaseFilter = jpy.get_type(
"io.deephaven.engine.table.impl.select.IncrementalReleaseFilter"
)
[docs]
class ColumnName(str):
    """Marks a string as the name of a column rather than a plain string value.

    The comparison helper in this module checks ``isinstance(operand, ColumnName)``
    to decide whether an operand is a column reference or a literal.
    """

    # Empty slots: instances hold nothing beyond the underlying str value.
    __slots__ = ()
[docs]
class Filter(ConcurrencyControl["Filter"], JObjectWrapper):
    """Python wrapper around a Java filter usable in a Table's filtering (where) operations.

    Explicit concurrency and ordering control (barriers, serial evaluation) can be
    specified on a Filter to affect how its evaluation is parallelized during the
    Table filtering operation.
    """

    j_object_type = _JFilter

    def __init__(self, j_filter):
        # The wrapped Java filter object; exposed to callers through ``j_object``.
        self.j_filter = j_filter

    @property
    def j_object(self) -> jpy.JType:
        """The wrapped Java filter object."""
        return self.j_filter

    def with_declared_barriers(
        self, barriers: Union[Barrier, Sequence[Barrier]]
    ) -> Filter:
        """Builds a new Filter from this one with the given barriers declared.

        Args:
            barriers (Union[Barrier, Sequence[Barrier]]): the barrier(s) to declare

        Returns:
            a new Filter with the given declared barriers

        Raises:
            DHError
        """
        try:
            # to_sequence accepts either a single barrier or a sequence of them.
            j_declared = self.j_filter.withDeclaredBarriers(*to_sequence(barriers))
            return Filter(j_filter=j_declared)
        except Exception as e:
            raise DHError(e, "failed to create filter with declared barriers.") from e

    def with_respected_barriers(
        self, barriers: Union[Barrier, Sequence[Barrier]]
    ) -> Filter:
        """Builds a new Filter from this one that respects the given barriers.

        Args:
            barriers (Union[Barrier, Sequence[Barrier]]): the barrier(s) to respect

        Returns:
            a new Filter with the given respected barriers

        Raises:
            DHError
        """
        try:
            # to_sequence accepts either a single barrier or a sequence of them.
            j_respecting = self.j_filter.withRespectedBarriers(*to_sequence(barriers))
            return Filter(j_filter=j_respecting)
        except Exception as e:
            raise DHError(e, "failed to create filter with respected barriers.") from e

    def with_serial(self) -> Filter:
        """Builds a new Filter from this one with serial evaluation enforced.

        Returns:
            a new Filter with serial evaluation enforced

        Raises:
            DHError
        """
        try:
            j_serial = self.j_filter.withSerial()
            return Filter(j_filter=j_serial)
        except Exception as e:
            raise DHError(e, "failed to create filter with serial evaluation.") from e

    def not_(self) -> Filter:
        """Creates a new filter that evaluates to the opposite of what this filter evaluates to.

        Returns:
            a new not Filter
        """
        j_negated = _JFilterNot.of(self.j_filter)
        return Filter(j_filter=j_negated)

    @classmethod
    def from_(
        cls, conditions: Union[str, Sequence[str]]
    ) -> Union[Filter, Sequence[Filter]]:
        """Creates filter(s) from the given condition(s).

        Args:
            conditions (Union[str, Sequence[str]]): filter condition(s)

        Returns:
            a single filter when exactly one filter is produced, otherwise a list of filters

        Raises:
            DHError
        """
        conditions = to_sequence(conditions)
        try:
            # "from" is a reserved word in Python, so the Java method is looked up by name.
            j_filters = getattr(_JFilter, "from")(conditions).toArray()
            filters = [cls(j_filter=j_item) for j_item in j_filters]
            if len(filters) == 1:
                return filters[0]
            return filters
        except Exception as e:
            raise DHError(e, "failed to create filters.") from e
[docs]
def or_(filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> Filter:
    """Creates a new filter that evaluates to true when any of the given filters evaluates to true.

    Args:
        filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the component filter(s)

    Returns:
        a new or Filter
    """
    j_components = []
    for component in to_sequence(filters):
        if isinstance(component, str):
            # Condition strings are parsed into a filter first; other items are
            # passed on exactly as to_sequence yields them.
            component = Filter.from_(component).j_filter  # type: ignore[union-attr]
        j_components.append(component)
    # "or" is a reserved word in Python, so the Java method is looked up by name.
    j_or = getattr(_JFilter, "or")(*j_components)
    return Filter(j_filter=j_or)
[docs]
def and_(filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> Filter:
    """Creates a new filter that evaluates to true when all of the given filters evaluate to true.

    Args:
        filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the component filters

    Returns:
        a new and Filter
    """
    j_components = []
    for component in to_sequence(filters):
        if isinstance(component, str):
            # Condition strings are parsed into a filter first; other items are
            # passed on exactly as to_sequence yields them.
            component = Filter.from_(component).j_filter  # type: ignore[union-attr]
        j_components.append(component)
    # "and" is a reserved word in Python, so the Java method is looked up by name.
    j_and = getattr(_JFilter, "and")(*j_components)
    return Filter(j_filter=j_and)
[docs]
def not_(filter_: Filter) -> Filter:
    """Creates a new filter that evaluates to the opposite of what filter_ evaluates to.

    Args:
        filter_ (Filter): the filter to negate

    Returns:
        a new not Filter
    """
    # "not" is a reserved word in Python, so the Java method is looked up by name.
    j_negate = getattr(_JFilter, "not")
    return Filter(j_filter=j_negate(filter_.j_filter))
[docs]
def is_null(col: str) -> Filter:
    """Creates a new filter that evaluates to true when col is null and to false when it is not.

    Args:
        col (str): the column name

    Returns:
        a new is-null Filter
    """
    j_column = _JColumnName.of(col)
    return Filter(j_filter=_JFilter.isNull(j_column))
[docs]
def is_not_null(col: str) -> Filter:
    """Creates a new filter that evaluates to true when col is not null and to false when it is null.

    Args:
        col (str): the column name

    Returns:
        a new is-not-null Filter
    """
    j_column = _JColumnName.of(col)
    return Filter(j_filter=_JFilter.isNotNull(j_column))
[docs]
class PatternMode(Enum):
    """The regex matching mode used by a pattern filter.

    Each member's value is the corresponding constant of the Java
    ``FilterPattern.Mode`` type.
    """

    MATCHES = _JPatternMode.MATCHES
    """Matches the entire input against the pattern"""

    FIND = _JPatternMode.FIND
    """Matches any subsequence of the input against the pattern"""
[docs]
def pattern(
    mode: PatternMode, col: str, regex: str, invert_pattern: bool = False
) -> Filter:
    """Creates a regular-expression pattern filter.

    See https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/regex/Pattern.html for documentation on
    the regex pattern.

    This filter will never match ``null`` values.

    Args:
        mode (PatternMode): the mode
        col (str): the column name
        regex (str): the regex pattern
        invert_pattern (bool): if the pattern matching logic should be inverted

    Returns:
        a new pattern filter

    Raises:
        DHError
    """
    try:
        j_column = _JColumnName.of(col)
        # The regex is compiled with java.util.regex.Pattern, so Java regex syntax applies.
        j_regex = _JPattern.compile(regex)
        j_pattern_filter = _JFilterPattern.of(
            j_column, j_regex, mode.value, invert_pattern
        )
        return Filter(j_filter=j_pattern_filter)
    except Exception as e:
        raise DHError(e, "failed to create a pattern filter.") from e
[docs]
def incremental_release(initial_rows: int, increment: int) -> Filter:
    """Creates an incremental release filter that progressively releases rows from a table.

    This filter gradually releases data, starting with an initial number of rows and then
    incrementally adding more rows over time. The input table must be an add-only table.

    Args:
        initial_rows (int): the initial number of rows to release
        increment (int): the number of additional rows to release in each subsequent step

    Returns:
        a new incremental release filter

    Raises:
        DHError
    """
    try:
        j_release_filter = _JIncrementalReleaseFilter(initial_rows, increment)
        return Filter(j_filter=j_release_filter)
    except Exception as e:
        raise DHError(e, "failed to create incremental release filter.") from e
[docs]
def in_(col: str, values: Sequence[Union[bool, int, float, str]]) -> Filter:
    """Creates a new filter that evaluates to true when the column's value is in the given values.

    A lone ``str`` passed as ``values`` is treated as a single value rather than
    being split into its characters.

    Args:
        col (str): the column name
        values (Sequence[Union[bool, int, float, str]]): the values to check against

    Returns:
        a new in Filter

    Raises:
        DHError
    """
    # A bare str is itself a Sequence[str]; iterating it would silently build a
    # filter over its individual characters, which is never what the caller meant.
    if isinstance(values, str):
        values = (values,)
    try:
        # Each Python value becomes a Java Literal before being handed to FilterIn.
        j_literals = [_JLiteral.of(v) for v in values]
        return Filter(j_filter=_JFilterIn.of(_JColumnName.of(col), j_literals))
    except Exception as e:
        raise DHError(e, "failed to create an in filter.") from e
# Dispatch table from the short operator keys used by _j_filter_comparison to the
# matching comparison methods on the Java FilterComparison type. Note the Java
# names differ from the keys for three entries (neq, leq, geq).
_FILTER_COMPARISON_MAP: dict[str, Callable] = {
"eq": _JFilterComparison.eq,
"ne": _JFilterComparison.neq,
"lt": _JFilterComparison.lt,
"le": _JFilterComparison.leq,
"gt": _JFilterComparison.gt,
"ge": _JFilterComparison.geq,
}
def _j_filter_comparison(
    op: str,
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> jpy.JType:
    """Builds the Java comparison filter for ``op`` applied to ``left`` and ``right``.

    Args:
        op (str): a key of ``_FILTER_COMPARISON_MAP`` ("eq", "ne", "lt", "le", "gt", "ge")
        left (Union[bool, int, float, str, ColumnName]): the left operand
        right (Union[bool, int, float, str, ColumnName]): the right operand

    Returns:
        the Java object returned by the selected comparison method

    Raises:
        KeyError: if ``op`` is not a key of ``_FILTER_COMPARISON_MAP``
    """

    def _to_j_operand(operand):
        # A ColumnName is a column reference; anything else is a literal value.
        if isinstance(operand, ColumnName):
            return _JColumnName.of(operand)
        return _JLiteral.of(operand)

    j_left = _to_j_operand(left)
    j_right = _to_j_operand(right)
    return _FILTER_COMPARISON_MAP[op](j_left, j_right)
[docs]
def eq(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is equal to the right operand.

    An operand that is a ColumnName refers to a column; any other operand is a literal value.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or a column name

    Returns:
        Filter: a new equality filter
    """
    j_comparison = _j_filter_comparison("eq", left=left, right=right)
    return Filter(j_filter=j_comparison)
[docs]
def ne(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is not equal to the right operand.

    An operand that is a ColumnName refers to a column; any other operand is a literal value.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or a column name

    Returns:
        Filter: a new inequality filter
    """
    j_comparison = _j_filter_comparison("ne", left=left, right=right)
    return Filter(j_filter=j_comparison)
[docs]
def lt(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is less than the right operand.

    An operand that is a ColumnName refers to a column; any other operand is a literal value.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or a column name

    Returns:
        Filter: a new less-than filter
    """
    j_comparison = _j_filter_comparison("lt", left=left, right=right)
    return Filter(j_filter=j_comparison)
[docs]
def le(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is less than or equal to the right operand.

    An operand that is a ColumnName refers to a column; any other operand is a literal value.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or a column name

    Returns:
        Filter: a new less-than-or-equal filter
    """
    j_comparison = _j_filter_comparison("le", left=left, right=right)
    return Filter(j_filter=j_comparison)
[docs]
def gt(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is greater than the right operand.

    An operand that is a ColumnName refers to a column; any other operand is a literal value.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or a column name

    Returns:
        Filter: a new greater-than filter
    """
    j_comparison = _j_filter_comparison("gt", left=left, right=right)
    return Filter(j_filter=j_comparison)
[docs]
def ge(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is greater than or equal to the right operand.

    An operand that is a ColumnName refers to a column; any other operand is a literal value.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or a column name

    Returns:
        Filter: a new greater-than-or-equal filter
    """
    j_comparison = _j_filter_comparison("ge", left=left, right=right)
    return Filter(j_filter=j_comparison)