consume
consume reads a Kafka stream into an in-memory table.
Syntax
consume(kafkaConsumerProperties, topic)
consume(kafkaConsumerProperties, topic,
        partitions, offsets,
        key_spec=simple_spec(columnName, columnType, ...),
        value_spec=simple_spec(columnName, columnType, ...),
        table_type)
Parameters
Parameter | Type | Description |
---|---|---|
kafkaConsumerProperties | Dict | Provides the initial hosts that act as the starting point for a Kafka client to discover the full set of alive servers in the cluster; e.g., {'bootstrap.servers': 'redpanda:29092'}. |
topic | String | Name of the topic to subscribe to. |
partitions (optional) | Array | The partitions to listen to; e.g., ALL_PARTITIONS listens to all partitions. |
offsets (optional) | Dictionary | The offsets at which to start listening for each partition; e.g., ALL_PARTITIONS_DONT_SEEK listens only to new messages produced after this call is processed. |
key_spec (optional) | KeyValueSpec | Specifies how to map the Kafka key field to table columns. This argument should be provided from the result of calling one of the spec generation methods (e.g., simple_spec, json_spec, or avro_spec), or set to KeyValueSpec.IGNORE to omit the key. |
value_spec (optional) | KeyValueSpec | Specifies how to map the Kafka value field to table columns. This argument should be provided from the result of calling one of the spec generation methods (e.g., simple_spec, json_spec, or avro_spec), or set to KeyValueSpec.IGNORE to omit the value. |
table_type (optional) | TableType | The type of the resulting table; e.g., TableType.append() creates an append-only table. |
Returns
A new in-memory table from a Kafka stream.
Examples
In the following example, consume is used to read the Kafka topic testTopic into a Deephaven table. Only the first two positional arguments are required to read a Kafka stream into a Deephaven table.
- The first positional argument, {'bootstrap.servers': 'redpanda:29092'}, is a Python dictionary that is standard for any Python library consuming messages from Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
  - Here, the host and port for the Kafka server to use to bootstrap the subscription are specified. This is the minimal amount of required information.
  - The value redpanda:29092 corresponds to the current setup for development testing with Docker images (which uses an instance of redpanda).
- The second positional argument is the name of the topic (testTopic).
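Because the consumer properties are an ordinary Python dictionary, additional Kafka client settings can be layered on programmatically before the dictionary is passed to consume. A minimal sketch, using only settings that appear elsewhere in this guide:

```python
# The Kafka consumer properties are a plain Python dict, so they can be
# built up or merged like any other dict before being passed to consume().
base_config = {'bootstrap.servers': 'redpanda:29092'}

# Extra settings (e.g., a schema registry URL, as in the Avro example) can
# be layered on without mutating the base configuration.
avro_config = {**base_config, 'schema.registry.url': 'http://redpanda:8081'}

print(avro_config)
```

The merged dictionary would then be passed as the first positional argument in place of the base configuration.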
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
result = kc.consume({'bootstrap.servers': 'redpanda:29092'}, 'testTopic')
In the following example, consume is used to read the Kafka topic share_price with additional settings enabled and a specific key_spec and value_spec defined.
- partitions is set to ALL_PARTITIONS, which listens to all partitions.
- offsets is set to ALL_PARTITIONS_DONT_SEEK, which only listens to new messages produced after this call is processed.
- key_spec is set to simple_spec('Symbol', dht.string), which expects messages with a Kafka key field of type string, and creates a Symbol column to store the information.
- value_spec is set to simple_spec('Price', dht.double), which expects messages with a Kafka value field of type double, and creates a Price column to store the information.
- table_type is set to TableType.append(), which creates an append-only table.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht
result = kc.consume({'bootstrap.servers' : 'redpanda:29092'},
'share_price',
partitions=kc.ALL_PARTITIONS,
offsets=kc.ALL_PARTITIONS_DONT_SEEK,
key_spec=kc.simple_spec('Symbol', dht.string),
value_spec=kc.simple_spec('Price', dht.double),
table_type=TableType.append())
In the following example, consume reads the Kafka topic share_price with an additional dictionary set and keys ignored.
- deephaven.partition.column.name is set to None. The result table will not include a column for the partition field.
- key_spec is set to KeyValueSpec.IGNORE. The result table will not include a column associated with the Kafka field key.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht
result = kc.consume({ 'bootstrap.servers' : 'redpanda:29092',
'deephaven.partition.column.name' : None },
'share_price',
key_spec=KeyValueSpec.IGNORE,
value_spec=kc.simple_spec('Price', dht.double))
In the following example, consume reads the Kafka topic orders in JSON format.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht
result = kc.consume({ 'bootstrap.servers' : 'redpanda:29092' },
'orders',
key_spec=KeyValueSpec.IGNORE,
value_spec=kc.json_spec([('Symbol', dht.string),
('Side', dht.string),
('Price', dht.double),
('Qty', dht.int_) ],
mapping={ 'jsymbol' : 'Symbol',
'jside' : 'Side',
'jprice' : 'Price',
'jqty' : 'Qty' }),
table_type=TableType.append())
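To see what the mapping argument accomplishes, the field-to-column renaming it performs can be sketched in plain Python. This is an illustration of the renaming only, using a hypothetical payload, not how Deephaven parses messages internally:

```python
import json

# Field-to-column mapping, as passed to json_spec above.
mapping = {'jsymbol': 'Symbol', 'jside': 'Side', 'jprice': 'Price', 'jqty': 'Qty'}

# A hypothetical JSON payload from the topic.
payload = '{"jsymbol": "AAPL", "jside": "buy", "jprice": 135.5, "jqty": 100}'

# Each JSON field is renamed to its Deephaven column name.
row = {mapping[k]: v for k, v in json.loads(payload).items()}
print(row)  # keys are now the column names: Symbol, Side, Price, Qty
```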
In the following example, consume reads the Kafka topic share_price in Avro format.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
result = kc.consume({ 'bootstrap.servers' : 'redpanda:29092',
'schema.registry.url' :
'http://redpanda:8081' },
'share_price',
key_spec=KeyValueSpec.IGNORE,
value_spec=kc.avro_spec('share_price_record', schema_version='1'),
                    table_type=TableType.append())