deephaven.stream.kafka.producer

The kafka.producer module supports publishing Deephaven tables to Kafka streams.

class KeyValueSpec(j_spec)[source]

Bases: deephaven._wrapper.JObjectWrapper

j_object_type

alias of io.deephaven.kafka.KafkaTools$Produce$KeyOrValueSpec

avro_spec(schema, schema_version='latest', field_to_col_mapping=None, timestamp_field=None, include_only_columns=None, exclude_columns=None, publish_schema=False, schema_namespace=None, column_properties=None)[source]

Creates a spec for how to use an Avro schema to produce a Kafka stream from a Deephaven table.

Parameters
  • schema (str) – the name of a schema registered in a Confluent-compatible Schema Registry Server. The ‘kafka_config’ parameter in the associated call to produce() should include the key ‘schema.registry.url’ with the URL of the Schema Registry Server from which the schema definition is fetched

  • schema_version (str) – the schema version to fetch from the schema service; default is ‘latest’

  • field_to_col_mapping (Dict[str, str]) – a mapping from Avro field names in the schema to column names in the Deephaven table. Any fields in the schema not present in the dict as keys are mapped to columns of the same name. The default is None, meaning all schema fields are mapped to columns of the same name.

  • timestamp_field (str) – the name of an extra timestamp field to be included in the produced Kafka message body; it is used mostly for debugging slowdowns. Default is None.

  • include_only_columns (List[str]) – the list of column names in the source table to include in the generated output; default is None. When not None, the ‘exclude_columns’ parameter must be None.

  • exclude_columns (List[str]) – the list of column names to exclude from the generated output (every other column will be included); default is None. When not None, the ‘include_only_columns’ parameter must be None.

  • publish_schema (bool) – when True, publish the given schema name to the Schema Registry Server, using an Avro schema generated from the table definition for the columns and fields implied by field_to_col_mapping, include_only_columns, and exclude_columns; if a schema_version is provided and the version resulting from publishing does not match it, an exception is raised. The default is False.

  • schema_namespace (str) – when ‘publish_schema’ is True, the namespace under which the generated schema is registered in the Schema Registry Server.

  • column_properties (Dict[str, str]) – when ‘publish_schema’ is True, specifies per-column properties that imply particular Avro type mappings. For example, a column X of BigDecimal type should specify the properties ‘x.precision’ and ‘x.scale’.

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError
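
Below is a minimal sketch of building an Avro value spec. The broker address, Schema Registry URL, schema name, version, and column names are illustrative assumptions, not values defined by this module:

    from deephaven.stream.kafka import producer as pk

    # The kafka_config later passed to produce() must include 'schema.registry.url';
    # both URLs below are placeholders.
    kafka_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
    }

    # 'orders_record' is a hypothetical schema name registered in the Schema Registry Server.
    value_spec = pk.avro_spec(
        "orders_record",
        schema_version="1",                        # pin a version instead of 'latest'
        field_to_col_mapping={"price": "Price"},   # Avro field 'price' comes from column 'Price'
        exclude_columns=["InternalId"],            # omit this column from the output
    )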

json_spec(include_columns=None, exclude_columns=None, mapping=None, nested_delim=None, output_nulls=False, timestamp_field=None)[source]

Creates a spec for how to generate JSON data when producing a Kafka stream from a Deephaven table.

Because JSON is a nested structure, a Deephaven column can be mapped to a top-level JSON field or to a field nested inside other JSON objects many levels deep, e.g. X.Y.Z.field. The ‘nested_delim’ parameter controls how a nested JSON field name is delimited in the mapping.

Parameters
  • include_columns (List[str]) – the list of Deephaven column names to include in the JSON output as fields; default is None, meaning all columns except the ones named in the ‘exclude_columns’ argument are included. If not None, ‘exclude_columns’ must be None.

  • exclude_columns (List[str]) – the list of Deephaven column names to omit from the JSON output; default is None, meaning no column is omitted. If not None, ‘include_columns’ must be None.

  • mapping (Dict[str, str]) – a mapping from column names to JSON field names. Any column name implied by earlier arguments and not included as a key in the map is mapped to a field of the same name. Default is None, meaning all columns are mapped to JSON fields of the same name.

  • nested_delim (str) – if nested JSON fields are desired, the field separator used in the field names given in ‘mapping’, or None for no nesting (default). For instance, if a particular column should be mapped to a JSON field Y nested inside field X, the field name value for that column key in the mapping dict can be the string “X.Y”, in which case the value for nested_delim should be “.”

  • output_nulls (bool) – when False (default), do not output a field for null column values

  • timestamp_field (str) – the name of an extra timestamp field to be included in the produced Kafka message body; it is used mostly for debugging slowdowns. Default is None.

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError
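
A minimal sketch of a JSON value spec using nested fields; the column names, field names, and timestamp field below are illustrative assumptions:

    from deephaven.stream.kafka import producer as pk

    # Map 'Sym' and 'Price' into a nested 'book' object, using '.' as the nesting delimiter.
    value_spec = pk.json_spec(
        include_columns=["Sym", "Price"],
        mapping={"Sym": "book.symbol", "Price": "book.price"},
        nested_delim=".",
        timestamp_field="PublishTime",   # extra timestamp field added to each message body
    )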

produce(table, kafka_config, topic, key_spec, value_spec, last_by_key_columns=False)[source]

Produce to Kafka from a Deephaven table.

Parameters
  • table (Table) – the source table to publish to Kafka

  • kafka_config (Dict) – configuration for the associated Kafka producer

  • topic (str) – the topic name

  • key_spec (KeyValueSpec) – specifies how to map table column(s) to the Key field in produced Kafka messages. This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the constant KeyValueSpec.IGNORE

  • value_spec (KeyValueSpec) – specifies how to map table column(s) to the Value field in produced Kafka messages. This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the constant KeyValueSpec.IGNORE

  • last_by_key_columns (bool) – whether to publish only the last record for each unique key. Ignored if key_spec is KeyValueSpec.IGNORE. Otherwise, if last_by_key_columns is True, this method internally performs a last_by aggregation on table, grouped by the input columns of key_spec, and publishes the result to Kafka.

Return type

Callable[[], None]

Returns

a callback that, when invoked, stops publishing and cleans up subscriptions and resources. Users should hold on to this callback to keep publishing alive for as long as it is desired, and invoke it once publishing is no longer needed

Raises

DHError
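
A minimal end-to-end sketch; the broker address, topic name, and source table are assumptions for illustration only:

    from deephaven import time_table
    from deephaven.stream.kafka import producer as pk

    # A simple ticking source table; the "PT1S" period string is assumed valid
    # for the Deephaven version in use.
    source = time_table("PT1S").update(["X = i"])

    cancel_callback = pk.produce(
        table=source,
        kafka_config={"bootstrap.servers": "localhost:9092"},
        topic="example_topic",
        key_spec=pk.simple_spec("X"),
        value_spec=pk.json_spec(include_columns=["X"]),
        last_by_key_columns=False,
    )

    # Hold on to cancel_callback while publishing; invoke it to stop and clean up.
    # cancel_callback()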

simple_spec(col_name)[source]

Creates a spec that defines a single column to be published as either the key or value of a Kafka message when producing a Kafka stream from a Deephaven table.

Parameters

col_name (str) – the Deephaven column name

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError
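
For example, to publish a single column as the Kafka message key (the column name is illustrative):

    from deephaven.stream.kafka import producer as pk

    # Use the 'Symbol' column as the Kafka message key.
    key_spec = pk.simple_spec("Symbol")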