deephaven.stream.kafka.consumer
The kafka.consumer module supports consuming a Kafka topic as a Deephaven live table.
- ALL_PARTITIONS_DONT_SEEK = {-1: -2}
For all partitions, start consuming at the current position.
- ALL_PARTITIONS_SEEK_TO_BEGINNING = {-1: -1}
For all partitions, start consuming at the beginning.
- ALL_PARTITIONS_SEEK_TO_END = {-1: -3}
For all partitions, start consuming at the end.
- DONT_SEEK = -2
Start consuming at the current position of a partition.
- class KeyValueSpec(j_spec)
Bases:
deephaven._wrapper.JObjectWrapper
- j_object_type
alias of
io.deephaven.kafka.KafkaTools$Consume$KeyOrValueSpec
- SEEK_TO_BEGINNING = -1
Start consuming at the beginning of a partition.
- SEEK_TO_END = -3
Start consuming at the end of a partition.
- class TableType(j_table_type)
Bases:
deephaven._wrapper.JObjectWrapper
An enum that defines the supported table types for consuming Kafka.
- static append()
Consume all partitions into a single interleaved in-memory append-only table.
- j_object_type
alias of
io.deephaven.kafka.KafkaTools$TableType
- avro_spec(schema, schema_version='latest', mapping=None, mapped_only=False)
Creates a spec for how to use an Avro schema when consuming a Kafka stream to a Deephaven table.
- Parameters
schema (str) –
Either a JSON encoded Avro schema definition string, or the name for a schema registered in a Confluent compatible Schema Server.
If it is the name of a schema in Schema Server, the associated ‘kafka_config’ parameter in the call to consume() should include the key ‘schema.registry.url’, whose value is the Schema Server URL used to fetch the schema definition.
schema_version (str) – the schema version to fetch from schema service, default is ‘latest’
mapping (Dict[str, str]) – a mapping from Avro field name to Deephaven table column name; the fields specified in the mapping will have their column names defined by it; if ‘mapped_only’ parameter is False, any other fields not mentioned in the mapping will use the same Avro field name for Deephaven table column; otherwise, these unmapped fields will be ignored and will not be present in the resulting table. default is None
mapped_only (bool) – whether to ignore Avro fields not present in the ‘mapping’ argument, default is False
- Return type
KeyValueSpec
- Returns
a KeyValueSpec
- Raises
DHError –
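The interaction between ‘mapping’ and ‘mapped_only’ described above can be sketched in plain Python. This only illustrates the documented rule and is not the library's implementation; the helper resolve_columns and the Avro field names are hypothetical.

```python
from typing import Dict, List, Optional


def resolve_columns(
    avro_fields: List[str],
    mapping: Optional[Dict[str, str]] = None,
    mapped_only: bool = False,
) -> Dict[str, str]:
    """Return {avro_field: column_name} following the documented rule.

    Hypothetical helper for illustration only; it is not part of deephaven.
    """
    mapping = mapping or {}
    columns = {}
    for field in avro_fields:
        if field in mapping:
            # Mapped fields take the column name given in the mapping.
            columns[field] = mapping[field]
        elif not mapped_only:
            # Unmapped fields keep the Avro field name as the column name.
            columns[field] = field
        # With mapped_only=True, unmapped fields are left out entirely.
    return columns


fields = ["symbol", "px", "qty"]  # made-up Avro field names
keep_all = resolve_columns(fields, mapping={"px": "Price"})
mapped = resolve_columns(fields, mapping={"px": "Price"}, mapped_only=True)
```

With these inputs, keep_all renames px to Price and keeps symbol and qty under their own names, while mapped contains only the Price column.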
- consume(kafka_config, topic, partitions=None, offsets=None, key_spec=None, value_spec=None, table_type=TableType.stream())
Consume from Kafka to a Deephaven table.
- Parameters
kafka_config (Dict) – configuration for the associated Kafka consumer and also the resulting table. Once the table-specific properties are stripped, the remaining ones are used to call the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; pass any desired KafkaConsumer-specific configuration here
topic (str) – the Kafka topic name
partitions (List[int]) – a list of integer partition numbers, default is None which means all partitions
offsets (Dict[int, int]) – a mapping between partition numbers and offset numbers, and can be one of the predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. The default is None which works the same as ALL_PARTITIONS_DONT_SEEK. The offset numbers may be one of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK.
key_spec (KeyValueSpec) – specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES, in which case the kafka_config param should include values for dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, for the single resulting column name and type
value_spec (KeyValueSpec) – specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES, in which case the kafka_config param should include values for dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, for the single resulting column name and type
table_type (TableType) – a TableType enum, default is TableType.stream()
- Return type
Table
- Returns
a Deephaven live table that will update based on Kafka messages consumed for the given topic
- Raises
DHError –
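A minimal sketch of how the arguments to consume() might be assembled. The broker address, the topic name "orders", the column names, and the wrapper start_consumer are assumptions made for illustration. The seek constants are copied locally from the values documented in this module so that the argument-building part runs without Deephaven; in a Deephaven session, the module's own constants would normally be used instead.

```python
# Values copied from the constants documented in this module; inside a
# Deephaven session, prefer the module's own SEEK_TO_BEGINNING and so on.
SEEK_TO_BEGINNING = -1
DONT_SEEK = -2
SEEK_TO_END = -3

# Kafka consumer properties; the broker address is a placeholder.
kafka_config = {"bootstrap.servers": "localhost:9092"}

# Per-partition starting positions: replay partition 0 from the start,
# read only new messages from partition 1.
offsets = {0: SEEK_TO_BEGINNING, 1: SEEK_TO_END}


def start_consumer():
    """Hypothetical wrapper; requires a running Deephaven server and broker."""
    from deephaven import dtypes as dht
    from deephaven.stream.kafka import consumer as kc

    return kc.consume(
        kafka_config,
        "orders",  # assumed topic name
        partitions=[0, 1],
        offsets=offsets,
        key_spec=kc.KeyValueSpec.IGNORE,  # drop the message key
        value_spec=kc.json_spec([("Symbol", dht.string), ("Price", dht.double)]),
        table_type=kc.TableType.append(),
    )
```

The Deephaven imports sit inside the function so that nothing tries to reach Kafka until start_consumer() is actually called.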
- json_spec(col_defs, mapping=None)
Creates a spec for how to use JSON data when consuming a Kafka stream to a Deephaven table.
- Parameters
col_defs (List[Tuple[str, DType]]) – a list of tuples specifying names and types for columns to be created on the resulting Deephaven table. Tuples contain two elements, a string for column name and a Deephaven type for column data type.
mapping (Dict) – a dict mapping JSON fields to column names defined in the col_defs argument. Fields starting with a ‘/’ character are interpreted as a JSON Pointer (see RFC 6901, ISSN: 2070-1721, for details; essentially, nested fields are represented like “/parent/nested”). Fields not starting with a ‘/’ character are interpreted as top-level field names. If the mapping argument is not present or None, a 1:1 mapping between JSON fields and Deephaven table column names is assumed.
- Return type
KeyValueSpec
- Returns
a KeyValueSpec
- Raises
DHError –
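How the keys of ‘mapping’ are interpreted can be illustrated with a small plain-Python lookup. This is a sketch of the documented rule (a leading ‘/’ means JSON Pointer, anything else is a top-level field), not Deephaven's implementation; the helper lookup, the sample record, and the column names are hypothetical, and only nested objects are handled.

```python
import json
from typing import Any


def lookup(record: dict, field: str) -> Any:
    """Resolve a mapping key against a decoded JSON record.

    Hypothetical helper for illustration; it is not part of deephaven.
    """
    if not field.startswith("/"):
        # No leading '/': treat the key as a top-level field name.
        return record.get(field)
    node: Any = record
    for token in field[1:].split("/"):
        # RFC 6901 escapes: '~1' stands for '/', '~0' stands for '~'.
        token = token.replace("~1", "/").replace("~0", "~")
        node = node[token]
    return node


record = json.loads('{"symbol": "ABC", "quote": {"bid": 10.5, "ask": 10.7}}')

# Keys are JSON fields, values are column names from col_defs.
mapping = {"symbol": "Symbol", "/quote/bid": "Bid", "/quote/ask": "Ask"}
row = {column: lookup(record, field) for field, column in mapping.items()}
```

Here row holds one value per target column: Symbol comes from the top-level field, while Bid and Ask come from the nested quote object.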
- simple_spec(col_name, data_type=None)
Creates a spec that defines a single column to receive the key or value of a Kafka message when consuming a Kafka stream to a Deephaven table.