

produce writes data from an in-memory table to a Kafka stream.


produce(
    table: Table,
    kafka_config: dict,
    topic: str,
    key_spec: KeyValueSpec,
    value_spec: KeyValueSpec,
    last_by_key_columns: bool = False,
)
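Arguments passed positionally are matched to these parameters in the order shown. As a sketch, the standard-library call inspect.signature(...).bind can show where each argument of the example on this page lands. produce_stub is a hypothetical stand-in with the same parameter list (it is not part of Deephaven), and plain strings stand in for the table and spec objects so the snippet runs without Deephaven installed.

```python
import inspect


def produce_stub(table, kafka_config, topic, key_spec, value_spec,
                 last_by_key_columns=False):
    """Hypothetical stand-in that mirrors the parameters documented above."""


# Bind the example's positional arguments to parameter names.
bound = inspect.signature(produce_stub).bind(
    "source",                                  # stands in for the Deephaven table
    {"bootstrap.servers": "redpanda:9092"},    # kafka_config
    "testTopic",                               # topic
    'simple_spec("X")',                        # stands in for pk.simple_spec("X")
    "KeyValueSpec.IGNORE",                     # stands in for KeyValueSpec.IGNORE
)
# Fill in last_by_key_columns, which the call left at its default.
bound.apply_defaults()

for name, value in bound.arguments.items():
    print(f"{name} = {value!r}")
```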



table Table
Source table with data to write to the Kafka stream.


kafka_config dict
Configuration for the associated Kafka producer. This dictionary is passed to the constructor of org.apache.kafka.clients.producer.KafkaProducer, so any KafkaProducer-specific configuration can be set here.
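Because kafka_config is handed to the KafkaProducer constructor, its keys are standard Kafka producer property names. The dictionary below is a sketch: bootstrap.servers comes from the example on this page, while acks, linger.ms, and compression.type are standard Kafka producer settings chosen here only for illustration. Whether Deephaven overrides any of them is not covered here.

```python
# A sketch of a kafka_config dictionary. Keys are standard Kafka producer
# property names; every value except bootstrap.servers is illustrative.
kafka_config = {
    # host:port of a broker used to discover the rest of the cluster
    "bootstrap.servers": "redpanda:9092",
    # wait for all in-sync replicas to acknowledge each write
    "acks": "all",
    # let records accumulate for up to 5 ms so they can be batched
    "linger.ms": "5",
    # compress each batch with gzip
    "compression.type": "gzip",
}
```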


topic str
Name of the topic to write to.


key_spec KeyValueSpec
Specifies how to map table column(s) to the Key field in produced Kafka messages. It can be any of the following, found in

  • simple_spec - Specify a single column to be serialized to the Key field of the Kafka event.
  • json_spec - Specify multiple columns to be serialized to the Key field of the Kafka event as JSON.
  • avro_spec - Specify multiple columns to be serialized to the Key field of the Kafka event as Avro.
  • KeyValueSpec.IGNORE - Ignore the Key field; no table columns are mapped to it.

value_spec KeyValueSpec
Specifies how to map table column(s) to the Value field in produced Kafka messages. It can be any of the following, found in

  • simple_spec - Specify a single column to be serialized to the Value field of the Kafka event.
  • json_spec - Specify multiple columns to be serialized to the Value field of the Kafka event as JSON.
  • avro_spec - Specify multiple columns to be serialized to the Value field of the Kafka event as Avro.
  • KeyValueSpec.IGNORE - Ignore the Value field; no table columns are mapped to it.
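To make the difference between the specs concrete, the sketch below imitates in plain Python how one row might be turned into key and value bytes. It is not Deephaven's implementation: the helpers simple_bytes and json_bytes are hypothetical, and encoding a simple_spec column as UTF-8 text is an assumption made for readability (the actual encoding Deephaven uses per column type is not covered here).

```python
import json

# Hypothetical helpers that mimic, conceptually, what the specs do.
# They are NOT part of the Deephaven API.


def simple_bytes(row: dict, column: str) -> bytes:
    """Like simple_spec: one column becomes the whole field (UTF-8 text here)."""
    return str(row[column]).encode("utf-8")


def json_bytes(row: dict, columns: list) -> bytes:
    """Like json_spec: several columns become one JSON object."""
    return json.dumps({c: row[c] for c in columns}).encode("utf-8")


row = {"X": 3, "Y": 42}

key = simple_bytes(row, "X")          # Key field from a single column
value = json_bytes(row, ["X", "Y"])   # Value field from two columns as JSON
ignored = None                        # roughly like IGNORE: no column is mapped

print(key)    # b'3'
print(value)  # b'{"X": 3, "Y": 42}'
```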
last_by_key_columns optional bool
  • True - Perform a last_by on the key columns before writing to the stream.
  • False - (default) Write all data to the stream.
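The effect of last_by_key_columns=True can be pictured with plain Python: among rows that share the same key, only the most recent one is kept. The sketch below illustrates last-by semantics over a static list of dicts; last_by is a hypothetical helper, and Deephaven itself applies the operation to a live, ticking table rather than to a list.

```python
def last_by(rows, key_columns):
    """Keep the last row seen for each distinct combination of key columns."""
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        # A later row with the same key replaces the earlier one; the key
        # keeps the position where it was first seen.
        latest[key] = row
    return list(latest.values())


rows = [
    {"X": 1, "Y": 0},
    {"X": 2, "Y": 1},
    {"X": 1, "Y": 2},
    {"X": 3, "Y": 3},
    {"X": 2, "Y": 4},
]

print(last_by(rows, ["X"]))
# [{'X': 1, 'Y': 2}, {'X': 2, 'Y': 4}, {'X': 3, 'Y': 3}]
```

Row order in this sketch follows the first appearance of each key; no claim is made about the row order Deephaven produces.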


A callback that, when invoked, stops writing the table to the Kafka stream and cleans up the associated resources.


In the following example, produce is used to write to the Kafka topic testTopic from a Deephaven table.

  • The first positional argument, source, is the Deephaven table to write from.
  • The second positional argument, {'bootstrap.servers': 'redpanda:9092'}, is a Python dictionary of the kind that Python Kafka client libraries use for configuration. It gives the underlying Kafka library information about the Kafka infrastructure.
    • Here, we specify the host and port of the Kafka server used to bootstrap the connection. This is the minimum required information.
    • The value redpanda:9092 corresponds to the current setup for development testing with Docker images (which uses an instance of Redpanda).
  • The third positional argument is the name of the topic (testTopic).
  • The fourth positional argument writes column X to the Key field of each message.
  • The fifth positional argument says to ignore values.
from deephaven import time_table
from deephaven import kafka_producer as pk
from import KeyValueSpec

source = time_table("PT00:00:00.1").update(formulas=["X = i"])

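write_topic = pk.produce(
    source,                                   # table to write
    {"bootstrap.servers": "redpanda:9092"},   # Kafka producer configuration
    "testTopic",                              # topic name
    pk.simple_spec("X"),                      # key: column X
    KeyValueSpec.IGNORE,                      # value: ignored
)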

In the following example, produce is used to write to the Kafka topic topic_group with last_by_key_columns set to True. This performs a last_by on the key columns before writing to the stream.

import random

# X is a random key from 1 to 5; Y is the row index
source_group = time_table("PT00:00:00.1").update(
    formulas=["X = random.randint(1, 5)", "Y = i"]
)

write_topic_group = pk.produce(
    source_group,
    {"bootstrap.servers": "redpanda:9092"},