
consume

consume reads a Kafka stream into an in-memory table.

Syntax

consume(kafkaConsumerProperties, topic)

consume(kafkaConsumerProperties, topic,
        partitions, offsets,
        key_spec=simple_spec(columnName, columnType, ...),
        value_spec=simple_spec(columnName, columnType, ...),
        table_type)

Parameters

kafkaConsumerProperties (Dict)

Provides the initial hosts that act as the starting point for a Kafka client to discover the full set of alive servers in the cluster; e.g., bootstrap.servers.

topic (String)

Name of the topic to subscribe to.

partitions (optional, List)
  • Subscribes to all partitions by default.
  • Specify partitions as a comma-separated list; e.g., partitions=[1, 3, 5] to listen only to partitions 1, 3, and 5.
offsets (optional, Dict)
  • ALL_PARTITIONS_DONT_SEEK - Only receive new messages produced after this call is processed. This is the default, and matches the behavior when offsets is not specified.
  • ALL_PARTITIONS_SEEK_TO_END - Go to the newest available message for every partition.
  • ALL_PARTITIONS_SEEK_TO_BEGINNING - Go to the oldest available message for every partition.
  • { partition : offset, ... } - Map partition numbers to numeric offsets or to one of the constants DONT_SEEK, SEEK_TO_END, or SEEK_TO_BEGINNING (see the sketch after this parameter list).
    • For example, { 1 : SEEK_TO_END, 3 : SEEK_TO_BEGINNING, 5 : 27 } means:
      • Start from the last message sent in partition 1.
      • Go to the oldest available message in partition 3.
      • Start with offset 27 in partition 5.
key_spec (optional, KeyValueSpec)

Provide the result of calling one of the spec generation methods (avro_spec, json_spec, or simple_spec), or the constant KeyValueSpec.IGNORE.

  • simple_spec(columnName, columnType, ...) - Specify a single column to be deserialized from the key field in the Kafka event.
  • json_spec(...) - Specify multiple columns to be deserialized from the key field by parsing it as JSON.
  • avro_spec(...) - Specify multiple columns to be deserialized from the key field by parsing it as Avro.
  • KeyValueSpec.IGNORE - Ignore the key field in the Kafka event.
value_spec (optional, KeyValueSpec)

Provide the result of calling one of the spec generation methods (avro_spec, json_spec, or simple_spec), or the constant KeyValueSpec.IGNORE.

  • simple_spec(columnName, columnType, ...) - Specify a single column to be deserialized from the value field in the Kafka event.
  • json_spec(...) - Specify multiple columns to be deserialized from the value field by parsing it as JSON.
  • avro_spec(...) - Specify multiple columns to be deserialized from the value field by parsing it as Avro.
  • KeyValueSpec.IGNORE - Ignore the value field in the Kafka event.
table_type (optional, TableType)
  • TableType.append() - Create an append-only table.
  • TableType.stream() - Create a streaming table (default).
  • TableType.ring(N) - Create a ring table that retains only the latest N rows.
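
As a rough sketch, the optional parameters above can be combined as follows. The broker address and topic reuse the development setup from the examples below; the partition numbers, per-partition offsets, column name, and ring capacity are illustrative assumptions, and accessing the seek constants through kc follows the pattern of the examples below.

from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = kc.consume({'bootstrap.servers': 'redpanda:29092'},
                    'testTopic',
                    partitions=[1, 3, 5],              # listen only to partitions 1, 3, and 5
                    offsets={1: kc.SEEK_TO_END,        # newest message in partition 1
                             3: kc.SEEK_TO_BEGINNING,  # oldest message in partition 3
                             5: 27},                   # start at offset 27 in partition 5
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=kc.simple_spec('Price', dht.double),
                    table_type=TableType.ring(1024))   # retain only the latest 1024 rows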

Returns

A new in-memory table from a Kafka stream.
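
The returned table is live: standard table operations applied to it update automatically as new messages arrive. A minimal sketch, assuming the default KafkaOffset column is present:

from deephaven import kafka_consumer as kc

result = kc.consume({'bootstrap.servers': 'redpanda:29092'}, 'testTopic')

# Derived columns update as new Kafka messages arrive; KafkaOffset is one of
# the columns consume adds by default.
latest = result.update_view(['NextOffset = KafkaOffset + 1'])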

Examples

In the following example, consume is used to read the Kafka topic testTopic into a Deephaven table. Only the first two positional arguments are required to read a Kafka stream into a Deephaven table.

  • The first positional argument, {'bootstrap.servers': 'redpanda:29092'}, is a Python dictionary that is standard for any Python library consuming messages from Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
    • Here, the host and port for the Kafka server to use to bootstrap the subscription are specified. This is the minimal amount of required information.
    • The value redpanda:29092 corresponds to the current setup for development testing with Docker images (which use an instance of Redpanda).
  • The second positional argument is the name of the topic (testTopic).
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec

result = kc.consume({'bootstrap.servers': 'redpanda:29092'}, 'testTopic')


In the following example, consume is used to read the Kafka topic share_price with additional settings enabled and a specific key_spec and value_spec defined.

  • partitions is set to ALL_PARTITIONS, which listens to all partitions.
  • offsets is set to ALL_PARTITIONS_DONT_SEEK, which only listens to new messages produced after this call is processed.
  • key_spec is set to simple_spec('Symbol', dht.string), which expects messages with a Kafka key field of type string, and creates a Symbol column to store the information.
  • value_spec is set to simple_spec('Price', dht.double), which expects messages with a Kafka value field of type double, and creates a Price column to store the information.
  • table_type is set to TableType.append(), which creates an append-only table.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = kc.consume({'bootstrap.servers': 'redpanda:29092'},
                    'share_price',
                    partitions=kc.ALL_PARTITIONS,
                    offsets=kc.ALL_PARTITIONS_DONT_SEEK,
                    key_spec=kc.simple_spec('Symbol', dht.string),
                    value_spec=kc.simple_spec('Price', dht.double),
                    table_type=TableType.append())


In the following example, consume reads the Kafka topic share_price with an additional consumer property set and the Kafka key field ignored.

  • deephaven.partition.column.name is set to None. The result table will not include a column for the partition field.
  • key_spec is set to IGNORE. The result table will not include a column associated with the Kafka field key.
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = kc.consume({'bootstrap.servers': 'redpanda:29092',
                     'deephaven.partition.column.name': None},
                    'share_price',
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=kc.simple_spec('Price', dht.double))


In the following example, consume reads the Kafka topic orders in JSON format. json_spec takes a list of (column name, data type) pairs that define the resulting table's columns, plus an optional mapping dictionary from JSON field names to column names.

from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

result = kc.consume({'bootstrap.servers': 'redpanda:29092'},
                    'orders',
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=kc.json_spec([('Symbol', dht.string),
                                             ('Side', dht.string),
                                             ('Price', dht.double),
                                             ('Qty', dht.int_)],
                                            mapping={'jsymbol': 'Symbol',
                                                     'jside': 'Side',
                                                     'jprice': 'Price',
                                                     'jqty': 'Qty'}),
                    table_type=TableType.append())
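
For reference, a hypothetical payload on the orders topic might look like the following; the JSON field names match the mapping keys above, and each mapped column receives the corresponding value (the values shown are illustrative).

# Illustrative JSON message for the 'orders' topic: 'jsymbol' fills the
# Symbol column, 'jside' fills Side, 'jprice' fills Price, 'jqty' fills Qty.
message = '{"jsymbol": "AAPL", "jside": "buy", "jprice": 135.50, "jqty": 100}'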


In the following example, consume reads the Kafka topic share_price in Avro format. avro_spec fetches the Avro schema named share_price_record (version 1) from the schema registry configured via schema.registry.url and derives the table's columns from it.

from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec

result = kc.consume({'bootstrap.servers': 'redpanda:29092',
                     'schema.registry.url': 'http://redpanda:8081'},
                    'share_price',
                    key_spec=KeyValueSpec.IGNORE,
                    value_spec=kc.avro_spec('share_price_record', schema_version='1'),
                    table_type=TableType.append())
