Version: Java (Groovy)

consumeToPartitionedTable

The consumeToPartitionedTable method reads a Kafka stream into an in-memory partitioned table.

Syntax

consumeToPartitionedTable(
    kafkaProperties,
    topic,
    partitionFilter,
    partitionToInitialOffset,
    keySpec,
    valueSpec,
    tableType
)

Parameters

kafkaProperties (Properties)

Configuration for the associated Kafka consumer and the resulting table. After the table-specific properties are removed, the remaining properties are passed to the constructor of org.apache.kafka.clients.consumer.KafkaConsumer, so put any desired KafkaConsumer-specific configuration here.

topic (String)

The Kafka topic name.

partitionFilter (IntPredicate)

A predicate that returns true for each partition to consume. Use the convenience constant KafkaTools.ALL_PARTITIONS to consume all partitions.

partitionToInitialOffset (IntToLongFunction)

A function specifying the desired initial offset for each partition consumed.

keySpec (KeyOrValueSpec)

Conversion specification for Kafka record keys.

valueSpec (KeyOrValueSpec)

Conversion specification for Kafka record values.

tableType (TableType)

The type of table to create, given by one of the following TableType factory methods. The default is TableType.blink().

  • TableType.append() - Create an append-only table.
  • TableType.blink() - Create a blink table (default).
  • TableType.ring(N) - Create a ring table of size N.
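The partitionFilter and partitionToInitialOffset parameters take standard java.util.function interfaces, so any lambda of the right shape can be supplied. The following plain-Java sketch builds both from a single map of partition numbers to starting offsets. The class PartitionSelection, its methods filterFor and offsetsFor, and the partition and offset values are hypothetical illustrations, not part of KafkaTools; the same objects can also be created from Groovy.

```java
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;

// Hypothetical helper (not part of KafkaTools): derives a partition filter and
// an initial-offset function from one map of partition number -> starting offset.
final class PartitionSelection {
    private PartitionSelection() {}

    // Accept only the partitions that appear as keys in the map.
    static IntPredicate filterFor(Map<Integer, Long> startOffsets) {
        return partition -> startOffsets.containsKey(partition);
    }

    // Return the configured starting offset for a consumed partition.
    // Assumption: the function is only asked about partitions the filter accepts,
    // so any other partition is rejected with an exception.
    static IntToLongFunction offsetsFor(Map<Integer, Long> startOffsets) {
        return partition -> {
            Long offset = startOffsets.get(partition);
            if (offset == null) {
                throw new IllegalArgumentException("partition " + partition + " is not consumed");
            }
            return offset;
        };
    }

    public static void main(String[] args) {
        // Hypothetical values: consume partitions 0 and 2, starting at offsets 100 and 250.
        Map<Integer, Long> startOffsets = Map.of(0, 100L, 2, 250L);
        IntPredicate partitionFilter = filterFor(startOffsets);
        IntToLongFunction partitionToInitialOffset = offsetsFor(startOffsets);

        // Show which of partitions 0-3 would be consumed, and from where.
        for (int p = 0; p < 4; p++) {
            if (partitionFilter.test(p)) {
                System.out.println("partition " + p + " -> offset " + partitionToInitialOffset.applyAsLong(p));
            } else {
                System.out.println("partition " + p + " skipped");
            }
        }
    }
}
```

Running main reports starting offsets for partitions 0 and 2 and marks partitions 1 and 3 as skipped. Objects built this way could be passed as the partitionFilter argument in place of KafkaTools.ALL_PARTITIONS, together with the matching partitionToInitialOffset function. Deriving both from one map keeps the filter and the offsets from drifting out of sync.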

Returns

An in-memory partitioned table with one constituent table per consumed Kafka partition.

Examples

In the following example, consumeToPartitionedTable reads the Kafka topic orders into a partitioned table. Record keys are ignored with KafkaTools.Consume.IGNORE, and record values are parsed as JSON with KafkaTools.Consume.jsonSpec, which maps the JSON fields jsymbol, jside, jprice, and jqty to the columns Symbol, Side, Price, and Qty.

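  • kafkaProps specifies the host and port of the Kafka server used for bootstrapping.
    • The value redpanda:9092 matches the Docker-based development setup used for testing, which runs a Redpanda instance.
  • The topic name is orders.
  • KafkaTools.ALL_PARTITIONS selects every partition, and KafkaTools.ALL_PARTITIONS_DONT_SEEK does not seek to a specific starting offset on any of them.
  • TableType.append() makes each constituent table append-only.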
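import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.kafka.KafkaTools

// Kafka consumer configuration: the broker used to bootstrap the connection
kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

// Column definitions for the resulting tables
symbolDef = ColumnDefinition.ofString('Symbol')
sideDef = ColumnDefinition.ofString('Side')
priceDef = ColumnDefinition.ofDouble('Price')
qtyDef = ColumnDefinition.ofInt('Qty')

ColumnDefinition[] colDefs = [symbolDef, sideDef, priceDef, qtyDef]

// Map JSON field names in each record value to column names
mapping = ['jsymbol': 'Symbol', 'jside': 'Side', 'jprice': 'Price', 'jqty': 'Qty']

// Parse record values as JSON; no custom ObjectMapper is supplied (null)
spec = KafkaTools.Consume.jsonSpec(colDefs, mapping, null)

// Consume every partition of 'orders' without seeking, ignore record keys,
// and build append-only constituent tables
pt = KafkaTools.consumeToPartitionedTable(
    kafkaProps,
    'orders',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    spec,
    KafkaTools.TableType.append()
)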
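The kafkaProps object in this example sets only bootstrap.servers. Because the properties left after the table-specific entries are removed go to the KafkaConsumer constructor, other standard consumer settings can sit in the same object. A minimal plain-Java sketch, where the helper OrdersConsumerProps and the client.id and max.poll.records values are illustrative assumptions rather than settings the example requires:

```java
import java.util.Properties;

// Hypothetical helper: assembles consumer properties for the orders example.
final class OrdersConsumerProps {
    private OrdersConsumerProps() {}

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Where the consumer finds the Kafka cluster (as in the example above).
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Standard KafkaConsumer settings with illustrative values:
        // a logical name the brokers can use to identify this client,
        props.setProperty("client.id", "orders-reader");
        // and an upper bound on the records returned by a single poll.
        props.setProperty("max.poll.records", "500");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("redpanda:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The returned Properties object could be passed as the kafkaProperties argument in the call above.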