produce
`produce` writes a Kafka stream from an in-memory table.
Syntax
produce(
    table: Table,
    kafka_config: dict,
    topic: str,
    key_spec: KeyValueSpec,
    value_spec: KeyValueSpec,
    last_by_key_columns: bool = False,
)
Parameters
Parameter | Type | Description |
---|---|---|
table | Table | Source table with data to write to the Kafka stream. |
kafka_config | dict | Configuration for the associated Kafka producer. This is used to call the constructor of |
topic | str | Name of the topic to write to. |
key_spec | KeyValueSpec | Specifies how to map table column(s) to the Key field in produced Kafka messages. It can be any of the following, found in
|
value_spec | KeyValueSpec | Specifies how to map table column(s) to the Value field in produced Kafka messages. It can be any of the following, found in
|
last_by_key_columns optional | bool | Whether to perform a last_by on the key columns before writing to the stream. Defaults to False. |
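As a rough illustration of the `kafka_config` parameter, the sketch below builds such a dictionary in plain Python. The helper `build_kafka_config` is hypothetical and not part of Deephaven. The extra keys (`acks`, `linger.ms`, `compression.type`) are standard Kafka producer properties, and passing every value as a string is an assumption made here for safety, not a documented requirement.

```python
def build_kafka_config(host: str, port: int, **overrides) -> dict:
    """Hypothetical helper: build a producer config dict.

    Only bootstrap.servers is set by default; everything else is optional.
    """
    config = {"bootstrap.servers": f"{host}:{port}"}
    # Keyword names use underscores; Kafka property names use dots.
    for name, value in overrides.items():
        config[name.replace("_", ".")] = str(value)
    return config


kafka_config = build_kafka_config(
    "redpanda", 9092, acks="all", linger_ms=5, compression_type="lz4"
)
print(kafka_config)
```

The resulting dictionary could then be passed as the `kafka_config` argument of `produce`.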
Returns
A Kafka stream from an in-memory table.
Examples
In the following example, `produce` is used to write to the Kafka topic `testTopic` from a Deephaven table.
- The first positional argument, `source`, is the table to write.
- The second positional argument, `{'bootstrap.servers': 'redpanda:9092'}`, is a Python dictionary of the kind used by any Python library that communicates with Kafka. This dictionary gives the underlying Kafka library information about the Kafka infrastructure.
  - Here, we specify the host and port of the Kafka server used to bootstrap the connection. This is the minimum required information.
  - The value `redpanda:9092` corresponds to the current setup for development testing with Docker images (which uses an instance of redpanda).
- The third positional argument is the name of the topic (`testTopic`).
- The fourth positional argument writes column `X` to the stream as the message key.
- The fifth positional argument says to ignore values.
from deephaven import time_table
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# Ticking table that adds a row every 0.1 seconds; X is the row index.
source = time_table("PT00:00:00.1").update(formulas=["X = i"])

# Write column X as the message key and ignore the message value.
write_topic = pk.produce(
    source,
    {"bootstrap.servers": "redpanda:9092"},
    "testTopic",
    pk.simple_spec("X"),
    KeyValueSpec.IGNORE,
)
In the following example, `produce` is used to write to the Kafka topic `time-topic_group` with the additional setting `last_by_key_columns` set to `True`. This indicates that we want to perform a `last_by` on the keys before writing to the stream.
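import random

from deephaven import time_table
from deephaven import kafka_producer as pk

# X is a random key between 1 and 5; Y is the row index.
source_group = time_table("PT00:00:00.1").update(
    formulas=["X = random.randint(1, 5)", "Y = i"]
)

# Key messages on X and publish X and Y as the JSON value.
# The final argument sets last_by_key_columns to True.
write_topic_group = pk.produce(
    source_group,
    {"bootstrap.servers": "redpanda:9092"},
    "time-topic_group",
    pk.json_spec(["X"]),
    pk.json_spec(
        [
            "X",
            "Y",
        ]
    ),
    True,
)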
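To make the effect of a last-by on the keys more concrete, here is a minimal plain-Python sketch of the idea: for each distinct key, only the most recent row is kept. The function `last_by_key` and the sample rows are hypothetical and for illustration only. This is not Deephaven's implementation, and it does not model how `produce` applies the operation to a ticking table.

```python
def last_by_key(rows, key_columns):
    """Keep only the most recent row for each distinct key (illustrative)."""
    latest = {}
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        # A later row with the same key overwrites the earlier one.
        latest[key] = row
    return list(latest.values())


# Sample rows shaped like the table above: X is the key, Y increases over time.
rows = [
    {"X": 1, "Y": 0},
    {"X": 2, "Y": 1},
    {"X": 1, "Y": 2},
    {"X": 2, "Y": 3},
    {"X": 3, "Y": 4},
]

result = last_by_key(rows, ["X"])
print(result)
```

With these sample rows, the earlier rows for `X = 1` and `X = 2` are superseded by their later counterparts, so one row per key remains.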