produceFromTable
produceFromTable
writes a Kafka stream from an in-memory table.
Syntax
produceFromTable(options)
produceFromTable(table, kafkaProperties, topic, keySpec, valueSpec, lastByKeyColumns)
Parameters
Parameter | Type | Description |
---|---|---|
options | KafkaPublishOptions | Kafka publishing options that include the source table, properties, string, key and value specifications, and whether or not to perform a |
table | Table | The table to use as the source of data for Kafka. |
kafkaProperties | Properties | Properties to be passed to create the associated producer. |
topic | String | The Kafka topic name. |
keySpec | KeyOrValueSpec | Conversion specification for Kafka record keys from table column data. |
valueSpec | KeyOrValueSpec | Conversion specification for Kafka record values from table column data. |
lastByKeyColumns | boolean | Whether to publish only the last record for each unique key. Ignored when |
Returns
A callback to stop producing and shut down the associated table listener.
Examples
In the following example, produceFromTable
is used to write to the Kafka topic testTopic
from a Deephaven table.
import io.deephaven.kafka.KafkaPublishOptions
import io.deephaven.kafka.KafkaTools
source = timeTable('PT00:00:00.1').update('X = i')
kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
options = KafkaPublishOptions.
builder().
table(source).
topic('time-topic').
config(kafkaProps).
keySpec(KafkaTools.Produce.simpleSpec('X')).
valueSpec(KafkaTools.Produce.IGNORE).
build()
runnable = KafkaTools.produceFromTable(options)
In the following example, produceFromTable
is used to write the Kafka topic with lastByKeyColumns
as true
. This indicates we want to perform a lastBy
with the specified key columns before publishing the data.
import io.deephaven.kafka.KafkaPublishOptions
import io.deephaven.kafka.KafkaTools
sourceGroup = timeTable('PT00:00:00.1')
.update('X = randomInt(1, 5)', 'Y = i')
kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
options = KafkaPublishOptions.
builder().
table(sourceGroup).
topic('time-topic_group').
config(kafkaProps).
keySpec(KafkaTools.Produce.jsonSpec(['X'] as String[], null, null)).
valueSpec(KafkaTools.Produce.jsonSpec(['X', 'Y'] as String[], null, null)).
lastBy(true).
build()
runnable = KafkaTools.produceFromTable(options)