Source code for deephaven.filters

#
# Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
#

"""This module implement various filters that can be used in deephaven table's filter operations."""

from __future__ import annotations

from collections.abc import Sequence
from enum import Enum
from typing import Union

import jpy

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.concurrency_control import Barrier, ConcurrencyControl
from deephaven.jcompat import to_sequence

# Java types resolved once at import time through jpy.
# Entry point of the Java filter API (io.deephaven.api.filter.Filter).
_JFilter = jpy.get_type("io.deephaven.api.filter.Filter")
# Java column-name type (io.deephaven.api.ColumnName).
_JColumnName = jpy.get_type("io.deephaven.api.ColumnName")
# Java regex-pattern filter type and its nested Mode enum.
_JFilterPattern = jpy.get_type("io.deephaven.api.filter.FilterPattern")
_JPatternMode = jpy.get_type("io.deephaven.api.filter.FilterPattern$Mode")
# Standard Java regex pattern class (java.util.regex.Pattern).
_JPattern = jpy.get_type("java.util.regex.Pattern")
# Java incremental release filter implementation.
_JIncrementalReleaseFilter = jpy.get_type(
    "io.deephaven.engine.table.impl.select.IncrementalReleaseFilter"
)


class Filter(ConcurrencyControl["Filter"], JObjectWrapper):
    """A filter that can be used in a Table's filtering (where) operations.

    A Filter wraps a Java filter object. Explicit concurrency and ordering control can be
    specified on a Filter to affect how its evaluation is parallelized during a Table
    filtering operation.
    """

    j_object_type = _JFilter

    def __init__(self, j_filter):
        # The wrapped Java filter; every method below delegates to it.
        self.j_filter = j_filter

    @property
    def j_object(self) -> jpy.JType:
        """The wrapped Java filter object."""
        return self.j_filter

    def with_declared_barriers(
        self, barriers: Union[Barrier, Sequence[Barrier]]
    ) -> Filter:
        """Returns a new Filter with the given declared barriers.

        Args:
            barriers (Union[Barrier, Sequence[Barrier]]): the barrier(s) to declare

        Returns:
            a new Filter with the given declared barriers

        Raises:
            DHError
        """
        try:
            barrier_seq = to_sequence(barriers)
            j_declared = self.j_filter.withDeclaredBarriers(*barrier_seq)
            return Filter(j_filter=j_declared)
        except Exception as e:
            raise DHError(e, "failed to create filter with declared barriers.") from e

    def with_respected_barriers(
        self, barriers: Union[Barrier, Sequence[Barrier]]
    ) -> Filter:
        """Returns a new Filter with the given respected barriers.

        Args:
            barriers (Union[Barrier, Sequence[Barrier]]): the barrier(s) to respect

        Returns:
            a new Filter with the given respected barriers

        Raises:
            DHError
        """
        try:
            barrier_seq = to_sequence(barriers)
            j_respecting = self.j_filter.withRespectedBarriers(*barrier_seq)
            return Filter(j_filter=j_respecting)
        except Exception as e:
            raise DHError(e, "failed to create filter with respected barriers.") from e

    def with_serial(self) -> Filter:
        """Returns a new Filter with serial evaluation enforced.

        Returns:
            a new Filter with serial evaluation enforced

        Raises:
            DHError
        """
        try:
            j_serial = self.j_filter.withSerial()
            return Filter(j_filter=j_serial)
        except Exception as e:
            raise DHError(e, "failed to create filter with serial evaluation.") from e

    def not_(self):
        """Creates a new filter that evaluates to the opposite of what this filter evaluates to.

        Returns:
            a new not Filter
        """
        # "not" is a Python keyword, so the Java static method is looked up by name.
        j_negate = getattr(_JFilter, "not")
        return Filter(j_filter=j_negate(self.j_filter))

    @classmethod
    def from_(
        cls, conditions: Union[str, Sequence[str]]
    ) -> Union[Filter, Sequence[Filter]]:
        """Creates filter(s) from the given condition(s).

        Args:
            conditions (Union[str, Sequence[str]]): filter condition(s)

        Returns:
            a single Filter when exactly one filter is created, otherwise a list of Filters

        Raises:
            DHError
        """
        # Normalization happens outside the try block, so its errors are not wrapped.
        conditions = to_sequence(conditions)
        try:
            # "from" is a Python keyword, so the Java static method is looked up by name.
            j_from = getattr(_JFilter, "from")
            filters = [cls(j_filter=j_item) for j_item in j_from(conditions).toArray()]
            if len(filters) == 1:
                return filters[0]
            return filters
        except Exception as e:
            raise DHError(e, "failed to create filters.") from e
