
consume

consume reads a Kafka stream into an in-memory table.

Syntax

consume(kafka_config, topic)

consume(kafka_config, topic,
        partitions, offsets,
        key_spec=simple_spec(column_name, column_type, ...),
        value_spec=simple_spec(column_name, column_type, ...),
        table_type)

Parameters

kafka_config (dict)

Provides the initial hosts that act as the starting point for a Kafka client to discover the full set of alive servers in the cluster; e.g., bootstrap.servers.

topic (str)

Name of the topic to subscribe to.

partitions (list, optional)
  • Subscribe to all partitions by default.
  • Specify partitions as a comma-separated list; e.g., partitions=[1, 3, 5] to listen only to partitions 1, 3, and 5.
offsets (dict, optional)
  • ALL_PARTITIONS_DONT_SEEK - Only receive new messages produced after this call is processed (default).
  • ALL_PARTITIONS_SEEK_TO_END - Go to the newest available message for every partition.
  • ALL_PARTITIONS_SEEK_TO_BEGINNING - Go to the oldest available message for every partition.
  • { partition : offset, ... } - Map partition numbers to numeric offsets or to one of the constants DONT_SEEK, SEEK_TO_END, or SEEK_TO_BEGINNING; see the sketch after this parameter list.
    • For example, { 1 : SEEK_TO_END, 3 : SEEK_TO_BEGINNING, 5 : 27 } means:
      • Start from the last message sent in partition 1.
      • Go to the oldest available message in partition 3.
      • Start with offset 27 in partition 5.
key_spec (KeyValueSpec, optional)

Provide the result of calling one of the methods simple_spec, json_spec, or avro_spec, or the constant KeyValueSpec.IGNORE.

  • simple_spec(column_name, column_type, ...) - Specify a single column to be deserialized from the key field in the Kafka event.
  • json_spec(...) - Specify multiple columns to be deserialized from the key field in the Kafka event by parsing it as JSON.
  • avro_spec(...) - Specify multiple columns to be deserialized from the key field in the Kafka event by parsing it as Avro.
  • KeyValueSpec.IGNORE - Ignore the key field in the Kafka event.
value_spec (KeyValueSpec, optional)

Provide the result of calling one of the methods simple_spec, json_spec, or avro_spec, or the constant KeyValueSpec.IGNORE.

  • simple_spec(column_name, column_type, ...) - Specify a single column to be deserialized from the value field in the Kafka event.
  • json_spec(...) - Specify multiple columns to be deserialized from the value field in the Kafka event by parsing it as JSON.
  • avro_spec(...) - Specify multiple columns to be deserialized from the value field in the Kafka event by parsing it as Avro.
  • KeyValueSpec.IGNORE - Ignore the value field in the Kafka event.
table_type (TableType, optional)
  • TableType.append() - Create an append-only table.
  • TableType.stream() - Create a streaming table (default).
  • TableType.ring(N) - Create a ring table that retains only the latest N rows.
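
The per-partition offsets dictionary and the ring table type are not exercised in the examples below, so here is a minimal sketch that combines them. The broker address matches the examples that follow; the topic name, partition numbers, offsets, and ring capacity are illustrative placeholders, and the sketch assumes the SEEK_TO_END / SEEK_TO_BEGINNING constants are exposed on the same module as ALL_PARTITIONS.

from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

# Illustrative values: the topic, partitions, offsets, and ring capacity
# are placeholders, not part of the examples below.
result = ck.consume({'bootstrap.servers': 'redpanda:29092'},
                    'share_price',
                    partitions=[1, 3, 5],
                    offsets={1: ck.SEEK_TO_END,       # newest message in partition 1
                             3: ck.SEEK_TO_BEGINNING, # oldest message in partition 3
                             5: 27},                  # start at offset 27 in partition 5
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=ck.simple_spec('Price', dht.double),
                    table_type=TableType.ring(1024))  # keep only the latest 1024 rows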

Returns

A new in-memory table from a Kafka stream.

Examples

In the following example, consume is used to read the Kafka topic testTopic into a Deephaven table. Only the first two positional arguments are required to read a Kafka stream into a Deephaven table.

  • The first positional argument, {'bootstrap.servers': 'redpanda:29092'}, is a Python dictionary that is standard for any Python library consuming messages from Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
    • Here, the host and port for the Kafka server to use to bootstrap the subscription are specified. This is the minimal amount of required information.
    • The value redpanda:29092 corresponds to the current setup for development testing with Docker images (which uses an instance of Redpanda).
  • The second positional argument is the name of the topic (testTopic).
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec

result = ck.consume({'bootstrap.servers': 'redpanda:29092'}, 'testTopic')

In the following example, consume is used to read Kafka topic share_price with additional settings enabled and a specific key and value defined.

  • partitions is set to ALL_PARTITIONS, which listens to all partitions.
  • offsets is set to ALL_PARTITIONS_DONT_SEEK, which only listens to new messages produced after this call is processed.
  • key_spec is set to simple_spec('Symbol', dht.string), which expects messages with a Kafka key field of type string, and creates a Symbol column to store the information.
  • value_spec is set to simple_spec('Price', dht.double), which expects messages with a Kafka value field of type double, and creates a Price column to store the information.
  • table_type is set to TableType.append(), which creates an append-only table.
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = ck.consume({'bootstrap.servers': 'redpanda:29092'},
                    'share_price',
                    partitions=ck.ALL_PARTITIONS,
                    offsets=ck.ALL_PARTITIONS_DONT_SEEK,
                    key_spec=ck.simple_spec('Symbol', dht.string),
                    value_spec=ck.simple_spec('Price', dht.double),
                    table_type=TableType.append())

In the following example, consume reads the Kafka topic share_price with an additional consumer property set and the Kafka key field ignored.

  • deephaven.partition.column.name is set to None. The result table will not include a column for the partition field.
  • key_spec is set to KeyValueSpec.IGNORE. The result table will not include a column associated with the Kafka key field.
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = ck.consume({'bootstrap.servers': 'redpanda:29092',
                     'deephaven.partition.column.name': None},
                    'share_price',
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=ck.simple_spec('Price', dht.double))

In the following example, consume reads the Kafka topic orders in JSON format. The json_spec value spec parses each JSON message into the listed columns, and the mapping argument renames the JSON fields jsymbol, jside, jprice, and jqty to the column names Symbol, Side, Price, and Qty.

from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = ck.consume({'bootstrap.servers': 'redpanda:29092'},
                    'orders',
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=ck.json_spec([('Symbol', dht.string),
                                             ('Side', dht.string),
                                             ('Price', dht.double),
                                             ('Qty', dht.int_)],
                                            mapping={'jsymbol': 'Symbol',
                                                     'jside': 'Side',
                                                     'jprice': 'Price',
                                                     'jqty': 'Qty'}),
                    table_type=TableType.append())

In the following example, consume reads the Kafka topic share_price in Avro format. The schema.registry.url property points the consumer at the schema registry from which the Avro schema share_price_record (version 1) is fetched.

from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec

result = ck.consume({'bootstrap.servers': 'redpanda:29092',
                     'schema.registry.url': 'http://redpanda:8081'},
                    'share_price',
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=ck.avro_spec('share_price_record', schema_version='1'),
                    table_type=TableType.append())
