Source code for deephaven.filters

#
# Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
#

"""This module implements various filters that can be used in deephaven table's filter operations."""

from __future__ import annotations

from collections.abc import Sequence
from enum import Enum
from typing import Callable, Union

import jpy

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.concurrency_control import Barrier, ConcurrencyControl
from deephaven.jcompat import to_sequence

_JFilter = jpy.get_type("io.deephaven.api.filter.Filter")
_JFilterNot = jpy.get_type("io.deephaven.api.filter.FilterNot")
_JFilterIn = jpy.get_type("io.deephaven.api.filter.FilterIn")
_JFilterComparison = jpy.get_type("io.deephaven.api.filter.FilterComparison")
_JLiteral = jpy.get_type("io.deephaven.api.literal.Literal")
_JColumnName = jpy.get_type("io.deephaven.api.ColumnName")
_JFilterPattern = jpy.get_type("io.deephaven.api.filter.FilterPattern")
_JPatternMode = jpy.get_type("io.deephaven.api.filter.FilterPattern$Mode")
_JPattern = jpy.get_type("java.util.regex.Pattern")
_JIncrementalReleaseFilter = jpy.get_type(
    "io.deephaven.engine.table.impl.select.IncrementalReleaseFilter"
)


class ColumnName(str):
    """A ``str`` subclass that marks a string as a column name.

    The comparison helpers in this module check operands with
    ``isinstance(..., ColumnName)`` to decide whether an operand refers to a column or is
    a literal value.
    """

    # No per-instance attributes are needed beyond the str value itself.
    __slots__ = ()
class Filter(ConcurrencyControl["Filter"], JObjectWrapper):
    """A Filter object represents a filter that can be used in Table's filtering (where) operations.

    Explicit concurrency and ordering control can be specified on a Filter to affect the
    parallelization of its evaluation during the Table filtering operation.
    """

    j_object_type = _JFilter

    @property
    def j_object(self) -> jpy.JType:
        """The wrapped Java filter object."""
        return self.j_filter

    def __init__(self, j_filter: jpy.JType) -> None:
        """Wraps the given Java filter object.

        Args:
            j_filter (jpy.JType): the Java filter object to wrap
        """
        self.j_filter = j_filter

    def with_declared_barriers(
        self, barriers: Union[Barrier, Sequence[Barrier]]
    ) -> Filter:
        """Returns a new Filter with the given declared barriers.

        Args:
            barriers (Union[Barrier, Sequence[Barrier]]): the barrier(s) to declare

        Returns:
            a new Filter with the given declared barriers

        Raises:
            DHError
        """
        try:
            barrier_seq = to_sequence(barriers)
            j_declared = self.j_filter.withDeclaredBarriers(*barrier_seq)
            return Filter(j_filter=j_declared)
        except Exception as e:
            raise DHError(e, "failed to create filter with declared barriers.") from e

    def with_respected_barriers(
        self, barriers: Union[Barrier, Sequence[Barrier]]
    ) -> Filter:
        """Returns a new Filter with the given respected barriers.

        Args:
            barriers (Union[Barrier, Sequence[Barrier]]): the barrier(s) to respect

        Returns:
            a new Filter with the given respected barriers

        Raises:
            DHError
        """
        try:
            barrier_seq = to_sequence(barriers)
            j_respecting = self.j_filter.withRespectedBarriers(*barrier_seq)
            return Filter(j_filter=j_respecting)
        except Exception as e:
            raise DHError(e, "failed to create filter with respected barriers.") from e

    def with_serial(self) -> Filter:
        """Returns a new Filter with serial evaluation enforced.

        Returns:
            a new Filter with serial evaluation enforced

        Raises:
            DHError
        """
        try:
            j_serial = self.j_filter.withSerial()
            return Filter(j_filter=j_serial)
        except Exception as e:
            raise DHError(e, "failed to create filter with serial evaluation.") from e

    def not_(self) -> Filter:
        """Creates a new filter that evaluates to the opposite of what this filter evaluates to.

        Returns:
            a new not Filter
        """
        j_negated = _JFilterNot.of(self.j_filter)
        return Filter(j_filter=j_negated)

    @classmethod
    def from_(
        cls, conditions: Union[str, Sequence[str]]
    ) -> Union[Filter, Sequence[Filter]]:
        """Creates filter(s) from the given condition(s).

        Args:
            conditions (Union[str, Sequence[str]]): filter condition(s)

        Returns:
            a single Filter when exactly one filter is produced, otherwise a list of Filters

        Raises:
            DHError
        """
        conditions = to_sequence(conditions)
        try:
            # "from" is a Python keyword, so the Java static method is fetched with getattr.
            j_filters = getattr(_JFilter, "from")(conditions).toArray()
            filters = [cls(j_filter=j_filter) for j_filter in j_filters]
            # Exactly one result is returned bare; any other count is returned as a list.
            return filters[0] if len(filters) == 1 else filters
        except Exception as e:
            raise DHError(e, "failed to create filters.") from e
