deephaven.stream.kafka.consumer

The kafka.consumer module supports consuming a Kafka topic as a Deephaven live table.

ALL_PARTITIONS_DONT_SEEK = {-1: -2}

For all partitions, start consuming at the current position.

ALL_PARTITIONS_SEEK_TO_BEGINNING = {-1: -1}

For all partitions, start consuming at the beginning.

ALL_PARTITIONS_SEEK_TO_END = {-1: -3}

For all partitions, start consuming at the end.

DONT_SEEK = -2

Start consuming at the current position of a partition.

class KeyValueSpec(j_spec)[source]

Bases: deephaven._wrapper.JObjectWrapper

j_object_type

alias of io.deephaven.kafka.KafkaTools$Consume$KeyOrValueSpec

SEEK_TO_BEGINNING = -1

Start consuming at the beginning of a partition.

SEEK_TO_END = -3

Start consuming at the end of a partition.
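The offset constants in this module are plain integers and dictionaries, so an offsets argument can also be assembled by hand. The sketch below copies the documented values locally so that it runs without a Deephaven server. The names ALL_PARTITIONS and describe_start are hypothetical, and the fallback to DONT_SEEK for a partition that is not listed is an assumption; the sketch only illustrates how the -1 key reads as "all partitions", not how Deephaven resolves offsets internally.

```python
# Local copies of the documented constant values, so the sketch runs standalone.
SEEK_TO_BEGINNING = -1
DONT_SEEK = -2
SEEK_TO_END = -3
ALL_PARTITIONS = -1  # hypothetical name for the key used by the ALL_PARTITIONS_* dicts

ALL_PARTITIONS_SEEK_TO_BEGINNING = {ALL_PARTITIONS: SEEK_TO_BEGINNING}

# Per-partition offsets: replay partition 0, tail partition 1,
# and start partition 2 at the explicit offset 1500.
custom_offsets = {0: SEEK_TO_BEGINNING, 1: SEEK_TO_END, 2: 1500}

_NAMES = {
    SEEK_TO_BEGINNING: "beginning",
    DONT_SEEK: "current position",
    SEEK_TO_END: "end",
}


def describe_start(offsets, partition):
    """Hypothetical helper: describe where consumption would start for a partition."""
    # Assumption: an unlisted partition falls back to the all-partitions entry,
    # and to DONT_SEEK if there is none.
    value = offsets.get(partition, offsets.get(ALL_PARTITIONS, DONT_SEEK))
    return _NAMES.get(value, f"offset {value}")


print(describe_start(ALL_PARTITIONS_SEEK_TO_BEGINNING, 3))  # beginning
print(describe_start(custom_offsets, 1))                    # end
print(describe_start(custom_offsets, 2))                    # offset 1500
```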

class TableType(j_table_type)[source]

Bases: deephaven._wrapper.JObjectWrapper

An Enum that defines the supported table types for consuming Kafka.

static append()[source]

Consume all partitions into a single interleaved in-memory append-only table.

j_object_type

alias of io.deephaven.kafka.KafkaTools$TableType

static ring(capacity)[source]

Consume all partitions into a single in-memory ring table.

static stream()[source]

Consume all partitions into a single interleaved stream table, which will present only newly-available rows to downstream operations and visualizations.
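The difference between the three table types can be pictured with a toy model in plain Python. This is an analogy only, not Deephaven's implementation: it assumes that capacity bounds the number of most recent rows a ring table retains, and the function simulate is hypothetical.

```python
from collections import deque


def simulate(batches, capacity):
    """Toy model: what each table type would hold after each update cycle."""
    append_rows = []                    # append: keeps every row ever consumed
    ring_rows = deque(maxlen=capacity)  # ring: keeps only the newest `capacity` rows
    history = []
    for batch in batches:
        append_rows.extend(batch)
        ring_rows.extend(batch)
        stream_rows = list(batch)       # stream: only rows that arrived this cycle
        history.append((list(append_rows), list(ring_rows), stream_rows))
    return history


history = simulate([[1, 2, 3], [4, 5]], capacity=4)
final_append, final_ring, final_stream = history[-1]
print(final_append)  # [1, 2, 3, 4, 5]
print(final_ring)    # [2, 3, 4, 5]
print(final_stream)  # [4, 5]
```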

avro_spec(schema, schema_version='latest', mapping=None, mapped_only=False)[source]

Creates a spec for how to use an Avro schema when consuming a Kafka stream to a Deephaven table.

Parameters
  • schema (str) – either a JSON-encoded Avro schema definition string, or the name of a schema registered in a Confluent-compatible Schema Server. If it is the name of a schema in a Schema Server, the ‘kafka_config’ parameter in the call to consume() should include the key ‘schema.registry.url’, whose value is the Schema Server URL used to fetch the schema definition

  • schema_version (str) – the schema version to fetch from the Schema Server, default is ‘latest’

  • mapping (Dict[str, str]) – a mapping from Avro field names to Deephaven table column names. Fields specified in the mapping take the column names it defines. If the ‘mapped_only’ parameter is False, fields not mentioned in the mapping use their Avro field name as the Deephaven column name; otherwise, unmapped fields are ignored and are not present in the resulting table. Default is None

  • mapped_only (bool) – whether to ignore Avro fields not present in the ‘mapping’ argument, default is False

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError
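The interaction between mapping and mapped_only described above can be sketched in plain Python. The helper resolve_columns and the field names are hypothetical; it models only the documented naming rule and does not call Deephaven or parse an Avro schema.

```python
def resolve_columns(avro_fields, mapping=None, mapped_only=False):
    """Hypothetical helper: map Avro field names to Deephaven column names."""
    mapping = mapping or {}
    columns = {}
    for field in avro_fields:
        if field in mapping:
            columns[field] = mapping[field]  # renamed by the mapping
        elif not mapped_only:
            columns[field] = field           # unmapped field keeps its Avro name
        # otherwise the unmapped field is dropped from the result
    return columns


fields = ["symbol", "price", "ts"]
mapping = {"symbol": "Sym", "price": "Price"}

print(resolve_columns(fields, mapping))
# {'symbol': 'Sym', 'price': 'Price', 'ts': 'ts'}
print(resolve_columns(fields, mapping, mapped_only=True))
# {'symbol': 'Sym', 'price': 'Price'}
```

In a Deephaven session, the same dictionary would be passed as the mapping argument of avro_spec(), together with mapped_only=True to drop the unmapped ts field.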

consume(kafka_config, topic, partitions=None, offsets=None, key_spec=None, value_spec=None, table_type=TableType.stream())[source]

Consume from Kafka to a Deephaven table.

Parameters
  • kafka_config (Dict) – configuration for the associated Kafka consumer and also the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; pass any desired KafkaConsumer-specific configuration here

  • topic (str) – the Kafka topic name

  • partitions (List[int]) – a list of integer partition numbers, default is None which means all partitions

  • offsets (Dict[int, int]) – a mapping between partition numbers and offset numbers, and can be one of the predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. The default is None which works the same as ALL_PARTITIONS_DONT_SEEK. The offset numbers may be one of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK.

  • key_spec (KeyValueSpec) – specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES, in which case the kafka_config parameter should include values for the dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, for the single resulting column name and type

  • value_spec (KeyValueSpec) – specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES, in which case the kafka_config parameter should include values for the dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, for the single resulting column name and type

  • table_type (TableType) – a TableType enum, default is TableType.stream()

Return type

Table

Returns

a Deephaven live table that will update based on Kafka messages consumed for the given topic

Raises

DHError
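A call to consume() might look like the sketch below. The broker address, topic name, and column definitions are placeholders, and the helper functions make_kafka_config and start_consumer are hypothetical. The Deephaven imports are deferred into start_consumer because they need a running Deephaven session; only the configuration helper runs standalone.

```python
def make_kafka_config(bootstrap_servers, schema_registry_url=None):
    """Build the kafka_config dict; non-Deephaven keys go to the KafkaConsumer."""
    config = {"bootstrap.servers": bootstrap_servers}
    if schema_registry_url is not None:
        # Only needed when avro_spec() refers to a schema by name.
        config["schema.registry.url"] = schema_registry_url
    return config


def start_consumer(config, topic):
    """Requires a running Deephaven session, so imports are deferred."""
    from deephaven import dtypes as dht
    from deephaven.stream.kafka import consumer as kc

    return kc.consume(
        config,
        topic,
        offsets=kc.ALL_PARTITIONS_SEEK_TO_BEGINNING,
        key_spec=kc.KeyValueSpec.IGNORE,  # drop the message key
        value_spec=kc.json_spec([("Symbol", dht.string), ("Price", dht.double)]),
        table_type=kc.TableType.append(),
    )


config = make_kafka_config("localhost:9092")  # placeholder broker address
print(config)  # {'bootstrap.servers': 'localhost:9092'}
# Inside a Deephaven session: quotes = start_consumer(config, "quotes")
```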

consume_to_partitioned_table(kafka_config, topic, partitions=None, offsets=None, key_spec=None, value_spec=None, table_type=TableType.stream())[source]

Consume from Kafka to a Deephaven partitioned table.

Parameters
  • kafka_config (Dict) – configuration for the associated Kafka consumer and also the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; pass any desired KafkaConsumer-specific configuration here

  • topic (str) – the Kafka topic name

  • partitions (List[int]) – a list of integer partition numbers, default is None which means all partitions

  • offsets (Dict[int, int]) – a mapping between partition numbers and offset numbers, and can be one of the predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. The default is None which works the same as ALL_PARTITIONS_DONT_SEEK. The offset numbers may be one of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK.

  • key_spec (KeyValueSpec) – specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES, in which case the kafka_config parameter should include values for the dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, for the single resulting column name and type

  • value_spec (KeyValueSpec) – specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES, in which case the kafka_config parameter should include values for the dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, for the single resulting column name and type

  • table_type (TableType) – a TableType enum, specifying the type of the expected result’s constituent tables, default is TableType.stream()

Return type

PartitionedTable

Returns

a Deephaven live partitioned table that will update based on Kafka messages consumed for the given topic. The keys of this partitioned table are the partition numbers of the topic, and its constituents are tables, one per topic partition.

Raises

DHError
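The shape of the result can be pictured as a dictionary keyed by partition number, with one collection of rows per partition. The sketch below is a plain-Python analogy with made-up records; partition_records is a hypothetical helper and does not reflect how Deephaven builds the partitioned table.

```python
from collections import defaultdict


def partition_records(records):
    """Toy model: group (partition, value) records into one list per partition."""
    constituents = defaultdict(list)
    for partition, value in records:
        constituents[partition].append(value)
    return dict(constituents)


records = [(0, "a"), (1, "b"), (0, "c"), (2, "d")]
by_partition = partition_records(records)
print(sorted(by_partition))  # [0, 1, 2]
print(by_partition[0])       # ['a', 'c']
```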

json_spec(col_defs, mapping=None)[source]

Creates a spec for how to use JSON data when consuming a Kafka stream to a Deephaven table.

Parameters
  • col_defs (List[Tuple[str, DType]]) – a list of tuples specifying names and types for columns to be created on the resulting Deephaven table. Tuples contain two elements, a string for column name and a Deephaven type for column data type.

  • mapping (Dict) – a dict mapping JSON fields to column names defined in the col_defs argument. Fields starting with a ‘/’ character are interpreted as a JSON Pointer (see RFC 6901, ISSN: 2070-1721 for details; essentially, nested fields are represented like “/parent/nested”). Fields not starting with a ‘/’ character are interpreted as top-level field names. If the mapping argument is not present or None, a 1:1 mapping between JSON fields and Deephaven table column names is assumed.

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError
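How the keys of the mapping argument select values from a JSON message can be sketched in plain Python. The helpers lookup and to_row and the sample message are hypothetical; the pointer handling follows RFC 6901 (including its ~0 and ~1 escapes) and illustrates the documented rule rather than Deephaven's own parser.

```python
import json


def lookup(document, field):
    """Resolve a mapping key against a parsed JSON document.

    Keys starting with '/' are treated as RFC 6901 JSON Pointers;
    anything else is a top-level field name.
    """
    if not field.startswith("/"):
        return document[field]
    node = document
    for token in field[1:].split("/"):
        # RFC 6901 unescaping: '~1' first, then '~0'.
        token = token.replace("~1", "/").replace("~0", "~")
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node


def to_row(message, mapping):
    """Hypothetical helper: build {column: value} from a JSON message."""
    document = json.loads(message)
    return {column: lookup(document, field) for field, column in mapping.items()}


message = '{"symbol": "ABC", "quote": {"bid": 10.5, "ask": 10.7}}'
mapping = {"symbol": "Symbol", "/quote/bid": "Bid", "/quote/ask": "Ask"}
print(to_row(message, mapping))
# {'Symbol': 'ABC', 'Bid': 10.5, 'Ask': 10.7}
```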

simple_spec(col_name, data_type=None)[source]

Creates a spec that defines a single column to receive the key or value of a Kafka message when consuming a Kafka stream to a Deephaven table.

Parameters
  • col_name (str) – the Deephaven column name

  • data_type (DType) – the column data type

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError