# consume

`consume` reads a Kafka stream into an in-memory table.
## Syntax
```python
consume(
    kafka_config: dict,
    topic: str,
    partitions: list[int],
    offsets: dict[int, int],
    key_spec: KeyValueSpec,
    value_spec: KeyValueSpec,
    table_type: TableType,
) -> Table
```
## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `kafka_config` | `dict` | Configuration for the associated Kafka consumer and the resulting table. Once the table-specific properties are stripped, the remaining ones are used to call the constructor of `org.apache.kafka.clients.consumer.KafkaConsumer`. |
| `topic` | `str` | The Kafka topic name. |
| `partitions` (optional) | `list[int]` | A list of partition numbers to subscribe to. The default is to subscribe to all partitions. |
| `offsets` (optional) | `dict[int, int]` | A mapping from partition number to the offset at which to begin consuming, or one of the predefined constants `ALL_PARTITIONS_SEEK_TO_BEGINNING`, `ALL_PARTITIONS_SEEK_TO_END`, or `ALL_PARTITIONS_DONT_SEEK`. The default is `ALL_PARTITIONS_DONT_SEEK`, which only listens to messages produced after the call. |
| `key_spec` (optional) | `KeyValueSpec` | Specifies how to map the Key field in Kafka messages to table column(s). Any of the spec-producing functions in this module (`simple_spec`, `json_spec`, `avro_spec`, `object_processor_spec`) can be used, as can `KeyValueSpec.IGNORE` to omit the key entirely. |
| `value_spec` (optional) | `KeyValueSpec` | Specifies how to map the Value field in Kafka messages to table column(s). Accepts the same specs as `key_spec`. |
| `table_type` (optional) | `TableType` | One of the following `TableType` values: `TableType.append()` (append-only), `TableType.blink()` (the default), or `TableType.ring(capacity)`. |
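When finer-grained control is needed, `partitions` and `offsets` can be supplied explicitly. The sketch below is illustrative (the partition numbers and offsets are assumptions about the topic, not values it is guaranteed to have); it subscribes to two partitions and starts each at a chosen offset:

```python
from deephaven import kafka_consumer as kc

# Subscribe only to partitions 0 and 1 of the topic, starting
# partition 0 at offset 0 and partition 1 at offset 500.
result = kc.consume(
    {
        "bootstrap.servers": "redpanda:9092",
        "deephaven.key.column.type": "String",
        "deephaven.value.column.type": "String",
    },
    "testTopic",
    partitions=[0, 1],
    offsets={0: 0, 1: 500},
)
```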
## Returns
An in-memory table.
## Examples
In the following example, `consume` is used to read the Kafka topic `testTopic` into a Deephaven table. Only the first two positional arguments are required to read a Kafka stream into a Deephaven table.

- The first positional argument, `{'bootstrap.servers': 'redpanda:9092'}`, is a Python dictionary that is standard for any Python library consuming messages from Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
  - Here, the host and port for the Kafka server to use to bootstrap the subscription are specified. This is the minimal amount of required information.
  - The value `redpanda:9092` corresponds to the current setup for development testing with Docker images (which uses an instance of Redpanda).
- The second positional argument is the name of the topic (`testTopic`).
```python
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec

result = kc.consume(
    {
        "bootstrap.servers": "redpanda:9092",
        "deephaven.key.column.type": "String",
        "deephaven.value.column.type": "String",
    },
    "testTopic",
)
```
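A quick way to confirm how the messages were mapped is to inspect the result's column metadata; a minimal sketch, run after the `consume` call above:

```python
# Lists the resulting column names and types, including the
# KafkaPartition, KafkaOffset, and KafkaTimestamp bookkeeping
# columns that consume adds by default.
schema = result.meta_table
```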
In the following example, `consume` is used to read the Kafka topic `share_price` with additional settings enabled and a specific `key_spec` and `value_spec` defined.

- `partitions` is set to `ALL_PARTITIONS`, which listens to all partitions.
- `offsets` is set to `ALL_PARTITIONS_DONT_SEEK`, which only listens to new messages produced after this call is processed.
- `key_spec` is set to `simple_spec('Symbol', dht.string)`, which expects messages with a Kafka key field of type string, and creates a `Symbol` column to store the information.
- `value_spec` is set to `simple_spec('Price', dht.double)`, which expects messages with a Kafka value field of type double, and creates a `Price` column to store the information.
- `table_type` is set to `TableType.append()`, which creates an append-only table.
```python
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = kc.consume(
    {"bootstrap.servers": "redpanda:9092"},
    "share_price",
    partitions=kc.ALL_PARTITIONS,
    offsets=kc.ALL_PARTITIONS_DONT_SEEK,
    key_spec=kc.simple_spec("Symbol", dht.string),
    value_spec=kc.simple_spec("Price", dht.double),
    table_type=TableType.append(),
)
```
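`TableType` also supports ring semantics, which retain only the most recent rows. A minimal sketch of the same subscription as a ring table (the capacity of 1024 is an arbitrary choice):

```python
# As above, but the result keeps only the last 1,024 rows.
result_ring = kc.consume(
    {"bootstrap.servers": "redpanda:9092"},
    "share_price",
    key_spec=kc.simple_spec("Symbol", dht.string),
    value_spec=kc.simple_spec("Price", dht.double),
    table_type=TableType.ring(1024),
)
```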
In the following example, `consume` reads the Kafka topic `share_price` with an additional dictionary entry set and keys ignored.

- `deephaven.partition.column.name` is set to `None`. The result table will not include a column for the partition field.
- `key_spec` is set to `IGNORE`. The result table will not include a column associated with the Kafka `key` field.
```python
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = kc.consume(
    {"bootstrap.servers": "redpanda:9092", "deephaven.partition.column.name": None},
    "share_price",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=kc.simple_spec("Price", dht.double),
)
```
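The returned table composes with ordinary Deephaven query operations. As a small illustrative sketch (the derived column is hypothetical):

```python
# Derive a new column from the consumed prices.
price_cents = result.update_view(["PriceCents = Price * 100"])
```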
In the following example, `consume` reads the Kafka topic `orders` in JSON format.
```python
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = kc.consume(
    {"bootstrap.servers": "redpanda:9092"},
    "orders",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=kc.json_spec(
        {
            "Symbol": dht.string,
            "Side": dht.string,
            "Price": dht.double,
            "Qty": dht.int64,
        },
        mapping={
            "jsymbol": "Symbol",
            "jside": "Side",
            "jprice": "Price",
            "jqty": "Qty",
        },
    ),
    table_type=TableType.append(),
)
```
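The `mapping` argument translates JSON field names (here `jsymbol`, `jside`, and so on) into the desired column names. If the incoming field names already match the column names, `mapping` can be omitted; a minimal sketch, assuming the producer writes fields named `Symbol`, `Side`, `Price`, and `Qty`:

```python
value_spec = kc.json_spec(
    {
        "Symbol": dht.string,
        "Side": dht.string,
        "Price": dht.double,
        "Qty": dht.int64,
    }
)
```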
The following example also reads the Kafka topic `orders` in JSON format, but this time uses a Jackson provider object processor specification.
```python
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import (
    TableType,
    KeyValueSpec,
    object_processor_spec,
)
from deephaven.json import jackson

result = kc.consume(
    {"bootstrap.servers": "redpanda:9092"},
    "orders",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=object_processor_spec(
        jackson.provider(
            {
                "jsymbol": str,
                "jside": str,
                "jprice": float,
                "jqty": int,
            }
        )
    ),
    table_type=TableType.append(),
).rename_columns(["Symbol=jsymbol", "Side=jside", "Price=jprice", "Qty=jqty"])
```
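With the object processor specification, the resulting columns take their names directly from the JSON fields (`jsymbol`, `jside`, and so on), so the trailing `rename_columns` call restores the same column names the `json_spec` example above produced.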
In the following example, `consume` reads the Kafka topic `share_price` in Avro format.
```python
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec

result = kc.consume(
    {
        "bootstrap.servers": "redpanda:9092",
        "schema.registry.url": "http://redpanda:8081",
    },
    "share_price",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=kc.avro_spec("share_price_record", schema_version="1"),
    table_type=TableType.append(),
)
```
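`avro_spec` resolves the named schema (`share_price_record`) from the Schema Registry configured through the `schema.registry.url` property. A minimal variant of the call above, assuming you want the registry's latest registered version rather than pinning `schema_version="1"`:

```python
# Assumes the default schema_version resolves to the latest
# registered version of share_price_record.
value_spec = kc.avro_spec("share_price_record")
```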