# consumeToTable

`consumeToTable` reads a Kafka stream into an in-memory table.
## Syntax

```groovy
consumeToTable(
    kafkaProperties,
    topic,
    partitionFilter,
    partitionToInitialOffset,
    keySpec,
    valueSpec,
    tableType
)
```
## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `kafkaProperties` | `Properties` | Properties to configure the result and also to be passed to create the `KafkaConsumer`. |
| `topic` | `String` | The Kafka topic name. |
| `partitionFilter` | `IntPredicate` | A predicate returning true for the partitions to consume. The convenience constant `ALL_PARTITIONS` selects all partitions. |
| `partitionToInitialOffset` | `IntToLongFunction` | A function specifying the desired initial offset for each partition consumed. |
| `keySpec` | `KafkaTools.Consume.KeyOrValueSpec` | Conversion specification for Kafka record keys. |
| `valueSpec` | `KafkaTools.Consume.KeyOrValueSpec` | Conversion specification for Kafka record values. |
| `tableType` | `KafkaTools.TableType` | The expected table type of the resultant table. |
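The `tableType` argument determines how consumed rows are retained. As a minimal sketch: the examples below use `append()` and `blink()`; `ring(int)` is assumed to be available from the same `KafkaTools.TableType` factory, and is labeled as such.

```groovy
// Sketch of common TableType choices (append() and blink() appear in the
// examples below; ring(int) is an assumed variant from the same factory).
appendType = KafkaTools.TableType.append()   // retain every row consumed
blinkType  = KafkaTools.TableType.blink()    // retain only the latest update cycle's rows
ringType   = KafkaTools.TableType.ring(1024) // retain roughly the last N rows (assumed)
```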
## Returns

An in-memory table.
## Examples
In the following example, `consumeToTable` is used to read the Kafka topic `testTopic` into a Deephaven table. `KafkaTools.Consume.FROM_PROPERTIES` allows the key and value column types to be inferred from the properties passed in.

- The host and port for the Kafka server used to bootstrap are specified by `kafkaProps`.
  - The value `redpanda:9092` corresponds to the current setup for development testing with Docker images (which use an instance of Redpanda).
- The topic name is `testTopic`.
```groovy
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
kafkaProps.put('deephaven.key.column.type', 'String')
kafkaProps.put('deephaven.value.column.type', 'String')

result = KafkaTools.consumeToTable(
    kafkaProps,
    'testTopic',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.FROM_PROPERTIES,
    KafkaTools.Consume.FROM_PROPERTIES,
    KafkaTools.TableType.append()
)
```
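To confirm which column types were inferred from the properties, the resulting table's definition can be printed from the console. This sketch assumes the standard `Table.getDefinition()` accessor and the `result` variable from the example above:

```groovy
// Inspect the consumed table's schema (assumes `result` from the example above).
// getDefinition() returns the TableDefinition describing each column's name and type.
println result.getDefinition()
```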
In the following example, `consumeToTable` is used to read the Kafka topic `share_price` with explicit key and value specifications.

- The key column name is `Symbol`, which will be of `String` type.
- The value column name is `Price`, which will be of `double` type.
```groovy
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

result = KafkaTools.consumeToTable(
    kafkaProps,
    'share_price',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.simpleSpec('Symbol', java.lang.String),
    KafkaTools.Consume.simpleSpec('Price', double),
    KafkaTools.TableType.append()
)
```
The following example sets the `deephaven.partition.column.name` property to `null`, which causes the partition field to be ignored.

- This results in the table not having a column for the partition field.
- The key specification is also set to `IGNORE`, so `result` does not contain the Kafka key column.
```groovy
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
kafkaProps.put('deephaven.partition.column.name', null)

result = KafkaTools.consumeToTable(
    kafkaProps,
    'share_price',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    KafkaTools.Consume.simpleSpec('Price', double),
    KafkaTools.TableType.blink()
)
```
In the following example, `consumeToTable` reads the Kafka topic `orders` in JSON format. The `mapping` specifies how JSON field names map to Deephaven column names.
```groovy
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')

symbolDef = ColumnDefinition.ofString('Symbol')
sideDef = ColumnDefinition.ofString('Side')
priceDef = ColumnDefinition.ofDouble('Price')
qtyDef = ColumnDefinition.ofInt('Qty')

ColumnDefinition[] colDefs = [symbolDef, sideDef, priceDef, qtyDef]
mapping = ['jsymbol': 'Symbol', 'jside': 'Side', 'jprice': 'Price', 'jqty': 'Qty']

spec = KafkaTools.Consume.jsonSpec(colDefs, mapping, null)

result = KafkaTools.consumeToTable(
    kafkaProps,
    'orders',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    spec,
    KafkaTools.TableType.blink()
)
```
In the following example, `consumeToTable` reads the Kafka topic `orders` in Avro format. The schema name and version are specified, and the schema is looked up in the schema registry configured by `schema.registry.url`.
```groovy
import io.deephaven.kafka.KafkaTools

kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
kafkaProps.put('schema.registry.url', 'http://redpanda:8081')

result = KafkaTools.consumeToTable(
    kafkaProps,
    'orders',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    KafkaTools.Consume.avroSpec('share_price_record', '1'),
    KafkaTools.TableType.blink()
)
```