consumeToPartitionedTable
The consumeToPartitionedTable method reads a Kafka stream into an in-memory partitioned table.
Syntax
consumeToPartitionedTable(
kafkaProperties,
topic,
partitionFilter,
partitionToInitialOffset,
keySpec,
valueSpec,
tableType
)
Parameters
Parameter | Type | Description |
---|---|---|
kafkaProperties | Properties | Configuration for the associated Kafka consumer and the resulting table. Once the table-specific properties are stripped, the remaining ones are used to call the KafkaConsumer constructor. |
topic | str | The Kafka topic name. |
partitionFilter | IntPredicate | A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions. |
partitionToInitialOffset | IntToLongFunction | A function specifying the desired initial offset for each partition consumed. |
keySpec | KeyOrValueSpec | Conversion specification for Kafka record keys. |
valueSpec | KeyOrValueSpec | Conversion specification for Kafka record values. |
tableType | TableType | A TableType specifying the type of the resulting table (the example below uses TableType.append()). |
Returns
An in-memory partitioned table.
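The result is a PartitionedTable with one constituent table per consumed Kafka partition. As a minimal sketch (using the merge() and table() methods of Deephaven's PartitionedTable interface, and assuming pt is the result of a call like the one in the example below), the constituents can be recombined or inspected:

```groovy
// Combine all partition constituents into a single table
orders = pt.merge()

// The underlying table of partition keys and constituent tables
constituents = pt.table()
```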
Examples
In the following example, consumeToPartitionedTable is used to read the Kafka topic orders into a Deephaven partitioned table. Record keys are ignored (KafkaTools.Consume.IGNORE), and record values are parsed as JSON with a spec that maps JSON field names to table columns.

- The host and port for the Kafka server to use to bootstrap are specified by kafkaProps.
  - The value redpanda:9092 corresponds to the current setup for development testing with Docker images (which uses an instance of Redpanda).
- The topic name is orders.
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.kafka.KafkaTools
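// Kafka consumer configuration: bootstrap.servers points at the broker (Redpanda at redpanda:9092 in this dev setup)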
kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
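// Column definitions for the resulting Deephaven table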
symbolDef = ColumnDefinition.ofString('Symbol')
sideDef = ColumnDefinition.ofString('Side')
priceDef = ColumnDefinition.ofDouble('Price')
qtyDef = ColumnDefinition.ofInt('Qty')
ColumnDefinition[] colDefs = [symbolDef, sideDef, priceDef, qtyDef]
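// Map JSON field names in the record values (jsymbol, jside, jprice, jqty) to the table columns defined above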
mapping = ['jsymbol': 'Symbol', 'jside': 'Side', 'jprice': 'Price', 'jqty': 'Qty']
spec = KafkaTools.Consume.jsonSpec(colDefs, mapping, null)
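// Consume every partition of 'orders' without seeking to a specific offset; ignore keys, parse values as JSON, and build an append-only result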
pt = KafkaTools.consumeToPartitionedTable(
kafkaProps,
'orders',
KafkaTools.ALL_PARTITIONS,
KafkaTools.ALL_PARTITIONS_DONT_SEEK,
KafkaTools.Consume.IGNORE,
spec,
KafkaTools.TableType.append()
)
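The partitionFilter and partitionToInitialOffset arguments accept any IntPredicate and IntToLongFunction. The following variant is a sketch, not part of the original example: it assumes the KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING convenience constant and reuses kafkaProps and spec from above, consuming only partition 0 from the start of the topic.

```groovy
import java.util.function.IntPredicate

// Hypothetical filter: consume only partition 0 (a Groovy closure coerced to IntPredicate)
onlyPartitionZero = { int p -> p == 0 } as IntPredicate

ptSingle = KafkaTools.consumeToPartitionedTable(
    kafkaProps,
    'orders',
    onlyPartitionZero,                              // custom partition filter
    KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING,    // start each consumed partition at the earliest offset
    KafkaTools.Consume.IGNORE,
    spec,
    KafkaTools.TableType.append()
)
```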