deephaven.stream.kafka.consumer

The kafka.consumer module supports consuming a Kafka topic as a Deephaven live table.

ALL_PARTITIONS_DONT_SEEK = {-1: -2}

For all partitions, start consuming at the current position.

ALL_PARTITIONS_SEEK_TO_BEGINNING = {-1: -1}

For all partitions, start consuming at the beginning.

ALL_PARTITIONS_SEEK_TO_END = {-1: -3}

For all partitions, start consuming at the end.

DONT_SEEK = -2

Start consuming at the current position of a partition.

class KeyValueSpec(j_spec)[source]

Bases: JObjectWrapper

j_object_type

alias of KafkaTools$Consume$KeyOrValueSpec

class ProtobufProtocol(j_protocol)[source]

Bases: JObjectWrapper

The protobuf serialization / deserialization protocol.

j_object_type

alias of Protocol

static raw()[source]

The raw Protobuf protocol. The full payload is the normal binary encoding of the Protobuf data.

Return type:

ProtobufProtocol

static serdes()[source]

The Kafka Protobuf serdes protocol. The payload’s first byte is the serdes magic byte, the next 4-bytes are the schema ID, the next variable-sized bytes are the message indexes, followed by the normal binary encoding of the Protobuf data.

Return type:

ProtobufProtocol
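
The serdes layout described above can be sketched in plain Python, without Deephaven. This is only an illustration, not Deephaven's implementation: the helper names are hypothetical, and both the big-endian schema ID and the zigzag-varint encoding of the message indexes (a count followed by that many indexes, with a single 0 byte standing for [0]) are assumptions taken from the Confluent wire format rather than from this page.

```python
import struct


def _read_zigzag_varint(buf, pos):
    """Decode one zigzag-encoded varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1), pos


def split_serdes_payload(payload):
    """Split a serdes-framed payload into (magic, schema_id, indexes, body).

    Assumed layout: 1 magic byte, 4-byte big-endian schema ID,
    varint-encoded message indexes, then the Protobuf body.
    """
    magic, schema_id = struct.unpack_from(">BI", payload, 0)
    pos = 5  # 1 magic byte + 4 schema ID bytes
    count, pos = _read_zigzag_varint(payload, pos)
    if count == 0:
        # Assumed shorthand: a lone 0 byte means the first message type, [0].
        indexes = [0]
    else:
        indexes = []
        for _ in range(count):
            index, pos = _read_zigzag_varint(payload, pos)
            indexes.append(index)
    return magic, schema_id, indexes, payload[pos:]


# Hypothetical frame: magic 0, schema ID 42, indexes [0], two body bytes.
frame = b"\x00" + (42).to_bytes(4, "big") + b"\x00" + b"\x08\x01"
magic, schema_id, indexes, body = split_serdes_payload(frame)
```

With ProtobufProtocol.raw(), by contrast, there is no header to strip: the whole payload is the Protobuf body.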

SEEK_TO_BEGINNING = -1

Start consuming at the beginning of a partition.

SEEK_TO_END = -3

Start consuming at the end of a partition.
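
To make the offset constants concrete, here is a small plain-Python sketch that interprets an offsets dictionary of the shape accepted by consume(). The constant values are copied from this page; the describe_offsets helper and the ALL_PARTITIONS name are hypothetical, and reading the key -1 as "every partition" is inferred from the ALL_PARTITIONS_* dictionaries rather than stated explicitly.

```python
# Values copied from the constants documented on this page.
SEEK_TO_BEGINNING = -1
DONT_SEEK = -2
SEEK_TO_END = -3

# Hypothetical name for the wildcard key used by the ALL_PARTITIONS_* dicts.
ALL_PARTITIONS = -1

_SENTINELS = {
    SEEK_TO_BEGINNING: "seek to beginning",
    DONT_SEEK: "keep current position",
    SEEK_TO_END: "seek to end",
}


def describe_offsets(offsets=None):
    """Return a readable description of an offsets mapping.

    None is treated like ALL_PARTITIONS_DONT_SEEK, matching the documented default.
    """
    if offsets is None:
        offsets = {ALL_PARTITIONS: DONT_SEEK}
    description = {}
    for partition, offset in offsets.items():
        if partition == ALL_PARTITIONS:
            target = "all partitions"
        else:
            target = f"partition {partition}"
        # Any value that is not a sentinel is taken as an explicit offset number.
        description[target] = _SENTINELS.get(offset, f"start at offset {offset}")
    return description


# Partition 0 starts at the end; partition 3 starts at an explicit offset.
mixed = describe_offsets({0: SEEK_TO_END, 3: 1500})
```

Sentinel values and explicit offsets can be mixed per partition in a single dictionary, as the last line shows.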

class TableType(j_table_type)[source]

Bases: JObjectWrapper

A factory that creates the supported Table Type for consuming Kafka.

static append()[source]

Consume all partitions into a single interleaved in-memory append-only table.

static blink()[source]

Consume all partitions into a single interleaved blink table, which will present only newly-available rows to downstream operations and visualizations.

j_object_type

alias of KafkaTools$TableType

static ring(capacity)[source]

Consume all partitions into a single in-memory ring table.

static stream()[source]

Deprecated synonym for “blink”.
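
The difference between the table types can be illustrated with a toy simulation in plain Python. It is not how Deephaven implements these tables: simulate is a hypothetical helper, each inner list stands for the rows that arrive in one update cycle, and the assumption that a ring table retains only the most recent capacity rows goes beyond what this page states.

```python
from collections import deque


def simulate(batches, table_type, capacity=None):
    """Return the rows visible after each update cycle for a given table type."""
    if table_type == "append":
        rows = []
    elif table_type == "ring":
        rows = deque(maxlen=capacity)  # oldest rows fall off once full
    elif table_type == "blink":
        rows = None  # nothing is retained between cycles
    else:
        raise ValueError(f"unknown table type: {table_type}")

    snapshots = []
    for batch in batches:
        if table_type == "blink":
            visible = list(batch)  # only the newly available rows
        else:
            rows.extend(batch)
            visible = list(rows)
        snapshots.append(visible)
    return snapshots


batches = [[1, 2], [3], [4, 5]]
append_view = simulate(batches, "append")
blink_view = simulate(batches, "blink")
ring_view = simulate(batches, "ring", capacity=3)
```

In this toy model the append table grows without bound, the blink table shows only the latest cycle, and the ring table is capped at three rows.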

avro_spec(schema, schema_version='latest', mapping=None, mapped_only=False)[source]

Creates a spec for how to use an Avro schema when consuming a Kafka stream to a Deephaven table.

Parameters:
  • schema (str) – either a JSON-encoded Avro schema definition string, or the name of a schema registered in a Confluent-compatible Schema Server. If it is the name of a registered schema, the ‘kafka_config’ parameter in the call to consume() should include the key ‘schema.registry.url’, with the Schema Server URL used to fetch the schema definition as its value

  • schema_version (str) – the schema version to fetch from the schema service, default is ‘latest’

  • mapping (Dict[str, str]) – a mapping from Avro field name to Deephaven table column name; the fields specified in the mapping will have their column names defined by it; if ‘mapped_only’ parameter is False, any other fields not mentioned in the mapping will use the same Avro field name for Deephaven table column; otherwise, these unmapped fields will be ignored and will not be present in the resulting table. default is None

  • mapped_only (bool) – whether to ignore Avro fields not present in the ‘mapping’ argument, default is False

Return type:

KeyValueSpec

Returns:

a KeyValueSpec

Raises:

DHError
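
The interaction between mapping and mapped_only can be sketched as a plain-Python function. This illustrates the rule described in the parameter list only; it is not Deephaven code, and resolve_columns and the field names are hypothetical.

```python
def resolve_columns(avro_fields, mapping=None, mapped_only=False):
    """Return {avro_field: column_name} following the documented mapping rule."""
    mapping = mapping or {}
    columns = {}
    for field in avro_fields:
        if field in mapping:
            # Mapped fields take the column name given in the mapping.
            columns[field] = mapping[field]
        elif not mapped_only:
            # Unmapped fields keep their Avro name unless mapped_only is set.
            columns[field] = field
        # Otherwise the unmapped field is dropped from the resulting table.
    return columns


fields = ["id", "price", "note"]
all_columns = resolve_columns(fields, mapping={"id": "OrderId"})
only_mapped = resolve_columns(fields, mapping={"id": "OrderId"}, mapped_only=True)
```

Setting mapped_only=True therefore makes the mapping double as a field filter.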

consume(kafka_config, topic, partitions=None, offsets=None, key_spec=None, value_spec=None, table_type=TableType.blink())[source]

Consume from Kafka to a Deephaven table.

Parameters:
  • kafka_config (Dict) – configuration for the associated Kafka consumer and also the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; pass any KafkaConsumer-specific configuration here

  • topic (str) – the Kafka topic name

  • partitions (List[int]) – a list of integer partition numbers, default is None which means all partitions

  • offsets (Dict[int, int]) – a mapping between partition numbers and offset numbers, and can be one of the predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. The default is None which works the same as ALL_PARTITIONS_DONT_SEEK. The offset numbers may be one of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK.

  • key_spec (KeyValueSpec) – specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec(), or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config parameter should include values for the dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, for the single resulting column name and type

  • value_spec (KeyValueSpec) – specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec(), or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config parameter should include values for the dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, for the single resulting column name and type

  • table_type (TableType) – a TableType, default is TableType.blink()

Return type:

Table

Returns:

a Deephaven live table that will update based on Kafka messages consumed for the given topic

Raises:

DHError
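
A call to consume() might look like the sketch below. The Deephaven import is deferred into the function because it needs a running Deephaven environment. The topic name "orders", the broker address, the column name "Value", and the type string "String" are all assumptions chosen for illustration; only the property keys and the consume() arguments come from this page.

```python
def build_kafka_config(bootstrap_servers):
    """Build a config dict holding both Kafka and Deephaven table properties."""
    return {
        # Standard KafkaConsumer property, passed through to the consumer.
        "bootstrap.servers": bootstrap_servers,
        # Table-specific properties read when value_spec is FROM_PROPERTIES.
        # The column name and the type string are assumed example values.
        "deephaven.value.column.name": "Value",
        "deephaven.value.column.type": "String",
    }


def consume_orders(bootstrap_servers="localhost:9092"):
    """Consume the hypothetical 'orders' topic into an append-only table."""
    # Imported here because it needs a running Deephaven environment.
    from deephaven.stream.kafka import consumer as kc

    return kc.consume(
        build_kafka_config(bootstrap_servers),
        "orders",
        offsets=kc.ALL_PARTITIONS_SEEK_TO_BEGINNING,
        key_spec=kc.KeyValueSpec.IGNORE,  # drop the message key
        value_spec=None,  # same as KeyValueSpec.FROM_PROPERTIES
        table_type=kc.TableType.append(),
    )


config = build_kafka_config("broker:9092")
```

Leaving value_spec as None relies on the ‘deephaven.value.column.*’ entries in the config; passing simple_spec() instead would make those entries unnecessary.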

consume_to_partitioned_table(kafka_config, topic, partitions=None, offsets=None, key_spec=None, value_spec=None, table_type=TableType.blink())[source]

Consume from Kafka to a Deephaven partitioned table.

Parameters:
  • kafka_config (Dict) – configuration for the associated Kafka consumer and also the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; pass any KafkaConsumer-specific configuration here

  • topic (str) – the Kafka topic name

  • partitions (List[int]) – a list of integer partition numbers, default is None which means all partitions

  • offsets (Dict[int, int]) – a mapping between partition numbers and offset numbers, and can be one of the predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. The default is None which works the same as ALL_PARTITIONS_DONT_SEEK. The offset numbers may be one of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK.

  • key_spec (KeyValueSpec) – specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec(), or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config parameter should include values for the dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, for the single resulting column name and type

  • value_spec (KeyValueSpec) – specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec(), or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config parameter should include values for the dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, for the single resulting column name and type

  • table_type (TableType) – a TableType, specifying the type of the expected result’s constituent tables, default is TableType.blink()

Return type:

PartitionedTable

Returns:

a Deephaven live partitioned table that will update based on Kafka messages consumed for the given topic. The keys of this partitioned table are the partition numbers of the topic, and its constituents are the tables for each topic partition.

Raises:

DHError

json_spec(col_defs, mapping=None)[source]

Creates a spec for how to use JSON data when consuming a Kafka stream to a Deephaven table.

Parameters:
  • col_defs (Union[TableDefinitionLike, List[Tuple[str, DType]]]) – the table definition, preferably specified as TableDefinitionLike. A list of two-element tuples, each holding a string column name and a Deephaven type for the column data type, also works, but is deprecated for removal.

  • mapping (Dict) – a dict mapping JSON fields to column names defined in the col_defs argument. Fields starting with a ‘/’ character are interpreted as a JSON Pointer (see RFC 6901, ISSN: 2070-1721, for details; essentially, nested fields are represented like “/parent/nested”). Fields not starting with a ‘/’ character are interpreted as top-level field names. If the mapping argument is not present or None, a 1:1 mapping between JSON fields and Deephaven table column names is assumed.

Return type:

KeyValueSpec

Returns:

a KeyValueSpec

Raises:

DHError
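
The field lookup rule for mapping can be sketched in plain Python: keys that start with ‘/’ are resolved as RFC 6901 JSON Pointers, and all other keys are read as top-level fields. The helpers and the sample record are hypothetical, and the sketch only mimics the lookup rule; it says nothing about how Deephaven parses JSON internally.

```python
import json


def resolve_field(doc, field):
    """Look up one mapping key in a parsed JSON object."""
    if not field.startswith("/"):
        return doc.get(field)  # plain top-level field name
    node = doc
    for token in field[1:].split("/"):
        # RFC 6901 escapes: "~1" stands for "/", "~0" stands for "~".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(node, list):
            node = node[int(token)]
        else:
            node = node[token]
    return node


def to_row(doc, mapping):
    """Build {column_name: value} from a {json_field: column_name} mapping."""
    return {column: resolve_field(doc, field) for field, column in mapping.items()}


record = json.loads('{"id": 7, "user": {"name": "ada", "tags": ["x", "y"]}}')
mapping = {"id": "Id", "/user/name": "UserName", "/user/tags/1": "SecondTag"}
row = to_row(record, mapping)
```

A mapping like this one is what lets a nested JSON payload land in flat Deephaven columns.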

object_processor_spec(provider)[source]

Creates a Kafka key-value spec implementation from a named object processor provider. The provider must be capable of supporting a byte array.

Parameters:

provider (jpy.JType) – the named object processor provider

Return type:

KeyValueSpec

Returns:

a KeyValueSpec

Raises:

DHError

protobuf_spec(schema=None, schema_version=None, schema_message_name=None, message_class=None, include=None, protocol=None)[source]

Creates a spec for parsing a Kafka protobuf stream into a Deephaven table. Uses the schema, schema_version, and schema_message_name to fetch the schema from the schema registry, or uses message_class to get the schema from the classpath.

Parameters:
  • schema (Optional[str]) – the schema subject name. When set, this will fetch the protobuf message descriptor from the schema registry. Either this, or message_class, must be set.

  • schema_version (Optional[int]) – the schema version, or None for latest, default is None. For purposes of reproducibility across restarts where schema changes may occur, it is advisable for callers to set this. This will ensure the resulting table definition will not change across restarts. This gives the caller an explicit opportunity to update any downstream consumers when updating schema_version if necessary.

  • schema_message_name (Optional[str]) – the fully-qualified protobuf message name, for example “com.example.MyMessage”. This message’s descriptor will be used as the basis for the resulting table’s definition. If None, the first message descriptor in the protobuf schema will be used. The default is None. It is advisable for callers to explicitly set this.

  • message_class (Optional[str]) – the fully-qualified Java class name for the protobuf message on the current classpath, for example “com.example.MyMessage” or “com.example.OuterClass$MyMessage”. When this is set, the schema registry will not be used. Either this, or schema, must be set.

  • include (Optional[List[str]]) – the ‘/’ separated paths to include. The final path may be a ‘*’ to additionally match everything that starts with path. For example, include=[“/foo/bar”] will include the field path name paths [], [“foo”], and [“foo”, “bar”]. include=[“/foo/bar/*”] will additionally include any field path name paths that start with [“foo”, “bar”]: [“foo”, “bar”, “baz”], [“foo”, “bar”, “baz”, “zap”], etc. When multiple includes are specified, the fields will be included when any of the components matches. Default is None, which includes all paths.

  • protocol (Optional[ProtobufProtocol]) – the wire protocol for this payload. When schema is set, ProtobufProtocol.serdes() will be used by default. When message_class is set, ProtobufProtocol.raw() will be used by default.

Return type:

KeyValueSpec

Returns:

a KeyValueSpec
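
The include matching rule can be sketched in plain Python. The is_included helper is hypothetical and only restates the behaviour described for the include parameter: a plain path includes itself and its ancestors, a trailing ‘*’ additionally includes every descendant, and None includes everything.

```python
def is_included(field_path, includes=None):
    """Return True if field_path (a list of names) matches any include pattern."""
    if includes is None:
        return True  # documented default: all paths are included
    for pattern in includes:
        parts = [part for part in pattern.split("/") if part]
        wildcard = bool(parts) and parts[-1] == "*"
        if wildcard:
            parts = parts[:-1]
        # The pattern's own path and each of its ancestors are included.
        if parts[:len(field_path)] == field_path:
            return True
        # A trailing "*" also includes everything below the pattern's path.
        if wildcard and field_path[:len(parts)] == parts:
            return True
    return False


exact = ["/foo/bar"]
star = ["/foo/bar/*"]
deep_with_exact = is_included(["foo", "bar", "baz"], exact)
deep_with_star = is_included(["foo", "bar", "baz"], star)
```

Because a field is kept when any pattern matches, several include entries combine as a union.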

simple_spec(col_name, data_type=None)[source]

Creates a spec that defines a single column to receive the key or value of a Kafka message when consuming a Kafka stream to a Deephaven table.

Parameters:
  • col_name (str) – the Deephaven column name

  • data_type (DType) – the column data type

Return type:

KeyValueSpec

Returns:

a KeyValueSpec

Raises:

DHError