produce

produce writes a Kafka stream from an in-memory table.

Syntax
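
The signature below is a sketch reconstructed from the Parameters section; consult the pydoc of deephaven.stream.kafka.producer for the authoritative form.

```python
produce(
    table: Table,
    kafka_config: dict,
    topic: str,
    key_spec: KeyValueSpec,
    value_spec: KeyValueSpec,
    last_by_key_columns: bool = False,
)
```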

Parameters

table (Table)

Source table with data to write to the Kafka stream.

kafka_config (dict)

Configuration for the associated Kafka producer. This is used to call the constructor of org.apache.kafka.clients.producer.KafkaProducer. Pass any desired KafkaProducer-specific configuration here.

topic (str)

Name of the topic to write to.

key_spec (KeyValueSpec)

Specifies how to map table column(s) to the Key field in produced Kafka messages. It can be any of the following, found in deephaven.stream.kafka.producer:

  • simple_spec - Serialize a single column as the Key field of each produced Kafka message.
  • json_spec - Serialize multiple columns as the Key field of each produced Kafka message by encoding them as JSON.
  • avro_spec - Serialize multiple columns as the Key field of each produced Kafka message by encoding them as Avro.
  • KeyValueSpec.IGNORE - Leave the Key field of produced Kafka messages empty.

value_spec (KeyValueSpec)

Specifies how to map table column(s) to the Value field in produced Kafka messages. It accepts the same options as key_spec, applied to the Value field.

last_by_key_columns (bool, optional)

  • True - Perform a last_by on the key columns before writing to the stream.
  • False - (default) Write all data to the stream.

Returns

A callback that, when invoked, stops publishing to the Kafka stream and cleans up associated resources.

Examples

In the following example, produce is used to write to the Kafka topic testTopic from a Deephaven table.

  • The first positional argument, {'bootstrap.servers': 'redpanda:9092'}, is a Python dictionary that is standard for any Python library that talks to Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
    • Here, we specify the host and port of the Kafka server to use to bootstrap the connection. This is the minimum required information.
    • The value redpanda:9092 corresponds to the current setup for development testing with Docker images (which use an instance of redpanda).
  • The second positional argument is the name of the topic (testTopic).
  • The third positional argument specifies that column X is written as the Key of each message.
  • The fourth positional argument says to ignore message Values.
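
The steps above can be sketched as follows. This is a hedged example: it assumes a running Deephaven server with Kafka connectivity, and the source table (a time_table with a hypothetical column X) stands in for any in-memory table.

```python
from deephaven import time_table
from deephaven.stream.kafka import producer as pk

# Hypothetical source table: a new row every 100 ms with a single column X.
source = time_table("PT00:00:00.1").update(["X = i"])

# Publish column X as the Key of each message on testTopic; ignore Values.
write_topic = pk.produce(
    source,
    {"bootstrap.servers": "redpanda:9092"},  # Kafka broker used in dev testing
    "testTopic",
    pk.simple_spec("X"),
    pk.KeyValueSpec.IGNORE,
)

# produce returns a callback; invoking it stops publishing:
# write_topic()
```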

In the following example, produce is used to write to the Kafka topic topic_group with the additional setting last_by_key_columns=True. This indicates that a last_by is performed on the key columns before writing to the stream.
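
A hedged sketch of this example follows; the source table and its columns (X as the key, Y as the value) are hypothetical and assume the same dev-testing Kafka setup as above.

```python
from deephaven import time_table
from deephaven.stream.kafka import producer as pk

# Hypothetical source: X cycles through a small set of keys, Y keeps growing.
source_group = time_table("PT00:00:00.1").update(["X = i % 5", "Y = i"])

# With last_by_key_columns=True, a last_by is performed on the key column (X),
# so only the latest row per key is written to the stream on each update.
write_topic = pk.produce(
    source_group,
    {"bootstrap.servers": "redpanda:9092"},
    "topic_group",
    pk.simple_spec("X"),
    pk.simple_spec("Y"),
    last_by_key_columns=True,
)
```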