deephaven.stream.kafka.consumer

The kafka.consumer module supports consuming a Kafka topic as a Deephaven live table.

ALL_PARTITIONS_DONT_SEEK = {-1: -2}

For all partitions, start consuming at the current position.

ALL_PARTITIONS_SEEK_TO_BEGINNING = {-1: -1}

For all partitions, start consuming at the beginning.

ALL_PARTITIONS_SEEK_TO_END = {-1: -3}

For all partitions, start consuming at the end.

DONT_SEEK = -2

Start consuming at the current position of a partition.

class KeyValueSpec(j_spec)[source]

Bases: deephaven._wrapper.JObjectWrapper

j_object_type

alias of io.deephaven.kafka.KafkaTools$Consume$KeyOrValueSpec

SEEK_TO_BEGINNING = -1

Start consuming at the beginning of a partition.

SEEK_TO_END = -3

Start consuming at the end of a partition.
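As a hedged illustration of how these constants combine, the sketch below builds a per-partition offsets dictionary of the shape consume() accepts. The constant values are copied from this page so the snippet runs outside a Deephaven session. The partition numbers, the explicit offset 1500, and the describe() helper are hypothetical.

```python
# Values copied from this page; inside a Deephaven session they would be
# imported from deephaven.stream.kafka.consumer rather than redefined.
SEEK_TO_BEGINNING = -1
DONT_SEEK = -2
SEEK_TO_END = -3

# The ALL_PARTITIONS_* constants use partition key -1 with one sentinel value.
ALL_PARTITIONS_SEEK_TO_BEGINNING = {-1: SEEK_TO_BEGINNING}

# Hypothetical per-partition choices: replay partition 0 from the start,
# resume partition 1 at its current position, start partition 2 at offset 1500.
offsets = {
    0: SEEK_TO_BEGINNING,
    1: DONT_SEEK,
    2: 1500,
}


def describe(offset: int) -> str:
    """Hypothetical helper: turn an offset value into a readable label."""
    labels = {
        SEEK_TO_BEGINNING: "beginning",
        DONT_SEEK: "current position",
        SEEK_TO_END: "end",
    }
    return labels.get(offset, f"explicit offset {offset}")


for partition, offset in offsets.items():
    print(f"partition {partition}: {describe(offset)}")
```

Negative sentinel values cannot collide with real Kafka offsets, which are non-negative, so a single dictionary can mix sentinels and explicit positions.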

class TableType(j_table_type)[source]

Bases: deephaven._wrapper.JObjectWrapper

An Enum that defines the supported table types for consuming Kafka.

static append()[source]

Consume all partitions into a single interleaved in-memory append-only table.

j_object_type

alias of io.deephaven.kafka.KafkaTools$TableType

static ring(capacity)[source]

Consume all partitions into a single in-memory ring table.

static stream()[source]

Consume all partitions into a single interleaved stream table, which will present only newly-available rows to downstream operations and visualizations.
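The difference between the append and ring table types can be pictured with a plain-Python analogy. This is only a sketch: it assumes that ring(capacity) retains just the most recent capacity rows, which this page does not spell out, and it uses a standard-library deque rather than any Deephaven call. The capacity of 3 and the row values are hypothetical.

```python
from collections import deque

capacity = 3  # hypothetical ring capacity

append_rows = []                    # append-only: every consumed row is kept
ring_rows = deque(maxlen=capacity)  # ring: only the newest `capacity` rows stay

# Simulate five consumed messages arriving in order.
for row in range(1, 6):
    append_rows.append(row)
    ring_rows.append(row)

print(append_rows)      # [1, 2, 3, 4, 5]
print(list(ring_rows))  # [3, 4, 5]
```

Under that assumption, an append table grows without bound while a ring table keeps memory use fixed at the cost of dropping older rows.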

avro_spec(schema, schema_version='latest', mapping=None, mapped_only=False)[source]

Creates a spec for how to use an Avro schema when consuming a Kafka stream to a Deephaven table.

Parameters
  • schema (str) – either a JSON-encoded Avro schema definition string, or the name of a schema registered in a Confluent-compatible Schema Server. If it is the name of a schema in a Schema Server, the ‘kafka_config’ parameter in the call to consume() should include the key ‘schema.registry.url’, with the Schema Server URL as its value, so that the schema definition can be fetched

  • schema_version (str) – the schema version to fetch from the Schema Server, default is ‘latest’

  • mapping (Dict[str, str]) – a mapping from Avro field name to Deephaven table column name. Fields named in the mapping take the column names it defines. If the ‘mapped_only’ parameter is False, fields not mentioned in the mapping keep their Avro field names as Deephaven column names; otherwise, unmapped fields are ignored and do not appear in the resulting table. Default is None

  • mapped_only (bool) – whether to ignore Avro fields not present in the ‘mapping’ argument, default is False

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError
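The interaction between mapping and mapped_only described above can be sketched in plain Python. This is an illustration of the documented semantics, not the library's implementation; the function name resulting_columns and the sample field names are hypothetical.

```python
from typing import Dict, List, Optional


def resulting_columns(
    avro_fields: List[str],
    mapping: Optional[Dict[str, str]] = None,
    mapped_only: bool = False,
) -> List[str]:
    """Return the column names the documented rules would produce."""
    mapping = mapping or {}
    columns = []
    for field in avro_fields:
        if field in mapping:
            # Mapped fields take the column name given in the mapping.
            columns.append(mapping[field])
        elif not mapped_only:
            # Unmapped fields keep their Avro name unless mapped_only is set.
            columns.append(field)
    return columns


fields = ["symbol", "px", "qty"]  # hypothetical Avro field names
mapping = {"px": "Price"}

print(resulting_columns(fields, mapping))                    # ['symbol', 'Price', 'qty']
print(resulting_columns(fields, mapping, mapped_only=True))  # ['Price']
```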

consume(kafka_config, topic, partitions=None, offsets=None, key_spec=None, value_spec=None, table_type=TableType.stream())[source]

Consume from Kafka to a Deephaven table.

Parameters
  • kafka_config (Dict) – configuration for the associated Kafka consumer and also the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; pass any desired KafkaConsumer-specific configuration here

  • topic (str) – the Kafka topic name

  • partitions (List[int]) – a list of integer partition numbers, default is None which means all partitions

  • offsets (Dict[int, int]) – a mapping from partition numbers to offset numbers; it can also be one of the predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END, or ALL_PARTITIONS_DONT_SEEK. The default is None, which works the same as ALL_PARTITIONS_DONT_SEEK. Each offset number may be one of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK.

  • key_spec (KeyValueSpec) – specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec(), or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config parameter should include values for the dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, giving the name and type of the single resulting column

  • value_spec (KeyValueSpec) – specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec(), or json_spec() in this module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config parameter should include values for the dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, giving the name and type of the single resulting column

  • table_type (TableType) – a TableType enum, default is TableType.stream()

Return type

Table

Returns

a Deephaven live table that will update based on Kafka messages consumed for the given topic

Raises

DHError
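A minimal usage sketch follows. It assumes a Deephaven Python session with a Kafka broker reachable at the hypothetical address localhost:9092, and a hypothetical topic named orders whose message values are JSON objects. The column names and the use of deephaven.dtypes for column types are assumptions not taken from this page. The Deephaven imports sit inside the function so the snippet can be loaded without a running server.

```python
# Hypothetical connection settings; adjust to the actual environment.
kafka_config = {"bootstrap.servers": "localhost:9092"}


def start_orders_table():
    """Consume the hypothetical 'orders' topic into an append-only table."""
    # Imported lazily: these modules are only available in a Deephaven session.
    from deephaven import dtypes as dht
    from deephaven.stream.kafka import consumer as kc

    return kc.consume(
        kafka_config,
        "orders",
        # Replay every partition from its first available message.
        offsets=kc.ALL_PARTITIONS_SEEK_TO_BEGINNING,
        # Drop the message key; only the value is turned into columns.
        key_spec=kc.KeyValueSpec.IGNORE,
        # Assumed value layout: a JSON object with Symbol and Price fields.
        value_spec=kc.json_spec([("Symbol", dht.string), ("Price", dht.double)]),
        # Keep all consumed rows instead of the default stream table.
        table_type=kc.TableType.append(),
    )
```

Calling start_orders_table() inside a session would return the live table. Passing explicit key_spec and value_spec arguments avoids the need for the ‘deephaven.*.column.*’ properties in kafka_config.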

json_spec(col_defs, mapping=None)[source]

Creates a spec for how to use JSON data when consuming a Kafka stream to a Deephaven table.

Parameters
  • col_defs (List[Tuple[str, DType]]) – a list of tuples specifying names and types for columns to be created on the resulting Deephaven table. Tuples contain two elements, a string for column name and a Deephaven type for column data type.

  • mapping (Dict) – a dict mapping JSON fields to column names defined in the col_defs argument. Fields starting with a ‘/’ character are interpreted as a JSON Pointer (see RFC 6901, ISSN: 2070-1721 for details; essentially, nested fields are represented like “/parent/nested”). Fields not starting with a ‘/’ character are interpreted as top-level field names. If the mapping argument is not present or None, a 1:1 mapping between JSON fields and Deephaven table column names is assumed.

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError
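The field lookup rule for mapping keys can be sketched in plain Python. This is an illustration of the rule described above, not the library's implementation. It handles nested JSON objects only (array indices are left out), and the resolve_field helper and the sample message are hypothetical.

```python
from typing import Any


def resolve_field(message: dict, field: str) -> Any:
    """Look up a mapping key in a decoded JSON object."""
    if not field.startswith("/"):
        # No leading '/': treat the key as a top-level field name.
        return message[field]
    value: Any = message
    # Leading '/': walk the JSON Pointer one reference token at a time.
    for token in field[1:].split("/"):
        # RFC 6901 escapes: '~1' stands for '/', '~0' stands for '~'.
        token = token.replace("~1", "/").replace("~0", "~")
        value = value[token]
    return value


message = {"symbol": "ABC", "quote": {"bid": 10.5, "ask": 10.7}}
mapping = {"symbol": "Symbol", "/quote/bid": "Bid"}

# Build one row keyed by Deephaven column name.
row = {column: resolve_field(message, field) for field, column in mapping.items()}
print(row)  # {'Symbol': 'ABC', 'Bid': 10.5}
```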

simple_spec(col_name, data_type=None)[source]

Creates a spec that defines a single column to receive the key or value of a Kafka message when consuming a Kafka stream to a Deephaven table.

Parameters
  • col_name (str) – the Deephaven column name

  • data_type (DType) – the column data type

Return type

KeyValueSpec

Returns

a KeyValueSpec

Raises

DHError