def or_(filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> Filter:
    """Creates a new filter that evaluates to true when any of the given filters evaluates to true.

    Args:
        filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the component filter(s)

    Returns:
        a new or Filter
    """
    j_components = []
    for item in to_sequence(filters):
        if isinstance(item, str):
            # Condition strings are parsed into a Filter first, then unwrapped.
            j_components.append(Filter.from_(item).j_filter)  # type: ignore[union-attr]
        else:
            # Passed through as yielded by to_sequence — presumably already the
            # underlying Java filter; confirm against deephaven.jcompat.
            j_components.append(item)
    # "or" is a Python keyword, so the Java static method is looked up by name.
    j_or = getattr(_JFilter, "or")
    return Filter(j_filter=j_or(*j_components))
def and_(filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> Filter:
    """Creates a new filter that evaluates to true when all the given filters evaluate to true.

    Args:
        filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the component filters

    Returns:
        a new and Filter
    """
    j_components = []
    for item in to_sequence(filters):
        if isinstance(item, str):
            # Condition strings are parsed into a Filter first, then unwrapped.
            j_components.append(Filter.from_(item).j_filter)  # type: ignore[union-attr]
        else:
            # Passed through as yielded by to_sequence — presumably already the
            # underlying Java filter; confirm against deephaven.jcompat.
            j_components.append(item)
    # "and" is a Python keyword, so the Java static method is looked up by name.
    j_and = getattr(_JFilter, "and")
    return Filter(j_filter=j_and(*j_components))
def not_(filter_: Filter) -> Filter:
    """Creates a new filter that evaluates to the opposite of what filter_ evaluates to.

    This is a function-style equivalent of calling ``filter_.not_()``.

    Args:
        filter_ (Filter): the filter to negate

    Returns:
        a new not Filter
    """
    negated = filter_.not_()
    return negated
def is_null(col: str) -> Filter:
    """Creates a new filter that evaluates to true when the col is null, and false otherwise.

    Args:
        col (str): the column name

    Returns:
        a new is-null Filter
    """
    j_column = _JColumnName.of(col)
    j_is_null = _JFilter.isNull(j_column)
    return Filter(j_filter=j_is_null)
def is_not_null(col: str) -> Filter:
    """Creates a new filter that evaluates to true when the col is not null, and false otherwise.

    Args:
        col (str): the column name

    Returns:
        a new is-not-null Filter
    """
    j_column = _JColumnName.of(col)
    j_is_not_null = _JFilter.isNotNull(j_column)
    return Filter(j_filter=j_is_not_null)
class PatternMode(Enum):
    """The regex mode used by a pattern filter.

    Each member's value is the corresponding constant of the Java ``FilterPattern.Mode`` enum.
    """

    MATCHES = _JPatternMode.MATCHES
    """Matches the entire input against the pattern"""

    FIND = _JPatternMode.FIND
    """Matches any subsequence of the input against the pattern"""
def pattern(
    mode: PatternMode, col: str, regex: str, invert_pattern: bool = False
) -> Filter:
    """Creates a regular-expression pattern filter.

    See https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/regex/Pattern.html
    for documentation on the regex pattern.

    This filter will never match ``null`` values.

    Args:
        mode (PatternMode): the mode
        col (str): the column name
        regex (str): the regex pattern
        invert_pattern (bool): if the pattern matching logic should be inverted

    Returns:
        a new pattern filter

    Raises:
        DHError
    """
    try:
        j_column = _JColumnName.of(col)
        # The regex is compiled on the Java side, so Java regex syntax applies.
        j_regex = _JPattern.compile(regex)
        j_pattern_filter = _JFilterPattern.of(
            j_column, j_regex, mode.value, invert_pattern
        )
        return Filter(j_filter=j_pattern_filter)
    except Exception as e:
        raise DHError(e, "failed to create a pattern filter.") from e
def incremental_release(initial_rows: int, increment: int) -> Filter:
    """Creates an incremental release filter that progressively releases rows from a table.

    This filter gradually releases data, starting with an initial number of rows and then
    incrementally adding more rows over time. The input table must be an add-only table.

    Args:
        initial_rows (int): the initial number of rows to release
        increment (int): the number of additional rows to release in each subsequent step

    Returns:
        a new incremental release filter

    Raises:
        DHError
    """
    try:
        j_release_filter = _JIncrementalReleaseFilter(initial_rows, increment)
        return Filter(j_filter=j_release_filter)
    except Exception as e:
        raise DHError(e, "failed to create incremental release filter.") from e