consume_to_partitioned_table
The consume_to_partitioned_table method reads a Kafka stream into an in-memory partitioned table.
Syntax
consume_to_partitioned_table(
    kafka_config: dict,
    topic: str,
    partitions: list[int] = None,
    offsets: dict[int, int] = None,
    key_spec: KeyValueSpec = None,
    value_spec: KeyValueSpec = None,
    table_type: TableType = None,
) -> PartitionedTable
Parameters
Parameter | Type | Description |
---|---|---|
kafka_config | dict | Configuration for the associated Kafka consumer and the resulting table. Once the table-specific properties are stripped, the remaining properties are used to call the constructor of the underlying KafkaConsumer; pass any KafkaConsumer-specific configuration this way. |
topic | str | The Kafka topic name. |
partitions (optional) | list[int] | The partition numbers to subscribe to. By default, the consumer subscribes to all partitions. |
offsets (optional) | dict[int, int] | A mapping from partition number to the offset at which that partition begins consuming. By default, consumption starts from the first message received after the call. See the sketch following this table. |
key_spec (optional) | KeyValueSpec | Specifies how to map the Key field in Kafka messages to table column(s). Any of the specifications found in deephaven.stream.kafka.consumer, such as json_spec or object_processor_spec, may be used, as may the constant KeyValueSpec.IGNORE. |
value_spec (optional) | KeyValueSpec | Specifies how to map the Value field in Kafka messages to table column(s). Any of the specifications found in deephaven.stream.kafka.consumer, such as json_spec or object_processor_spec, may be used. |
table_type (optional) | TableType | One of TableType.append(), TableType.blink(), or TableType.ring(capacity), determining how consumed rows are retained in the resulting constituent tables. |
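For illustration, here is a minimal sketch of a call that passes the optional subscription parameters explicitly, pinning the consumer to two partitions and starting each at offset 0. The broker address, topic name, partition numbers, and offsets are assumed values for the sketch, not defaults of the API:
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
from deephaven import dtypes as dht

# Hypothetical call: subscribe only to partitions 0 and 1 of "orders",
# starting each partition at offset 0 (all values are illustrative).
pt = kc.consume_to_partitioned_table(
    {"bootstrap.servers": "redpanda:9092"},
    "orders",
    partitions=[0, 1],
    offsets={0: 0, 1: 0},
    key_spec=KeyValueSpec.IGNORE,
    value_spec=kc.json_spec({"Symbol": dht.string}),
    table_type=TableType.append(),
)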
Returns
An in-memory partitioned table (PartitionedTable), with one constituent table per Kafka partition consumed.
Examples
In the following example, consume_to_partitioned_table reads the Kafka topic orders into a partitioned table, using the JSON format to parse the stream. Since no partitions or offsets values are provided, the consumer subscribes to all partitions and starts from the first message received after the consume_to_partitioned_table call.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
from deephaven import dtypes as dht

pt = kc.consume_to_partitioned_table(
    {"bootstrap.servers": "redpanda:9092"},
    "orders",
    # Ignore the message keys entirely.
    key_spec=KeyValueSpec.IGNORE,
    # Parse JSON values into the listed columns, renaming
    # each JSON field to a column name via `mapping`.
    value_spec=kc.json_spec(
        {
            "Symbol": dht.string,
            "Side": dht.string,
            "Price": dht.double,
            "Qty": dht.int64,
            "Tstamp": dht.Instant,
        },
        mapping={
            "jsymbol": "Symbol",
            "jside": "Side",
            "jprice": "Price",
            "jqty": "Qty",
            "jts": "Tstamp",
        },
    ),
    table_type=TableType.append(),
)
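Each constituent of pt holds the rows consumed from one Kafka partition. As a minimal follow-up sketch, assuming the pt built above, the constituents can be merged back into a single table, or operated on all at once through a proxy (the Price > 100 filter is an arbitrary illustration):
# Combine all per-partition constituent tables into one table.
orders_all = pt.merge()

# Apply a filter to every constituent at once via a proxy;
# .target recovers the PartitionedTable from the proxy result.
expensive = pt.proxy().where(["Price > 100"]).target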
The following example, like the one above, reads the JSON-formatted Kafka topic orders into a partitioned table. This time, though, it uses a Jackson provider object processor specification.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import (
    TableType,
    KeyValueSpec,
    object_processor_spec,
)
from deephaven.json import jackson
from datetime import datetime

pt = (
    kc.consume_to_partitioned_table(
        {"bootstrap.servers": "redpanda:9092"},
        "orders",
        key_spec=KeyValueSpec.IGNORE,
        # A Jackson provider maps each JSON field to the given Python type.
        value_spec=object_processor_spec(
            jackson.provider(
                {
                    "jsymbol": str,
                    "jside": str,
                    "jprice": float,
                    "jqty": int,
                    "jts": datetime,
                }
            )
        ),
        table_type=TableType.append(),
    )
    # Rename the JSON-derived columns on every constituent table via a proxy,
    # then recover the underlying PartitionedTable with .target.
    .proxy()
    .view(["Symbol=jsymbol", "Side=jside", "Price=jprice", "Qty=jqty", "Tstamp=jts"])
    .target
)
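The proxy()/target round trip in this second example exists because view is a table operation rather than a partitioned-table operation: proxy() applies the view to every constituent table at once, and target then returns the underlying partitioned table, so pt ends up as a PartitionedTable with the renamed columns rather than as a proxy object.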