def or_(filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> Filter:
    """Creates a new filter that evaluates to true when any of the given filters evaluates to true.

    Args:
        filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the component filter(s)

    Returns:
        a new or Filter
    """
    components = []
    for item in to_sequence(filters):
        if isinstance(item, str):
            # A condition string is first turned into a Filter; its Java object is combined.
            components.append(Filter.from_(item).j_filter)  # type: ignore[union-attr]
        else:
            # Non-string items are passed on exactly as to_sequence produced them.
            components.append(item)
    # "or" is a Python keyword, so the Java static method is fetched with getattr.
    j_or = getattr(_JFilter, "or")
    return Filter(j_filter=j_or(*components))
def and_(filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> Filter:
    """Creates a new filter that evaluates to true when all of the given filters evaluate to true.

    Args:
        filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the component filter(s)

    Returns:
        a new and Filter
    """
    components = []
    for item in to_sequence(filters):
        if isinstance(item, str):
            # A condition string is first turned into a Filter; its Java object is combined.
            components.append(Filter.from_(item).j_filter)  # type: ignore[union-attr]
        else:
            # Non-string items are passed on exactly as to_sequence produced them.
            components.append(item)
    # "and" is a Python keyword, so the Java static method is fetched with getattr.
    j_and = getattr(_JFilter, "and")
    return Filter(j_filter=j_and(*components))
def not_(filter_: Filter) -> Filter:
    """Creates a new filter that evaluates to the opposite of what filter_ evaluates to.

    Args:
        filter_ (Filter): the filter to negate

    Returns:
        a new not Filter
    """
    # "not" is a Python keyword, so the Java static method is fetched with getattr.
    j_not = getattr(_JFilter, "not")
    j_negated = j_not(filter_.j_filter)
    return Filter(j_filter=j_negated)
def is_null(col: str) -> Filter:
    """Creates a new filter that evaluates to true when the col is null, and to false otherwise.

    Args:
        col (str): the column name

    Returns:
        a new is-null Filter
    """
    j_column = _JColumnName.of(col)
    j_is_null = _JFilter.isNull(j_column)
    return Filter(j_filter=j_is_null)
def is_not_null(col: str) -> Filter:
    """Creates a new filter that evaluates to true when the col is not null, and to false otherwise.

    Args:
        col (str): the column name

    Returns:
        a new is-not-null Filter
    """
    j_column = _JColumnName.of(col)
    j_is_not_null = _JFilter.isNotNull(j_column)
    return Filter(j_filter=j_is_not_null)
class PatternMode(Enum):
    """The regex matching mode used by a pattern filter."""

    MATCHES = _JPatternMode.MATCHES
    """The entire input must match the pattern."""

    FIND = _JPatternMode.FIND
    """Any subsequence of the input may match the pattern."""
def pattern(
    mode: PatternMode, col: str, regex: str, invert_pattern: bool = False
) -> Filter:
    """Creates a regular-expression pattern filter.

    See https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/regex/Pattern.html
    for documentation on the regex pattern.

    This filter will never match ``null`` values.

    Args:
        mode (PatternMode): the mode
        col (str): the column name
        regex (str): the regex pattern
        invert_pattern (bool): if the pattern matching logic should be inverted

    Returns:
        a new pattern filter

    Raises:
        DHError
    """
    try:
        # Everything stays inside the try so that a bad column name, an invalid regex,
        # or a bad mode all surface as DHError.
        j_column = _JColumnName.of(col)
        j_regex = _JPattern.compile(regex)
        j_mode = mode.value
        j_pattern_filter = _JFilterPattern.of(j_column, j_regex, j_mode, invert_pattern)
        return Filter(j_filter=j_pattern_filter)
    except Exception as e:
        raise DHError(e, "failed to create a pattern filter.") from e
def incremental_release(initial_rows: int, increment: int) -> Filter:
    """Creates an incremental release filter that progressively releases rows from a table.

    This filter gradually releases data, starting with an initial number of rows and then
    incrementally adding more rows over time. The input table must be an add-only table.

    Args:
        initial_rows (int): the initial number of rows to release
        increment (int): the number of additional rows to release in each subsequent step

    Returns:
        a new incremental release filter

    Raises:
        DHError
    """
    try:
        j_release_filter = _JIncrementalReleaseFilter(initial_rows, increment)
        return Filter(j_filter=j_release_filter)
    except Exception as e:
        raise DHError(e, "failed to create incremental release filter.") from e
def in_(col: str, values: Sequence[Union[bool, int, float, str]]) -> Filter:
    """Creates a new filter that evaluates to true when the column's value is in the given values.

    Args:
        col (str): the column name
        values (Sequence[Union[bool, int, float, str]]): the values to check against

    Returns:
        a new in Filter

    Raises:
        DHError
    """
    try:
        # Each Python value is converted to a Java literal before the filter is built.
        j_literals = []
        for value in values:
            j_literals.append(_JLiteral.of(value))
        j_column = _JColumnName.of(col)
        return Filter(j_filter=_JFilterIn.of(j_column, j_literals))
    except Exception as e:
        raise DHError(e, "failed to create an in filter.") from e
# Maps the short operator names used by the comparison helpers in this module to the
# corresponding Java FilterComparison factory methods.
_FILTER_COMPARISON_MAP: dict[str, Callable] = {
    "eq": _JFilterComparison.eq,
    "ne": _JFilterComparison.neq,
    "lt": _JFilterComparison.lt,
    "le": _JFilterComparison.leq,
    "gt": _JFilterComparison.gt,
    "ge": _JFilterComparison.geq,
}


def _j_filter_comparison(
    op: str,
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> jpy.JType:
    """Builds the Java comparison filter for the given operator and operands.

    Args:
        op (str): a key of _FILTER_COMPARISON_MAP ("eq", "ne", "lt", "le", "gt", "ge")
        left (Union[bool, int, float, str, ColumnName]): the left operand
        right (Union[bool, int, float, str, ColumnName]): the right operand

    Returns:
        the Java comparison filter object

    Raises:
        KeyError: if op is not a key of _FILTER_COMPARISON_MAP
    """

    def _to_j_operand(operand):
        # Only ColumnName instances are column references; a plain str is a literal.
        if isinstance(operand, ColumnName):
            return _JColumnName.of(operand)
        return _JLiteral.of(operand)

    j_left = _to_j_operand(left)
    j_right = _to_j_operand(right)
    # The operator lookup happens after both operands have been converted.
    j_compare = _FILTER_COMPARISON_MAP[op]
    return j_compare(j_left, j_right)
def eq(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is equal to the right operand.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or
            a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or
            a column name

    Returns:
        Filter: a new equality filter
    """
    j_comparison = _j_filter_comparison("eq", left=left, right=right)
    return Filter(j_filter=j_comparison)
def ne(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is not equal to the right operand.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or
            a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or
            a column name

    Returns:
        Filter: a new inequality filter
    """
    j_comparison = _j_filter_comparison("ne", left=left, right=right)
    return Filter(j_filter=j_comparison)
def lt(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is less than the right operand.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or
            a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or
            a column name

    Returns:
        Filter: a new less-than filter
    """
    j_comparison = _j_filter_comparison("lt", left=left, right=right)
    return Filter(j_filter=j_comparison)
def le(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is less than or equal to the
    right operand.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or
            a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or
            a column name

    Returns:
        Filter: a new less-than-or-equal filter
    """
    j_comparison = _j_filter_comparison("le", left=left, right=right)
    return Filter(j_filter=j_comparison)
def gt(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is greater than the right operand.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or
            a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or
            a column name

    Returns:
        Filter: a new greater-than filter
    """
    j_comparison = _j_filter_comparison("gt", left=left, right=right)
    return Filter(j_filter=j_comparison)
def ge(
    left: Union[bool, int, float, str, ColumnName],
    right: Union[bool, int, float, str, ColumnName],
) -> Filter:
    """Creates a new filter that evaluates to true when the left operand is greater than or equal to the
    right operand.

    Args:
        left (Union[bool, int, float, str, ColumnName]): the left operand, either a literal value or
            a column name
        right (Union[bool, int, float, str, ColumnName]): the right operand, either a literal value or
            a column name

    Returns:
        Filter: a new greater-than-or-equal filter
    """
    j_comparison = _j_filter_comparison("ge", left=left, right=right)
    return Filter(j_filter=j_comparison)