deephaven.stream.kafka.consumer¶
The kafka.consumer module supports consuming a Kafka topic as a Deephaven live table.
- ALL_PARTITIONS_DONT_SEEK = {-1: -2}¶
For all partitions, start consuming at the current position.
- ALL_PARTITIONS_SEEK_TO_BEGINNING = {-1: -1}¶
For all partitions, start consuming at the beginning.
- ALL_PARTITIONS_SEEK_TO_END = {-1: -3}¶
For all partitions, start consuming at the end.
- DONT_SEEK = -2¶
Start consuming at the current position of a partition.
- class KeyValueSpec(j_spec)[source]¶
Bases:
JObjectWrapper
- j_object_type¶
alias of
KafkaTools$Consume$KeyOrValueSpec
- class ProtobufProtocol(j_protocol)[source]¶
Bases:
JObjectWrapper
The protobuf serialization / deserialization protocol.
- j_object_type¶
alias of
Protocol
- static raw()[source]¶
The raw Protobuf protocol. The full payload is the normal binary encoding of the Protobuf data.
- Return type:
ProtobufProtocol
- SEEK_TO_BEGINNING = -1¶
Start consuming at the beginning of a partition.
- SEEK_TO_END = -3¶
Start consuming at the end of a partition.
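As a rough illustration of how the per-partition constants can be mixed with explicit offset numbers, the sketch below builds an offsets mapping in plain Python. The sentinel values are copied from this page so the snippet runs without a Deephaven session; in real code they would be read from the module. build_offsets is a hypothetical helper, not part of this API, and reading the -1 key of the ALL_PARTITIONS_* constants as "every partition" is inferred from their names.

```python
# Sentinel values copied from this page; in a Deephaven session, use the
# constants from deephaven.stream.kafka.consumer instead of redefining them.
SEEK_TO_BEGINNING = -1
DONT_SEEK = -2
SEEK_TO_END = -3


def build_offsets(explicit, default=DONT_SEEK, partitions=()):
    """Hypothetical helper: map each partition to an offset or a sentinel.

    Partitions listed in `explicit` keep their value; every other partition
    in `partitions` receives `default`.
    """
    offsets = {p: default for p in partitions}
    offsets.update(explicit)
    return offsets


# Partition 0 replays from the start, partition 1 resumes at offset 1000,
# and partition 2 only sees messages that arrive from now on.
offsets = build_offsets(
    {0: SEEK_TO_BEGINNING, 1: 1000},
    default=SEEK_TO_END,
    partitions=[0, 1, 2],
)
print(offsets)
```

The resulting dict has the Dict[int, int] shape that the offsets parameter of consume() expects.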
- class TableType(j_table_type)[source]¶
Bases:
JObjectWrapper
A factory that creates the supported Table Type for consuming Kafka.
- static append()[source]¶
Consume all partitions into a single interleaved in-memory append-only table.
- static blink()[source]¶
Consume all partitions into a single interleaved blink table, which will present only newly-available rows to downstream operations and visualizations.
- j_object_type¶
alias of
KafkaTools$TableType
- avro_spec(schema, schema_version='latest', mapping=None, mapped_only=False)[source]¶
Creates a spec for how to use an Avro schema when consuming a Kafka stream to a Deephaven table.
- Parameters:
schema (str) – Either a JSON-encoded Avro schema definition string, or the name of a schema registered in a Confluent-compatible Schema Server. If it is a schema name, the ‘kafka_config’ parameter in the call to consume() should include the key ‘schema.registry.url’ with the Schema Server URL used to fetch the schema definition.
schema_version (str) – the schema version to fetch from schema service, default is ‘latest’
mapping (Dict[str, str]) – a mapping from Avro field name to Deephaven table column name; the fields specified in the mapping will have their column names defined by it; if ‘mapped_only’ parameter is False, any other fields not mentioned in the mapping will use the same Avro field name for Deephaven table column; otherwise, these unmapped fields will be ignored and will not be present in the resulting table. default is None
mapped_only (bool) – whether to ignore Avro fields not present in the ‘mapping’ argument, default is False
- Return type:
KeyValueSpec
- Returns:
a KeyValueSpec
- Raises:
DHError –
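The interaction between mapping and mapped_only can be easier to follow with a small model. The sketch below restates the documented rule in plain Python; resolve_avro_columns is a hypothetical function written for illustration and is not how the library implements it. The make_avro_value_spec function shows what a matching call might look like, assuming a schema named "quotes-value" (a made-up name) is registered in a schema registry.

```python
def resolve_avro_columns(avro_fields, mapping=None, mapped_only=False):
    """Illustrative model of the documented rule, not the library's code.

    Returns a dict of Avro field name -> Deephaven column name.
    """
    mapping = mapping or {}
    columns = {}
    for field in avro_fields:
        if field in mapping:
            columns[field] = mapping[field]  # renamed by the mapping
        elif not mapped_only:
            columns[field] = field  # unmapped fields keep their Avro name
        # with mapped_only=True, unmapped fields are dropped
    return columns


fields = ["symbol", "price", "venue"]
renamed = resolve_avro_columns(fields, {"symbol": "Sym"})
strict = resolve_avro_columns(fields, {"symbol": "Sym"}, mapped_only=True)


def make_avro_value_spec():
    # Imported lazily because the deephaven package needs a running
    # Deephaven session.
    from deephaven.stream.kafka import consumer as kc

    # "quotes-value" is a hypothetical schema name; the kafka_config passed
    # to consume() would also need the 'schema.registry.url' key.
    return kc.avro_spec(
        "quotes-value",
        schema_version="latest",
        mapping={"symbol": "Sym"},
        mapped_only=True,
    )
```

Under this model, renamed keeps all three fields with only symbol renamed, while strict contains just the Sym column.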
- consume(kafka_config, topic, partitions=None, offsets=None, key_spec=None, value_spec=None, table_type=TableType.blink())[source]¶
Consume from Kafka to a Deephaven table.
- Parameters:
kafka_config (Dict) – configuration for the associated Kafka consumer and also the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; pass any KafkaConsumer-specific configuration here
topic (str) – the Kafka topic name
partitions (List[int]) – a list of integer partition numbers, default is None which means all partitions
offsets (Dict[int, int]) – a mapping between partition numbers and offset numbers, and can be one of the predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. The default is None which works the same as ALL_PARTITIONS_DONT_SEEK. The offset numbers may be one of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK.
key_spec (KeyValueSpec) – specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config param should include values for the dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, for the single resulting column name and type
value_spec (KeyValueSpec) – specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config param should include values for the dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, for the single resulting column name and type
table_type (TableType) – a TableType, default is TableType.blink()
- Return type:
Table
- Returns:
a Deephaven live table that will update based on Kafka messages consumed for the given topic
- Raises:
DHError –
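A minimal sketch of a consume() call that relies on the default FROM_PROPERTIES specs might look like the following. The broker address, topic name, column names, and the "String" type values are assumptions chosen for illustration; from_properties_config and consume_orders are hypothetical helpers, not part of this module.

```python
def from_properties_config(bootstrap_servers, key_col, key_type, value_col, value_type):
    """Build a kafka_config for the default FROM_PROPERTIES key/value specs."""
    return {
        # Standard KafkaConsumer property, passed through to the consumer.
        "bootstrap.servers": bootstrap_servers,
        # Table-specific properties named in the key_spec/value_spec docs.
        "deephaven.key.column.name": key_col,
        "deephaven.key.column.type": key_type,
        "deephaven.value.column.name": value_col,
        "deephaven.value.column.type": value_type,
    }


# "localhost:9092" and the "String" type names are assumed values.
config = from_properties_config("localhost:9092", "Key", "String", "Value", "String")


def consume_orders(kafka_config):
    # Imported lazily because the deephaven package needs a running
    # Deephaven session.
    from deephaven.stream.kafka import consumer as kc

    # key_spec and value_spec are left as None, so both fall back to
    # KeyValueSpec.FROM_PROPERTIES and read the column settings above.
    return kc.consume(
        kafka_config,
        "orders",  # hypothetical topic name
        offsets=kc.ALL_PARTITIONS_SEEK_TO_BEGINNING,
        table_type=kc.TableType.append(),
    )
```

Choosing TableType.append() here keeps every consumed row in memory, whereas the default blink table only presents newly available rows.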
- consume_to_partitioned_table(kafka_config, topic, partitions=None, offsets=None, key_spec=None, value_spec=None, table_type=TableType.blink())[source]¶
Consume from Kafka to a Deephaven partitioned table.
- Parameters:
kafka_config (Dict) – configuration for the associated Kafka consumer and also the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; pass any KafkaConsumer-specific configuration here
topic (str) – the Kafka topic name
partitions (List[int]) – a list of integer partition numbers, default is None which means all partitions
offsets (Dict[int, int]) – a mapping between partition numbers and offset numbers, and can be one of the predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. The default is None which works the same as ALL_PARTITIONS_DONT_SEEK. The offset numbers may be one of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK.
key_spec (KeyValueSpec) – specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config param should include values for the dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, for the single resulting column name and type
value_spec (KeyValueSpec) – specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config param should include values for the dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, for the single resulting column name and type
table_type (TableType) – a TableType, specifying the type of the expected result’s constituent tables, default is TableType.blink()
- Return type:
PartitionedTable
- Returns:
a Deephaven live partitioned table that will update based on Kafka messages consumed for the given topic. The keys of this partitioned table are the partition numbers of the topic, and its constituents are tables per topic partition.
- Raises:
DHError –
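For comparison with consume(), a call that restricts consumption to specific partitions and gives each one its own starting offset could be sketched as below. consume_by_partition is a hypothetical wrapper, and the "Payload" column name is an assumption; only the functions and constants documented on this page are used.

```python
def consume_by_partition(kafka_config, topic, partitions):
    """Hypothetical wrapper: one constituent table per listed partition."""
    # Imported lazily because the deephaven package needs a running
    # Deephaven session.
    from deephaven import dtypes as dht
    from deephaven.stream.kafka import consumer as kc

    return kc.consume_to_partitioned_table(
        kafka_config,
        topic,
        partitions=partitions,
        # Start every listed partition at its end, so only new messages arrive.
        offsets={p: kc.SEEK_TO_END for p in partitions},
        key_spec=kc.KeyValueSpec.IGNORE,
        # "Payload" is an assumed column name for the message value.
        value_spec=kc.simple_spec("Payload", dht.string),
        table_type=kc.TableType.append(),
    )
```

Because table_type applies to the constituents, each per-partition table in the result would be append-only in this sketch.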
- json_spec(col_defs, mapping=None)[source]¶
Creates a spec for how to use JSON data when consuming a Kafka stream to a Deephaven table.
- Parameters:
col_defs (Union[TableDefinitionLike, List[Tuple[str, DType]]]) – the table definition, preferably specified as TableDefinitionLike. A list of two-element tuples, each holding a string column name and a Deephaven column data type, also works but is deprecated for removal.
mapping (Dict) – a dict mapping JSON fields to column names defined in the col_defs argument. Fields starting with a ‘/’ character are interpreted as a JSON Pointer (see RFC 6901, ISSN: 2070-1721 for details; essentially, nested fields are represented like “/parent/nested”). Fields not starting with a ‘/’ character are interpreted as top-level field names. If the mapping argument is not present or None, a 1:1 mapping between JSON fields and Deephaven table column names is assumed.
- Return type:
KeyValueSpec
- Returns:
a KeyValueSpec
- Raises:
DHError –
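To make the mapping rule concrete, the sketch below models how a mapping key selects a value from a decoded JSON message: keys starting with ‘/’ are walked as JSON Pointers, and other keys are read as top-level fields. resolve_json_field is a hypothetical illustration, not the library's parser, and the sample message and column names are made up. The make_json_value_spec function shows a matching json_spec() call, assuming a dict of column name to DType is accepted as TableDefinitionLike.

```python
def resolve_json_field(document, field):
    """Illustrative model: look up a mapping key in a decoded JSON object."""
    if not field.startswith("/"):
        return document.get(field)  # top-level field name
    node = document
    for token in field[1:].split("/"):
        # RFC 6901 escapes: "~1" stands for "/", "~0" stands for "~".
        token = token.replace("~1", "/").replace("~0", "~")
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node


message = {"symbol": "ABC", "quote": {"bid": 10.5, "ask": 10.7}}
# JSON field (or pointer) -> Deephaven column name
mapping = {"symbol": "Symbol", "/quote/bid": "Bid"}
row = {column: resolve_json_field(message, field) for field, column in mapping.items()}


def make_json_value_spec():
    # Imported lazily because the deephaven package needs a running
    # Deephaven session.
    from deephaven import dtypes as dht
    from deephaven.stream.kafka import consumer as kc

    # Assumption: a dict of column name -> DType is a valid TableDefinitionLike.
    return kc.json_spec({"Symbol": dht.string, "Bid": dht.double}, mapping=mapping)
```

In this model the nested bid value lands in the Bid column, while the unmapped ask field is not selected.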
- object_processor_spec(provider)[source]¶
Creates a Kafka key-value spec implementation from a named object processor provider. The provider must be capable of supporting a byte array.
- protobuf_spec(schema=None, schema_version=None, schema_message_name=None, message_class=None, include=None, protocol=None)[source]¶
Creates a spec for parsing a Kafka protobuf stream into a Deephaven table. Uses the schema, schema_version, and schema_message_name to fetch the schema from the schema registry, or uses message_class to get the schema from the classpath.
- Parameters:
schema (Optional[str]) – the schema subject name. When set, this will fetch the protobuf message descriptor from the schema registry. Either this, or message_class, must be set.
schema_version (Optional[int]) – the schema version, or None for latest, default is None. For purposes of reproducibility across restarts where schema changes may occur, it is advisable for callers to set this. This will ensure the resulting table definition will not change across restarts. This gives the caller an explicit opportunity to update any downstream consumers when updating schema_version if necessary.
schema_message_name (Optional[str]) – the fully-qualified protobuf message name, for example “com.example.MyMessage”. This message’s descriptor will be used as the basis for the resulting table’s definition. If None, the first message descriptor in the protobuf schema will be used. The default is None. It is advisable for callers to explicitly set this.
message_class (Optional[str]) – the fully-qualified Java class name for the protobuf message on the current classpath, for example “com.example.MyMessage” or “com.example.OuterClass$MyMessage”. When this is set, the schema registry will not be used. Either this, or schema, must be set.
include (Optional[List[str]]) – the ‘/’ separated paths to include. The final path may be a ‘*’ to additionally match everything that starts with path. For example, include=[“/foo/bar”] will include the field path name paths [], [“foo”], and [“foo”, “bar”]. include=[“/foo/bar/*”] will additionally include any field path name paths that start with [“foo”, “bar”]: [“foo”, “bar”, “baz”], [“foo”, “bar”, “baz”, “zap”], etc. When multiple includes are specified, the fields will be included when any of the components matches. Default is None, which includes all paths.
protocol (Optional[ProtobufProtocol]) – the wire protocol for this payload. When schema is set, ProtobufProtocol.serdes() will be used by default. When message_class is set, ProtobufProtocol.raw() will be used by default.
- Return type:
KeyValueSpec
- Returns:
a KeyValueSpec
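The include rule can be restated as a small predicate over field paths. The sketch below is a plain-Python model of the behaviour described for the include parameter, written for illustration only; is_included is a hypothetical function and does not reflect how the library evaluates paths internally.

```python
def is_included(path, includes=None):
    """Illustrative model of the documented include rule.

    `path` is a list of field names, e.g. ["foo", "bar"].
    """
    if includes is None:
        return True  # the default includes all paths
    for pattern in includes:
        parts = [p for p in pattern.split("/") if p]
        wildcard = bool(parts) and parts[-1] == "*"
        if wildcard:
            parts = parts[:-1]
        shared = min(len(path), len(parts))
        if path[:shared] != parts[:shared]:
            continue  # the path diverges from this pattern
        # Prefixes of the pattern always match; longer paths only match
        # when the pattern ends in '*'.
        if len(path) <= len(parts) or wildcard:
            return True
    return False


print(is_included(["foo", "bar"], ["/foo/bar"]))
print(is_included(["foo", "bar", "baz"], ["/foo/bar"]))
print(is_included(["foo", "bar", "baz"], ["/foo/bar/*"]))
```

A path is accepted as soon as any one pattern matches, which mirrors the statement that fields are included when any of the includes matches.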
- simple_spec(col_name, data_type=None)[source]¶
Creates a spec that defines a single column to receive the key or value of a Kafka message when consuming a Kafka stream to a Deephaven table.