# consumeToTable

`consumeToTable` reads a Kafka stream into an in-memory table.

## Syntax
```groovy
consumeToTable(
    kafkaProperties,
    topic,
    partitionFilter,
    partitionToInitialOffset,
    keySpec,
    valueSpec,
    tableType
)
```
## Parameters

| Parameter | Type | Description |
|---|---|---|
| `kafkaProperties` | `Properties` | Properties to configure the resulting table and also to be passed to create the `KafkaConsumer`. |
| `topic` | `String` | The Kafka topic name. |
| `partitionFilter` | `IntPredicate` | A predicate returning `true` for the partitions to consume. The convenience constant `KafkaTools.ALL_PARTITIONS` consumes all partitions. |
| `partitionToInitialOffset` | `IntToLongFunction` | A function specifying the desired initial offset for each partition consumed. |
| `keySpec` | `KafkaTools.Consume.KeyOrValueSpec` | Conversion specification for Kafka record keys. |
| `valueSpec` | `KafkaTools.Consume.KeyOrValueSpec` | Conversion specification for Kafka record values. |
| `tableType` | `KafkaTools.TableType` | The expected table type of the resulting table. |
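
The `partitionFilter` and `partitionToInitialOffset` parameters accept any `IntPredicate` and `IntToLongFunction`, so custom behavior can be supplied with Groovy closures coerced to those interfaces. The sketch below is illustrative only: the broker address and topic name are assumptions matching the examples that follow, and the closures restrict consumption to partition 0 starting at offset 0.

```groovy
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
kafkaProps.put('deephaven.key.column.type', 'String')
kafkaProps.put('deephaven.value.column.type', 'String')

result = KafkaTools.consumeToTable(
    kafkaProps,
    'testTopic',
    { int p -> p == 0 } as java.util.function.IntPredicate,   // consume only partition 0
    { int p -> 0L } as java.util.function.IntToLongFunction,  // start each consumed partition at offset 0
    KafkaTools.Consume.FROM_PROPERTIES,
    KafkaTools.Consume.FROM_PROPERTIES,
    KafkaTools.TableType.append()
)
```

For the common cases, the convenience constants used in the examples below (such as `KafkaTools.ALL_PARTITIONS` and `KafkaTools.ALL_PARTITIONS_DONT_SEEK`) avoid writing these closures by hand.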
## Returns

An in-memory table.
## Examples

In the following example, `consumeToTable` is used to read the Kafka topic `testTopic` into a Deephaven table. `KafkaTools.Consume.FROM_PROPERTIES` allows the key and value column types to be inferred from the properties passed in.

- The host and port for the Kafka server used to bootstrap are specified by `kafkaProps`.
  - The value `redpanda:9092` corresponds to the current setup for development testing with Docker images (which uses an instance of Redpanda).
- The topic name is `testTopic`.
```groovy
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
kafkaProps.put('deephaven.key.column.type', 'String')
kafkaProps.put('deephaven.value.column.type', 'String')

result = KafkaTools.consumeToTable(
    kafkaProps,
    'testTopic',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.FROM_PROPERTIES,
    KafkaTools.Consume.FROM_PROPERTIES,
    KafkaTools.TableType.append()
)
```
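
The table returned above behaves like any other Deephaven table. As a quick sketch, assuming the default Kafka column names (an assumption; the partition column defaults to `KafkaPartition` unless overridden via `deephaven.partition.column.name`), incoming records can be counted per partition:

```groovy
// Count consumed records per partition; updates as new records arrive.
countsByPartition = result.countBy('RecordCount', 'KafkaPartition')
```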

In the following example, `consumeToTable` is used to read the Kafka topic `share_price` with explicit key and value specifications.

- The key column name is `Symbol`, which will be of `String` type.
- The value column name is `Price`, which will be of `double` type.
```groovy
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

result = KafkaTools.consumeToTable(
    kafkaProps,
    'share_price',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.simpleSpec('Symbol', java.lang.String),
    KafkaTools.Consume.simpleSpec('Price', double),
    KafkaTools.TableType.append()
)
```
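
Because `Symbol` and `Price` are ordinary table columns, standard table operations apply directly to the ticking result. For instance, a sketch of keeping only the most recent price per symbol:

```groovy
// Retain the latest row for each Symbol as new records arrive.
lastPrices = result.lastBy('Symbol')
```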

The following example sets `deephaven.partition.column.name` to null, which causes the partition column to be omitted.

- This results in the table not having a column for the partition field.
- The key specification is also set to `IGNORE`, so `result` does not contain a Kafka key column.
```groovy
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
kafkaProps.put('deephaven.partition.column.name', null)

result = KafkaTools.consumeToTable(
    kafkaProps,
    'share_price',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    KafkaTools.Consume.simpleSpec('Price', double),
    KafkaTools.TableType.blink()
)
```
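
A blink table retains only the rows received in the current update cycle, but aggregations over it accumulate across all rows ever seen, which makes blink tables memory-efficient for streaming statistics. A sketch using the `Price` column defined above:

```groovy
// Running average over every price seen on the stream, even though the
// blink table itself only holds the current cycle's rows.
avgPrice = result.view('Price').avgBy()
```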

In the following example, `consumeToTable` reads the Kafka topic `orders`, whose values are in JSON format. The `jsonSpec` maps JSON field names to table column names.
```groovy
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

symbolDef = ColumnDefinition.ofString('Symbol')
sideDef = ColumnDefinition.ofString('Side')
priceDef = ColumnDefinition.ofDouble('Price')
qtyDef = ColumnDefinition.ofInt('Qty')
ColumnDefinition[] colDefs = [symbolDef, sideDef, priceDef, qtyDef]
mapping = ['jsymbol': 'Symbol', 'jside': 'Side', 'jprice': 'Price', 'jqty': 'Qty']
spec = KafkaTools.Consume.jsonSpec(colDefs, mapping, null)

result = KafkaTools.consumeToTable(
    kafkaProps,
    'orders',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    spec,
    KafkaTools.TableType.blink()
)
```

In the following example, `consumeToTable` reads the Kafka topic `orders` in Avro format. The schema name (`share_price_record`) and version are specified, and the schema is fetched from the Schema Registry configured by `schema.registry.url`.
```groovy
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
kafkaProps.put('schema.registry.url', 'http://redpanda:8081')

result = KafkaTools.consumeToTable(
    kafkaProps,
    'orders',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    KafkaTools.Consume.avroSpec('share_price_record', '1'),
    KafkaTools.TableType.blink()
)
```
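
If pinning to a specific schema version is not desired, `avroSpec` has an overload that takes only the schema name and resolves the latest registered version (this single-argument overload is an assumption; check the `KafkaTools.Consume` javadoc for your Deephaven version):

```groovy
// Hypothetical: resolve the latest version of the schema from the registry
// rather than pinning to version '1'.
spec = KafkaTools.Consume.avroSpec('share_price_record')
```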
