consume_to_partitioned_table
The consume_to_partitioned_table method reads a Kafka stream into an in-memory partitioned table.
Syntax
consume_to_partitioned_table(
    kafka_config: dict,
    topic: str,
    partitions: list[int],
    offsets: dict[int, int],
    key_spec: KeyValueSpec,
    value_spec: KeyValueSpec,
    table_type: TableType,
) -> PartitionedTable
Parameters
Parameter | Type | Description |
---|---|---|
kafka_config | dict | Configuration for the associated Kafka consumer and the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of |
topic | str | The Kafka topic name. |
partitions optional | list[int] |
|
offsets optional | dict[int, int] |
|
key_spec optional | KeyValueSpec | Specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of this module's functions:
This parameter can also be set to the predefined The default is |
value_spec optional | KeyValueSpec | Specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of this module's functions:
This parameter can also be set to the predefined The default is |
table_type optional | TableType | A
|
Returns
An in-memory partitioned table.
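As a mental model for the return value, the plain-Python sketch below groups records by Kafka partition number, much as a partitioned table holds one constituent table per key. It assumes the result is keyed by Kafka partition, which this page does not state explicitly. The sample records and the group_by_partition helper are hypothetical, and no Deephaven API is used.

```python
from collections import defaultdict


def group_by_partition(records):
    """Hypothetical helper: bucket (partition, offset, value) tuples by partition."""
    constituents = defaultdict(list)
    for partition, offset, value in records:
        # Each partition number acts as the key of one constituent.
        constituents[partition].append((offset, value))
    return dict(constituents)


# Hypothetical records: (partition, offset, value).
records = [(0, 0, "a"), (1, 0, "b"), (0, 1, "c")]
constituents = group_by_partition(records)
print(constituents)
```

Each key in the resulting dictionary stands in for one constituent. Records keep their arrival order within a partition.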
Examples
In the following example, consume_to_partitioned_table reads the Kafka topic orders into a partitioned table. It uses the JSON format to parse the stream. Since we do not provide partitions or offsets values, the consumer will include all partitions and start from the first message received after the consume_to_partitioned_table call.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
from deephaven import dtypes as dht

pt = kc.consume_to_partitioned_table(
    {"bootstrap.servers": "redpanda:9092"},
    "orders",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=kc.json_spec(
        [
            ("Symbol", dht.string),
            ("Side", dht.string),
            ("Price", dht.double),
            ("Qty", dht.int_),
            ("Tstamp", dht.Instant),
        ],
        mapping={
            "jsymbol": "Symbol",
            "jside": "Side",
            "jprice": "Price",
            "jqty": "Qty",
            "jts": "Tstamp",
        },
    ),
    table_type=TableType.append(),
)
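To make the mapping argument concrete, the following plain-Python sketch mimics how the field names of a JSON payload could be renamed to the column names declared above. It is only an illustration and does not use or reproduce Deephaven's parser. The apply_mapping helper and the sample message are hypothetical.

```python
import json

# Same JSON-field -> column-name mapping as in the example above.
MAPPING = {
    "jsymbol": "Symbol",
    "jside": "Side",
    "jprice": "Price",
    "jqty": "Qty",
    "jts": "Tstamp",
}


def apply_mapping(raw_message: str, mapping: dict) -> dict:
    """Hypothetical helper: rename JSON fields to column names.

    Fields absent from the mapping are dropped, and mapped fields missing
    from the message become None. This is a simplification for illustration.
    """
    payload = json.loads(raw_message)
    return {column: payload.get(field) for field, column in mapping.items()}


# Hypothetical message as it might appear on the orders topic.
sample = (
    '{"jsymbol": "AAPL", "jside": "BUY", "jprice": 187.5, '
    '"jqty": 100, "jts": "2024-01-02T15:30:00Z"}'
)
row = apply_mapping(sample, MAPPING)
print(row)
```

The keys of the resulting dictionary match the column names passed to json_spec, which is the correspondence the mapping is meant to express.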