consume
consume reads a Kafka stream into an in-memory table.
Syntax
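
The parameter table below can be summarized as a signature. The following stub is only a sketch reconstructed from that table: the `None` defaults and the `Table` return annotation are placeholders chosen for illustration, not defaults confirmed by this page.

```python
def consume(
    kafka_config: dict,
    topic: str,
    partitions: 'list[int] | None' = None,      # placeholder default (assumption)
    offsets: 'dict[int, int] | None' = None,    # placeholder default (assumption)
    key_spec: 'KeyValueSpec | None' = None,     # placeholder default (assumption)
    value_spec: 'KeyValueSpec | None' = None,   # placeholder default (assumption)
    table_type: 'TableType | None' = None,      # placeholder default (assumption)
) -> 'Table':
    """Signature stub only; it mirrors the parameter table and does no work."""
    raise NotImplementedError
```
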
Parameters
| Parameter | Type | Description |
|---|---|---|
| kafka_config | dict | Configuration for the associated Kafka consumer and the resulting table. Once the table-specific properties are stripped, the remaining one is used to call the constructor of |
| topic | str | The Kafka topic name. |
| partitions optional | list[int] | An int list of partition numbers to subscribe to. The default is to subscribe to all partitions. Specify partitions as a comma-separated list; e.g., |
| offsets optional | dict[int, int] | |
| key_spec optional | KeyValueSpec | Specifies how to map the Key field in Kafka messages to table column(s). Any of the following found in |
| value_spec optional | KeyValueSpec | Specifies how to map the Value field in Kafka messages to table column(s). Any of the following found in |
| table_type optional | TableType | One of the following |
Returns
An in-memory table.
Examples
In the following example, consume is used to read the Kafka topic testTopic into a Deephaven table. Only the first
two positional arguments are required to read a Kafka stream into a Deephaven table.
- The first positional argument, {'bootstrap.servers': 'redpanda:9092'}, is a Python dictionary that is standard for any Python library consuming messages from Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
  - Here, the host and port for the Kafka server to use to bootstrap the subscription are specified. This is the minimal amount of required information.
  - The value redpanda:9092 corresponds to the current setup for development testing with Docker images (which uses an instance of redpanda).
- The second positional argument is the name of the topic (testTopic).
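
A minimal sketch of this call is shown here. It assumes the code runs inside a Deephaven Python session, that the consumer module is importable as `deephaven.stream.kafka.consumer`, and that a broker is reachable at redpanda:9092. The wrapper function `consume_test_topic` is a hypothetical name, used so that nothing connects to Kafka until it is called.

```python
# The two required positional arguments described above.
KAFKA_CONFIG = {'bootstrap.servers': 'redpanda:9092'}
TOPIC = 'testTopic'


def consume_test_topic():
    """Subscribe to testTopic using only the two required arguments."""
    # Imported lazily: the module is only available inside a Deephaven session.
    from deephaven.stream.kafka import consumer as kc

    return kc.consume(KAFKA_CONFIG, TOPIC)
```

Inside a Deephaven session, `result = consume_test_topic()` would bind the resulting table to `result`.
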

In the following example, consume is used to read the Kafka topic share_price with additional settings enabled and a
specific key_spec and value_spec defined.
- partitions is set to ALL_PARTITIONS, which listens to all partitions.
- offsets is set to ALL_PARTITIONS_DONT_SEEK, which only listens to new messages produced after this call is processed.
- key_spec is set to simple_spec('Symbol', dht.string), which expects messages with a Kafka key field of type string, and creates a Symbol column to store the information.
- value_spec is set to simple_spec('Price', dht.double), which expects messages with a Kafka value field of type double, and creates a Price column to store the information.
- table_type is set to TableType.append(), which creates an append-only table.
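
The settings listed above could be passed as follows. This is a sketch under assumptions: it expects a Deephaven Python session, a broker at redpanda:9092, and the constants and helpers to be exposed by `deephaven.stream.kafka.consumer`. The wrapper name `consume_share_price` is hypothetical.

```python
KAFKA_CONFIG = {'bootstrap.servers': 'redpanda:9092'}
TOPIC = 'share_price'


def consume_share_price():
    """Read share_price with each optional argument set explicitly."""
    # Imported lazily: these modules are only available inside a Deephaven session.
    from deephaven.stream.kafka import consumer as kc
    import deephaven.dtypes as dht

    return kc.consume(
        KAFKA_CONFIG,
        TOPIC,
        partitions=kc.ALL_PARTITIONS,                    # listen to all partitions
        offsets=kc.ALL_PARTITIONS_DONT_SEEK,             # only messages produced after this call
        key_spec=kc.simple_spec('Symbol', dht.string),   # Kafka key -> Symbol column
        value_spec=kc.simple_spec('Price', dht.double),  # Kafka value -> Price column
        table_type=kc.TableType.append(),                # append-only result table
    )
```
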

In the following example, consume reads the Kafka topic share_price with an additional configuration property set and the
Kafka key ignored.
- deephaven.partition.column.name is set to None. The result table will not include a column for the partition field.
- key_spec is set to IGNORE. The result table will not include a column associated with the Kafka field key.
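
A sketch of this configuration follows. It assumes a Deephaven Python session and a broker at redpanda:9092. The `value_spec` shown is an assumption carried over from the previous example, since the text here does not specify one, and `consume_without_key` is a hypothetical wrapper name.

```python
KAFKA_CONFIG = {
    'bootstrap.servers': 'redpanda:9092',
    # None drops the partition column from the result table.
    'deephaven.partition.column.name': None,
}
TOPIC = 'share_price'


def consume_without_key():
    """Read share_price while ignoring the Kafka key and the partition column."""
    # Imported lazily: these modules are only available inside a Deephaven session.
    from deephaven.stream.kafka import consumer as kc
    import deephaven.dtypes as dht

    return kc.consume(
        KAFKA_CONFIG,
        TOPIC,
        key_spec=kc.KeyValueSpec.IGNORE,                 # no column for the Kafka key
        value_spec=kc.simple_spec('Price', dht.double),  # assumption: same value mapping as before
    )
```
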

In the following example, consume reads the Kafka topic share_price in JSON format.
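
A JSON value mapping might look like the sketch below. The column names and types are hypothetical, since the message layout is not given here. The sketch also assumes that `json_spec` accepts a dictionary of column names to Deephaven types (some releases may expect a list of (name, type) pairs instead), and that the code runs in a Deephaven Python session with a broker at redpanda:9092.

```python
KAFKA_CONFIG = {'bootstrap.servers': 'redpanda:9092'}
TOPIC = 'share_price'
# Hypothetical JSON fields; adjust to the actual message layout.
JSON_COLUMN_NAMES = ('Symbol', 'Price')


def consume_json():
    """Read share_price, parsing each Kafka value as a JSON object."""
    # Imported lazily: these modules are only available inside a Deephaven session.
    from deephaven.stream.kafka import consumer as kc
    import deephaven.dtypes as dht

    column_types = dict(zip(JSON_COLUMN_NAMES, (dht.string, dht.double)))
    return kc.consume(
        KAFKA_CONFIG,
        TOPIC,
        value_spec=kc.json_spec(column_types),  # one table column per JSON field
    )
```
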

The following example also reads the Kafka topic orders in JSON format, but this time, uses a Jackson provider object processor specification.
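
One possible shape for the Jackson-based variant is sketched here. It assumes that `deephaven.json.jackson.provider` and `kc.object_processor_spec` are available in the installed release, that the code runs in a Deephaven Python session, and that a broker is reachable at redpanda:9092. The field names and types in `ORDER_FIELDS` are hypothetical.

```python
KAFKA_CONFIG = {'bootstrap.servers': 'redpanda:9092'}
TOPIC = 'orders'
# Hypothetical JSON fields, described with plain Python types.
ORDER_FIELDS = {'Symbol': str, 'Side': str, 'Price': float, 'Qty': int}


def consume_orders_with_jackson():
    """Read orders using a Jackson provider as the object processor."""
    # Imported lazily: these modules are only available inside a Deephaven session.
    from deephaven.stream.kafka import consumer as kc
    from deephaven.json import jackson

    provider = jackson.provider(ORDER_FIELDS)
    return kc.consume(
        KAFKA_CONFIG,
        TOPIC,
        value_spec=kc.object_processor_spec(provider),
    )
```
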
In the following example, consume reads the Kafka topic share_price in Avro format.
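
An Avro read could be sketched as follows. The schema name `share_price_record`, the schema version, and the schema registry URL are all assumptions made for illustration. The sketch also assumes a Deephaven Python session and a broker at redpanda:9092.

```python
KAFKA_CONFIG = {
    'bootstrap.servers': 'redpanda:9092',
    # Assumed location of the schema registry that holds the Avro schema.
    'schema.registry.url': 'http://redpanda:8081',
}
TOPIC = 'share_price'
AVRO_SCHEMA_NAME = 'share_price_record'  # hypothetical schema name


def consume_avro():
    """Read share_price, decoding each Kafka value with a registered Avro schema."""
    # Imported lazily: the module is only available inside a Deephaven session.
    from deephaven.stream.kafka import consumer as kc

    return kc.consume(
        KAFKA_CONFIG,
        TOPIC,
        value_spec=kc.avro_spec(AVRO_SCHEMA_NAME, schema_version='1'),
    )
```
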
