Version: Java (Groovy)

produceFromTable

produceFromTable publishes the rows of an in-memory table to a Kafka topic.

Syntax

produceFromTable(options)
produceFromTable(table, kafkaProperties, topic, keySpec, valueSpec, lastByKeyColumns)

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| options | KafkaPublishOptions | Kafka publishing options that include the source table, properties, topic, key and value specifications, and whether or not to perform a lastBy on the specified key column(s) before publishing. |
| table | Table | The table to use as the source of data for Kafka. |
| kafkaProperties | Properties | Properties to be passed to create the associated producer. |
| topic | String | The Kafka topic name. |
| keySpec | KeyOrValueSpec | Conversion specification for Kafka record keys from table column data. |
| valueSpec | KeyOrValueSpec | Conversion specification for Kafka record values from table column data. |
| lastByKeyColumns | boolean | Whether to publish only the last record for each unique key. Ignored when keySpec is IGNORE. Otherwise, if true, this method will internally perform a lastBy aggregation using the key column(s) in keySpec before publishing to Kafka. |

Returns

A callback to stop producing and shut down the associated table listener.

Examples

In the following example, produceFromTable is used to write to the Kafka topic time-topic from a Deephaven table.

import io.deephaven.kafka.KafkaPublishOptions
import io.deephaven.kafka.KafkaTools

source = timeTable('PT00:00:00.1').update('X = i')

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

options = KafkaPublishOptions.
    builder().
    table(source).
    topic('time-topic').
    config(kafkaProps).
    keySpec(KafkaTools.Produce.simpleSpec('X')).
    valueSpec(KafkaTools.Produce.IGNORE).
    build()

runnable = KafkaTools.produceFromTable(options)
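
The same publication can also be written with the positional form listed under Syntax. The following is a minimal sketch, not a definitive recipe: it assumes a broker reachable at redpanda:9092 as in the example above, and it assumes the returned callback is a java.lang.Runnable. The variable name stopPublishing is ours, chosen for illustration.

```groovy
import io.deephaven.kafka.KafkaTools

// Ticking source table, the same shape as in the example above.
source = timeTable('PT00:00:00.1').update('X = i')

// Producer configuration; the broker address is an assumption about the deployment.
kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

// Argument order follows the Syntax section:
// table, kafkaProperties, topic, keySpec, valueSpec, lastByKeyColumns.
stopPublishing = KafkaTools.produceFromTable(
    source,
    kafkaProps,
    'time-topic',
    KafkaTools.Produce.simpleSpec('X'),
    KafkaTools.Produce.IGNORE,
    false   // publish every row; no lastBy on the key column
)
```

Under the Runnable assumption, calling stopPublishing.run() at a later point should stop producing and shut down the associated table listener, as described under Returns.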

In the following example, produceFromTable writes to the Kafka topic time-topic_group with lastBy set to true. A lastBy is therefore performed on the key column X before publishing, so only the most recent row for each key is sent.

import io.deephaven.kafka.KafkaPublishOptions
import io.deephaven.kafka.KafkaTools

sourceGroup = timeTable('PT00:00:00.1')
    .update('X = randomInt(1, 5)', 'Y = i')

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

options = KafkaPublishOptions.
    builder().
    table(sourceGroup).
    topic('time-topic_group').
    config(kafkaProps).
    keySpec(KafkaTools.Produce.jsonSpec(['X'] as String[], null, null)).
    valueSpec(KafkaTools.Produce.jsonSpec(['X', 'Y'] as String[], null, null)).
    lastBy(true).
    build()

runnable = KafkaTools.produceFromTable(options)
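
To make "only the last record for each unique key" concrete, here is a plain Groovy sketch that uses neither Deephaven nor Kafka. It is an illustration of the semantics only, not how the library implements lastBy. The rows are hypothetical stand-ins for table data with key column X and value column Y.

```groovy
// Hypothetical rows in arrival order; each map is one row.
rows = [
    [X: 1, Y: 0],
    [X: 2, Y: 1],
    [X: 1, Y: 2],
    [X: 3, Y: 3],
    [X: 2, Y: 4]
]

// Keep one row per key X; a later row replaces an earlier row with the same key.
lastByX = [:]
rows.each { row -> lastByX[row.X] = row }

// Print the surviving row for each key.
lastByX.each { key, row -> println "X=${key} Y=${row.Y}" }
```

Five input rows collapse to three, one per distinct value of X, each carrying the most recent Y seen for that key.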