
consume_to_partitioned_table

The consume_to_partitioned_table method reads a Kafka stream into an in-memory partitioned table.

Syntax

consume_to_partitioned_table(
    kafka_config: dict,
    topic: str,
    partitions: list[int],
    offsets: dict[int, int],
    key_spec: KeyValueSpec,
    value_spec: KeyValueSpec,
    table_type: TableType,
) -> PartitionedTable

Parameters

kafka_config: dict

Configuration for the associated Kafka consumer and the resulting table. Once the table-specific properties are stripped, the remainder is passed to the constructor of org.apache.kafka.clients.consumer.KafkaConsumer; include any desired KafkaConsumer-specific configuration here.

topic: str

The Kafka topic name.

partitions: list[int] (optional)
  • Subscribes to all partitions by default.
  • Specify a list of partition numbers; e.g., partitions=[1, 3, 5] to listen only to partitions 1, 3, and 5.
offsets: dict[int, int] (optional)
  • ALL_PARTITIONS_DONT_SEEK - Only receive messages produced after this call is processed. This is also the behavior when offsets is not specified (the default).
  • ALL_PARTITIONS_SEEK_TO_END - Go to the newest available message for every partition.
  • ALL_PARTITIONS_SEEK_TO_BEGINNING - Go to the oldest available message for every partition.
  • { _number_ : _offset_, ... } - Map partition numbers to numeric offsets or one of the constants DONT_SEEK, SEEK_TO_END, SEEK_TO_BEGINNING.
    • For example, { 1 : SEEK_TO_END, 3 : SEEK_TO_BEGINNING, 5 : 27 } means:
      • Start from the last message sent in partition 1.
      • Go to the oldest available message in partition 3.
      • Start with offset 27 in partition 5.
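The per-partition mapping in the last bullet can be sketched as a plain dict. In a real Deephaven session the sentinel constants (DONT_SEEK, SEEK_TO_END, SEEK_TO_BEGINNING) come from this module rather than being defined by hand; the placeholder objects below exist only so the shape of the mapping is visible:

```python
# Placeholder sentinels for illustration only; in Deephaven these constants
# are provided by the kafka consumer module and have different values.
SEEK_TO_END = object()
SEEK_TO_BEGINNING = object()

# Partition number -> starting position: a sentinel constant or a numeric offset.
offsets = {
    1: SEEK_TO_END,        # start from the last message sent in partition 1
    3: SEEK_TO_BEGINNING,  # start from the oldest available message in partition 3
    5: 27,                 # start with offset 27 in partition 5
}
```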
key_spec: KeyValueSpec (optional)

Specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of this module's functions:

  • simple_spec()
  • avro_spec()
  • json_spec()
  • protobuf_spec()

This parameter can also be set to the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES.

The default is None, which behaves the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config param should include values for the dictionary keys deephaven.key.column.name and deephaven.key.column.type, giving the name and type of the single resulting column.
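As a sketch of the FROM_PROPERTIES behavior described above, the key column's name and type travel inside kafka_config itself. The property names come from the parameter description; the broker address, column name, and type string below are illustrative assumptions, not values from this page:

```python
# kafka_config carrying the key column name/type for the FROM_PROPERTIES case.
# "localhost:9092", "OrderKey", and "String" are placeholder values.
kafka_config = {
    "bootstrap.servers": "localhost:9092",
    "deephaven.key.column.name": "OrderKey",  # name of the single resulting key column
    "deephaven.key.column.type": "String",    # type of the single resulting key column
}
```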

value_spec: KeyValueSpec (optional)

Specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of this module's functions:

  • simple_spec()
  • avro_spec()
  • json_spec()
  • protobuf_spec()

This parameter can also be set to the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES.

The default is None, which behaves the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_config param should include values for the dictionary keys deephaven.value.column.name and deephaven.value.column.type, giving the name and type of the single resulting column.

table_type: TableType (optional)

A TableType enum. The default is TableType.blink().

  • TableType.append() - Create an append-only table.
  • TableType.blink() - Create a blink table (default).
  • TableType.ring(N) - Create a ring table of size N.

Returns

An in-memory partitioned table.

Examples

In the following example, consume_to_partitioned_table reads the Kafka topic orders into a partitioned table. It uses the JSON format to parse the stream. Since we do not provide partitions or offsets values, the consumer will include all partitions and start from the first message received after the consume_to_partitioned_table call.

from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
from deephaven import dtypes as dht

pt = kc.consume_to_partitioned_table(
    {"bootstrap.servers": "redpanda:9092"},
    "orders",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=kc.json_spec(
        [
            ("Symbol", dht.string),
            ("Side", dht.string),
            ("Price", dht.double),
            ("Qty", dht.int_),
            ("Tstamp", dht.Instant),
        ],
        mapping={
            "jsymbol": "Symbol",
            "jside": "Side",
            "jprice": "Price",
            "jqty": "Qty",
            "jts": "Tstamp",
        },
    ),
    table_type=TableType.append(),
)
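For reference, a message body that the json_spec mapping in the example would match might look like the following. This sketch only shows the JSON shape; the field values are invented, and nothing here actually produces to Kafka:

```python
import json

# A sample "orders" message whose keys match the jsymbol/jside/jprice/jqty/jts
# mapping in the example above. All values are illustrative.
message = {
    "jsymbol": "AAPL",
    "jside": "BUY",
    "jprice": 135.50,
    "jqty": 100,
    "jts": "2023-01-01T00:00:00Z",
}
payload = json.dumps(message)
```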