
produce

produce writes data from an in-memory table to a Kafka stream.

Syntax

produce(
table: Table,
kafka_config: dict,
topic: str,
key_spec: KeyValueSpec,
value_spec: KeyValueSpec,
last_by_key_columns: bool = False,
)

Parameters

table (Table)

Source table with data to write to the Kafka stream.

kafka_config (dict)

Configuration for the associated Kafka producer. This is used to call the constructor of org.apache.kafka.clients.producer.KafkaProducer. Pass any desired KafkaProducer-specific configuration here; an illustrative configuration appears in the sketch after this parameter list.

topic (str)

Name of the topic to write to.

key_spec (KeyValueSpec)

Specifies how the key field of each Kafka event is produced from table columns. This argument should be the result of calling one of the spec functions (avro_spec, json_spec, or simple_spec), or the constant KeyValueSpec.IGNORE.

  • simple_spec(...) - Specify a single column to be serialized as the field in the Kafka event.
  • json_spec(...) - Specify multiple columns to be serialized into the field in the Kafka event as JSON.
  • avro_spec(...) - Specify multiple columns to be serialized into the field in the Kafka event as Avro.
  • KeyValueSpec.IGNORE - Do not write this field in the Kafka event.
value_spec (KeyValueSpec)

Specifies how the value field of each Kafka event is produced from table columns. This argument should be the result of calling one of the spec functions (avro_spec, json_spec, or simple_spec), or the constant KeyValueSpec.IGNORE.

  • simple_spec(...) - Specify a single column to be serialized as the field in the Kafka event.
  • json_spec(...) - Specify multiple columns to be serialized into the field in the Kafka event as JSON.
  • avro_spec(...) - Specify multiple columns to be serialized into the field in the Kafka event as Avro.
  • KeyValueSpec.IGNORE - Do not write this field in the Kafka event.
last_by_key_columns (bool, optional)

  • True - Perform a last_by on the key columns before writing to the stream.
  • False - (default) Write all data to the stream.
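
As a point of reference, the following sketch shows an illustrative kafka_config dictionary and spec construction. Only bootstrap.servers is required; the acks and linger.ms entries are standard KafkaProducer properties included purely for illustration, and the topic name, table, and column names are assumptions.

from deephaven import time_table
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# Illustrative producer configuration. 'bootstrap.servers' is the only
# required entry; 'acks' and 'linger.ms' are standard KafkaProducer
# properties shown here only as examples.
producer_config = {
    'bootstrap.servers': 'redpanda:9092',
    'acks': 'all',       # wait for full acknowledgement of each record
    'linger.ms': '100',  # batch records for up to 100 ms before sending
}

# A hypothetical ticking table with two columns.
source = time_table('PT00:00:00.1').update(formulas=["X = i", "Y = i % 5"])

# Key: column X serialized as a simple field.
# Value: columns X and Y serialized as JSON.
handle = pk.produce(
    source,
    producer_config,
    'example-topic',
    pk.simple_spec('X'),
    pk.json_spec(['X', 'Y']),
)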

Returns

A Kafka stream from an in-memory table.

Examples

In the following example, produce is used to write to the Kafka topic testTopic from a Deephaven table.

  • The first positional argument, source, is the Deephaven table whose data is published.
  • The second positional argument, {'bootstrap.servers': 'redpanda:9092'}, is a Python dictionary that is standard for any Python library producing messages to Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
    • Here, we specify the host and port of the Kafka server to use to bootstrap the connection. This is the minimal amount of required information.
    • The value redpanda:9092 corresponds to the current setup for development testing with Docker images (which uses an instance of Redpanda).
  • The third positional argument is the name of the topic (testTopic).
  • The fourth positional argument, pk.simple_spec('X'), writes column X as the key of each Kafka event.
  • The fifth positional argument, KeyValueSpec.IGNORE, says to write no value field.
from deephaven import time_table
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# A ticking table that appends a new row every 0.1 seconds.
source = time_table('PT00:00:00.1').update(formulas=["X = i"])

# Write column X as the key of each Kafka event; write no value.
write_topic = pk.produce(
    source,
    {'bootstrap.servers': 'redpanda:9092'},
    'testTopic',
    pk.simple_spec('X'),
    KeyValueSpec.IGNORE,
)

In the following example, produce is used to write to the Kafka topic time-topic_group with the additional setting last_by_key_columns set to True. This indicates that we want to perform a last_by on the key columns before writing to the stream.

import random

from deephaven import time_table
from deephaven import kafka_producer as pk

# X is a random key in [1, 5]; Y is the row index.
source_group = time_table("PT00:00:00.1").update(formulas=["X = random.randint(1, 5)", "Y = i"])

# Key: column X as JSON; value: columns X and Y as JSON.
# last_by_key_columns=True publishes only the last row for each key.
write_topic_group = pk.produce(
    source_group,
    {'bootstrap.servers': 'redpanda:9092'},
    'time-topic_group',
    pk.json_spec(['X']),
    pk.json_spec(['X', 'Y']),
    True,
)
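
For intuition, last_by_key_columns=True behaves roughly as if the table had been reduced with last_by on the key columns before publishing. The following sketch illustrates that manual form, reusing source_group from above; the topic name is an assumption for illustration.

# Roughly equivalent manual form: keep only the last row per key X,
# then publish every row of the reduced table.
grouped = source_group.last_by(by=["X"])

write_topic_manual = pk.produce(
    grouped,
    {'bootstrap.servers': 'redpanda:9092'},
    'time-topic_group-manual',
    pk.json_spec(['X']),
    pk.json_spec(['X', 'Y']),
)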