produce
The `produce` method writes the contents of an in-memory table to a Kafka stream.
Syntax
```python
produce(
    table: Table,
    kafka_config: dict,
    topic: str,
    key_spec: KeyValueSpec,
    value_spec: KeyValueSpec,
    last_by_key_columns: bool = False,
)
```
Parameters
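| Parameter | Type | Description |
|---|---|---|
| table | Table | Source table with data to write to the Kafka stream. |
| kafka_config | dict | Configuration for the associated Kafka producer. It is passed to the constructor of the underlying Kafka producer. |
| topic | str | Name of the topic to write to. |
| key_spec | KeyValueSpec | Specifies how table columns map to the key of each Kafka message. This argument should be either the result of calling one of the format methods (such as `simple_spec` or `json_spec`, used in the examples below) or `KeyValueSpec.IGNORE`. |
| value_spec | KeyValueSpec | Specifies how table columns map to the value of each Kafka message. Like `key_spec`, this argument should be either the result of calling one of the format methods (such as `simple_spec` or `json_spec`) or `KeyValueSpec.IGNORE`. |
| last_by_key_columns optional | bool | Whether to perform a `last_by` on the key columns before writing to the stream, so that only the most recent row for each key is published. Defaults to `False`. |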
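Setting `last_by_key_columns=True` performs a `last_by` on the key columns before writing, so only the latest row per key is published. The plain-Python sketch below models that effect on a static list of rows; it is only a conceptual model (the real table ticks and Deephaven maintains the result incrementally), and the helper name `last_by_key` is hypothetical, not part of the Deephaven API.

```python
from typing import Any, Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, Any]


def last_by_key(rows: Iterable[Row], key_columns: Sequence[str]) -> List[Row]:
    """Hypothetical helper modeling a last_by on the key columns.

    Keeps only the last row seen for each distinct key. Keys appear in the
    order they were first seen (Python dicts preserve insertion order).
    """
    latest: Dict[Tuple[Any, ...], Row] = {}
    for row in rows:
        key = tuple(row[col] for col in key_columns)
        latest[key] = row  # a later row replaces the earlier one for this key
    return list(latest.values())


rows = [{"X": 1, "Y": 0}, {"X": 2, "Y": 1}, {"X": 1, "Y": 2}]
print(last_by_key(rows, ["X"]))  # [{'X': 1, 'Y': 2}, {'X': 2, 'Y': 1}]
```

With `X` as the key, the two rows for `X = 1` collapse to the later one, which is what a consumer would see as the current state for that key.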
Returns
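A callback that, when invoked, stops publishing to the Kafka topic and releases the associated resources. Keep a reference to it for as long as publishing should continue.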
Examples
In the following example, `produce` is used to write to the Kafka topic `testTopic` from a Deephaven table.

- The first positional argument, `source`, is the table to write.
- The second positional argument, `{'bootstrap.servers': 'redpanda:9092'}`, is a Python dictionary of the kind that is standard for Python libraries working with Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
  - Here, we specify the host and port of the Kafka server used to bootstrap the connection. This is the minimal amount of required information.
  - The value `redpanda:9092` corresponds to the current setup for development testing with Docker images (which uses an instance of Redpanda).
- The third positional argument is the name of the topic (`testTopic`).
- The fourth positional argument, `pk.simple_spec('X')`, writes column `X` to the stream as the message key.
- The fifth positional argument, `KeyValueSpec.IGNORE`, says to ignore values.
```python
from deephaven import time_table
from deephaven.stream.kafka import producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# Ticking table that adds a row every 100 ms; X holds the row index
source = time_table("PT00:00:00.1").update(formulas=["X = i"])
# Write column X as the message key and ignore the message value
write_topic = pk.produce(source, {"bootstrap.servers": "redpanda:9092"},
    "testTopic", pk.simple_spec("X"), KeyValueSpec.IGNORE)
```
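The examples pass a configuration dictionary containing only `bootstrap.servers`, the minimal required entry. When a setup needs more producer settings, a small helper can assemble the dictionary from a host and port and merge in extra properties. This is a sketch under that assumption: `make_kafka_config` is a hypothetical helper, not part of Deephaven, and `acks` in the test is used only as an example of a standard Kafka producer property.

```python
from typing import Dict, Mapping, Optional


def make_kafka_config(
    host: str, port: int, extra: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Hypothetical helper (not part of Deephaven): build a kafka_config dict.

    `bootstrap.servers` is the minimal required entry; `extra` holds any
    additional Kafka producer properties to merge in.
    """
    if not host:
        raise ValueError("host must be non-empty")
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    config = {"bootstrap.servers": f"{host}:{port}"}
    if extra:
        # Extra properties are copied as-is; a bootstrap.servers entry in
        # `extra` would override the one built above.
        config.update(extra)
    return config


# Same configuration as in the example above
print(make_kafka_config("redpanda", 9092))  # {'bootstrap.servers': 'redpanda:9092'}
```

Validating the port up front turns a typo into an immediate `ValueError` rather than a connection failure later.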
In the following example, `produce` is used to write to the Kafka topic `time-topic_group` with the additional setting `last_by_key_columns` set to `True`. This indicates that we want to perform a `last_by` on the key columns before writing to the stream, so only the latest row for each value of `X` is published.
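```python
import random
from deephaven import time_table
from deephaven.stream.kafka import producer as pk

source_group = time_table("PT00:00:00.1").update(formulas=["X = random.randint(1, 5)", "Y = i"])
# JSON key from X, JSON value from X and Y; publish only the latest row per X
write_topic_group = pk.produce(source_group, {"bootstrap.servers": "redpanda:9092"},
    "time-topic_group", pk.json_spec(["X"]), pk.json_spec(["X", "Y"]), last_by_key_columns=True)
```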
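Conceptually, the second example turns each published row into a JSON key built from `X` and a JSON value built from `X` and `Y`. The sketch below models that mapping with Python's standard `json` module. It is an illustration only: the exact serialization Deephaven produces (field order, whitespace, null handling) may differ, `row_to_message` is a hypothetical name, and passing `None` for a column list stands in for `KeyValueSpec.IGNORE`.

```python
import json
from typing import Any, Dict, Optional, Sequence, Tuple


def row_to_message(
    row: Dict[str, Any],
    key_columns: Optional[Sequence[str]],
    value_columns: Optional[Sequence[str]],
) -> Tuple[Optional[str], Optional[str]]:
    """Hypothetical model of JSON key/value specs for one row.

    Passing None for a column list models KeyValueSpec.IGNORE for that side.
    """

    def encode(columns: Optional[Sequence[str]]) -> Optional[str]:
        if columns is None:
            return None
        # Compact JSON object with the selected columns, in the given order
        return json.dumps({col: row[col] for col in columns}, separators=(",", ":"))

    return encode(key_columns), encode(value_columns)


# Mirrors pk.json_spec(['X']) as key and pk.json_spec(['X', 'Y']) as value
print(row_to_message({"X": 3, "Y": 7}, ["X"], ["X", "Y"]))
# ('{"X":3}', '{"X":3,"Y":7}')
```

Because the key contains only `X`, every row with the same `X` shares a key, which is what makes `last_by_key_columns=True` meaningful here.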