produce
`produce` writes a Kafka stream from an in-memory table.
Syntax
produceFromTable(kafkaProduceProperties, topic)
produceFromTable(table, kafkaProduceProperties, topic,
partitions, offsets,
key_spec=simple(columnName, columnType, ...),
value_spec=simple(columnName, columnType, ...),
last_by_key_columns)
Parameters
Parameter | Type | Description |
---|---|---|
tables | Table... | Source tables with data to write to the Kafka stream. |
kafkaProduceProperties | String | Provides the initial hosts that act as the starting point for a Kafka client to discover the full set of alive servers in the cluster; e.g., |
topic | String | Name of the topic to write to. |
key (optional) | Internal Type | This argument should be provided from the result of either calling one of the format methods ( |
value (optional) | Method | This argument should be provided from the result of either calling one of the format methods ( |
last_by_key_columns (optional) | Boolean | Whether to perform a `last_by` on the key columns before writing to the stream. |
Returns
A Kafka stream from an in-memory table.
Examples
In the following example, `produce` is used to write to the Kafka topic `testTopic` from a Deephaven table.
- The first positional argument, `source`, is the Deephaven table whose data is written to the stream.
- The second positional argument, `{'bootstrap.servers': 'redpanda:29092'}`, is a Python dictionary that is standard for any Python library that communicates with Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
  - Here, we specify the host and port for the Kafka server to use to bootstrap the connection. This is the minimal amount of required information.
  - The value `redpanda:29092` corresponds to the current setup for development testing with Docker images (which uses an instance of redpanda).
- The third positional argument is the name of the topic (`testTopic`).
- The fourth positional argument, the key specification, writes column `X` to the stream.
- The fifth positional argument, the value specification, says to ignore values.
from deephaven import time_table
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec
source = time_table('00:00:00.1').update(formulas=["X = i"])
write_topic = pk.produce(source, {'bootstrap.servers': 'redpanda:29092'},
                         'testTopic', pk.simple_spec('X'), KeyValueSpec.IGNORE)
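
The properties dictionary used in the example above is plain Python, so it can be assembled and sanity-checked before it is handed to `produce`. The following is a minimal sketch, not part of the Deephaven API: the helper name `build_kafka_config` is hypothetical, and only the `bootstrap.servers` key and the `redpanda:29092` address come from the example. It relies on the standard Kafka convention that `bootstrap.servers` is a comma-separated list of `host:port` pairs.

```python
def build_kafka_config(brokers):
    """Build a Kafka properties dict from a list of (host, port) pairs.

    Hypothetical helper for illustration; not part of the Deephaven API.
    """
    if not brokers:
        raise ValueError("at least one broker is required")
    entries = []
    for host, port in brokers:
        # Reject empty host names and ports outside the valid TCP range.
        if not host or not (0 < int(port) < 65536):
            raise ValueError(f"invalid broker address: {host!r}:{port!r}")
        entries.append(f"{host}:{int(port)}")
    # Kafka expects a comma-separated list of host:port pairs.
    return {"bootstrap.servers": ",".join(entries)}


# Single broker, matching the address used in the example above.
config = build_kafka_config([("redpanda", 29092)])
print(config)
```

The resulting dictionary can be passed wherever the example passes the literal `{'bootstrap.servers': 'redpanda:29092'}`.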
In the following example, `produce` is used to write to the Kafka topic `time-topic_group` with the additional setting `last_by_key_columns` set to `True`. This indicates that we want to perform a `last_by` on the keys before writing to the stream.
import random
source_group = time_table('00:00:00.1').update(formulas=["X = random.randint(1, 5)", "Y = i"])
write_topic_group = pk.produce(source_group, {'bootstrap.servers': 'redpanda:29092'},
                               'time-topic_group', pk.json_spec(['X']), pk.json_spec(['X', 'Y']), True)
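
To make the effect of `last_by_key_columns` concrete, the following plain-Python sketch mimics a `last_by` on the key column `X` and then builds JSON key and value payloads. It is only an illustration under stated assumptions: the `last_by` function below is a hypothetical stand-in rather than Deephaven's implementation, the sample rows are made up, and the exact bytes Deephaven writes for `json_spec` may differ.

```python
import json


def last_by(rows, key_columns):
    """Keep only the most recent row for each distinct key.

    Hypothetical stand-in for a last_by on key_columns; rows is a list of
    dicts in arrival order.
    """
    latest = {}
    for row in rows:
        key = tuple(row[col] for col in key_columns)
        latest[key] = row  # later rows overwrite earlier rows with the same key
    return list(latest.values())


# Made-up sample data: X is the key column, Y increases with each row.
rows = [
    {"X": 1, "Y": 0},
    {"X": 2, "Y": 1},
    {"X": 1, "Y": 2},
    {"X": 3, "Y": 3},
    {"X": 2, "Y": 4},
]

for row in last_by(rows, ["X"]):
    key_payload = json.dumps({"X": row["X"]})                   # key fields: X
    value_payload = json.dumps({"X": row["X"], "Y": row["Y"]})  # value fields: X, Y
    print(key_payload, value_payload)
```

In this sketch, five input rows with three distinct values of `X` collapse to three rows, one per key, each carrying the latest `Y` seen for that key.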