
produce

produce writes data from an in-memory Deephaven table to a Kafka stream.

Syntax

produce(
    table,
    kafka_config,
    topic,
    key_spec,
    value_spec,
    last_by_key_columns=False,
)
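
For instance, a call that spells out every parameter by keyword might look like the sketch below. The broker address matches the examples later on this page; the table, the topic name orders-topic, and the column Qty are hypothetical stand-ins.

from deephaven import time_table
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# Hypothetical ticking table with one column to publish.
orders = time_table('00:00:01').update(formulas=["Qty = i"])

# Publish each new row's Qty as a JSON value; no Kafka key is written.
pk.produce(
    orders,
    {'bootstrap.servers': 'redpanda:29092'},
    'orders-topic',
    key_spec=KeyValueSpec.IGNORE,
    value_spec=pk.json_spec(['Qty']),
    last_by_key_columns=False,
)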

Parameters

table (Table)

Source table with data to write to the Kafka stream.

kafka_config (dict)

Configuration dictionary for the underlying Kafka producer. It provides the initial hosts that act as the starting point for a Kafka client to discover the full set of alive servers in the cluster; e.g., bootstrap.servers.

topic (str)

Name of the topic to write to.

key_spec optional (KeyValueSpec)

This argument should be the result of calling one of the spec methods (avro_spec, json_spec, or simple_spec), or the constant KeyValueSpec.IGNORE. It controls how the key field of each Kafka event is produced (see the sketch after this list).

  • simple_spec(columnName, columnType, ...) - Serialize a single column into the field of the Kafka event.
  • json_spec(...) - Serialize multiple columns into the field of the Kafka event as JSON.
  • avro_spec(...) - Serialize multiple columns into the field of the Kafka event as Avro.
  • KeyValueSpec.IGNORE - Write nothing to the field of the Kafka event.

value_spec optional (KeyValueSpec)

This argument takes the same spec methods as key_spec and controls how the value field of each Kafka event is produced:

  • simple_spec(columnName, columnType, ...) - Serialize a single column into the field of the Kafka event.
  • json_spec(...) - Serialize multiple columns into the field of the Kafka event as JSON.
  • avro_spec(...) - Serialize multiple columns into the field of the Kafka event as Avro.
  • KeyValueSpec.IGNORE - Write nothing to the field of the Kafka event.

last_by_key_columns optional (bool)

  • True - Perform a last_by on the key columns before writing to the stream, so only the latest row per key is published.
  • False - (default) Write all data to the stream.
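
The spec helpers themselves are plain constructor calls. The sketch below shows the forms used on this page; avro_spec is omitted because it typically requires an Avro schema and a schema registry in the producer configuration.

from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# Serialize the single column X directly into the field.
key = pk.simple_spec('X')

# Serialize columns X and Y into the field as a JSON object.
value = pk.json_spec(['X', 'Y'])

# Write nothing for this field.
no_key = KeyValueSpec.IGNORE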

Returns

A Kafka stream from an in-memory table.

Examples

In the following example, produce is used to write to the Kafka topic testTopic from a Deephaven table.

  • The first positional argument, source, is the Deephaven table whose rows are written to the stream.
  • The second positional argument, {'bootstrap.servers': 'redpanda:29092'}, is a Python dictionary that is standard for any Python library producing messages to Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
    • Here, we specify the host and port for the Kafka server to use to bootstrap the connection. This is the minimal amount of required information.
    • The value redpanda:29092 corresponds to the current setup for development testing with Docker images (which uses an instance of Redpanda).
  • The third positional argument is the name of the topic (testTopic).
  • The fourth positional argument, pk.simple_spec('X'), writes column X as the key of each Kafka event.
  • The fifth positional argument, KeyValueSpec.IGNORE, writes no value field to the Kafka events.
from deephaven import time_table
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# Ticking table that adds a row with X = row index every 0.1 seconds.
source = time_table('00:00:00.1').update(formulas=["X = i"])

# Publish column X as the Kafka key; write no value field.
write_topic = pk.produce(
    source,
    {'bootstrap.servers': 'redpanda:29092'},
    'testTopic',
    pk.simple_spec('X'),
    KeyValueSpec.IGNORE,
)

In the following example, produce is used to write to the Kafka topic time-topic_group with last_by_key_columns set to True. This indicates that we want to perform a last_by on the key columns before writing to the stream, so only the latest row for each key is published.

import random
from deephaven import time_table
from deephaven import kafka_producer as pk

# X is a random key in 1..5; Y is the row index.
source_group = time_table('00:00:00.1').update(formulas=["X = random.randint(1, 5)", "Y = i"])

# last_by_key_columns=True keeps only the latest row per key before writing.
write_topic_group = pk.produce(source_group, {'bootstrap.servers': 'redpanda:29092'},
                               'time-topic_group', pk.json_spec(['X']), pk.json_spec(['X', 'Y']), True)
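
The last_by step that last_by_key_columns=True applies is the same reduction the table API exposes directly. As an illustration only (not part of the producer call), the following line computes what gets published for each key:

# Hypothetical illustration: keep only the latest row for each value of X,
# which is the reduction applied before publishing when last_by_key_columns=True.
latest_per_key = source_group.last_by(by=['X'])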