Package io.deephaven.kafka
Class KafkaTools
java.lang.Object
io.deephaven.kafka.KafkaTools
-
Nested Class Summary
Modifier and Type / Class / Description

static class

static interface
KafkaTools.ConsumerLoopCallback - A callback which is invoked from the consumer loop, enabling clients to inject logic to be invoked by the Kafka consumer thread.

static interface
KafkaTools.InitialOffsetLookup - Determines the initial offset to seek to for a given KafkaConsumer and TopicPartition.

static enum
Enum to specify operations that may apply to either of Kafka KEY or VALUE fields.

static class

static interface

static class

static interface

static interface
KafkaTools.StreamConsumerRegistrarProvider - Marker interface for StreamConsumer registrar provider objects.

static interface
KafkaTools.TableType - Type for the result Table returned by Kafka consumers.
-
Field Summary
Modifier and Type / Field / Description
static final IntPredicate
static final IntToLongFunction
static final IntToLongFunction
static final IntToLongFunction
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final long
static final String
static final String
static final String
static final String
static final KafkaTools.Consume.KeyOrValueSpec
The names for the key or value columns can be provided in the properties as "deephaven.key.column.name" or "deephaven.value.column.name", and otherwise default to "KafkaKey" or "KafkaValue".
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final long
static final long
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
-
Constructor Summary
-
Method Summary
Modifier and Type / Method / Description

static void
avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName)

static void
avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName, boolean useUTF8Strings)

static void
avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema)
Convert an Avro schema to a list of column definitions, mapping every Avro field to a column of the same name.

static void
avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName)
Convert an Avro schema to a list of column definitions.

static org.apache.avro.Schema
columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, org.apache.commons.lang3.mutable.MutableObject<Properties> colPropsOut)

static void
consume(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull KafkaTools.InitialOffsetLookup partitionToInitialOffset, KafkaTools.Consume.KeyOrValueSpec keySpec, KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, @Nullable KafkaTools.ConsumerLoopCallback consumerLoopCallback)
Consume from Kafka to stream consumers supplied by streamConsumerRegistrar.

static PartitionedTable
consumeToPartitionedTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, KafkaTools.Consume.KeyOrValueSpec keySpec, KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)
Consume from Kafka to a Deephaven PartitionedTable containing one constituent Table per partition.

static Table
consumeToTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, KafkaTools.Consume.KeyOrValueSpec keySpec, KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)
Consume from Kafka to a Deephaven Table.

static KafkaTools.TableType
friendlyNameToTableType(@NotNull String typeName)
Map "Python-friendly" table type name to a KafkaTools.TableType.

static org.apache.avro.Schema
getAvroSchema(String avroSchemaAsJsonString)
Create an Avro schema object for a String containing a JSON encoded Avro schema definition.

static TableDefinition
getTableDefinition(@NotNull Properties kafkaProperties, KafkaTools.Consume.KeyOrValueSpec keySpec, KafkaTools.Consume.KeyOrValueSpec valueSpec)
Construct a TableDefinition based on the input Properties and KafkaTools.Consume.KeyOrValueSpec parameters.

static String[]
listTopics(@NotNull Properties kafkaProperties)

static IntPredicate
partitionFilterFromArray(int[] partitions)

static IntToLongFunction
partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets)

predicateFromSet(Set<String> set)

static Runnable
produceFromTable(@NotNull Table table, @NotNull Properties kafkaProperties, @NotNull String topic, KafkaTools.Produce.KeyOrValueSpec keySpec, KafkaTools.Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns)
Produce a Kafka stream from a Deephaven table.

static Runnable
produceFromTable(KafkaPublishOptions options)
Produce a Kafka stream from a Deephaven table.

topics(@NotNull Properties kafkaProperties)
-
Field Details
-
KAFKA_PARTITION_COLUMN_NAME_PROPERTY
- See Also:
-
KAFKA_PARTITION_COLUMN_NAME_DEFAULT
- See Also:
-
OFFSET_COLUMN_NAME_PROPERTY
- See Also:
-
OFFSET_COLUMN_NAME_DEFAULT
- See Also:
-
TIMESTAMP_COLUMN_NAME_PROPERTY
- See Also:
-
TIMESTAMP_COLUMN_NAME_DEFAULT
- See Also:
-
RECEIVE_TIME_COLUMN_NAME_PROPERTY
- See Also:
-
RECEIVE_TIME_COLUMN_NAME_DEFAULT
-
KEY_BYTES_COLUMN_NAME_PROPERTY
- See Also:
-
KEY_BYTES_COLUMN_NAME_DEFAULT
-
VALUE_BYTES_COLUMN_NAME_PROPERTY
- See Also:
-
VALUE_BYTES_COLUMN_NAME_DEFAULT
-
KEY_COLUMN_NAME_PROPERTY
- See Also:
-
KEY_COLUMN_NAME_DEFAULT
- See Also:
-
VALUE_COLUMN_NAME_PROPERTY
- See Also:
-
VALUE_COLUMN_NAME_DEFAULT
- See Also:
-
KEY_COLUMN_TYPE_PROPERTY
- See Also:
-
VALUE_COLUMN_TYPE_PROPERTY
- See Also:
-
SCHEMA_SERVER_PROPERTY
- See Also:
-
SHORT_DESERIALIZER
-
INT_DESERIALIZER
-
LONG_DESERIALIZER
-
FLOAT_DESERIALIZER
-
DOUBLE_DESERIALIZER
-
BYTE_ARRAY_DESERIALIZER
-
STRING_DESERIALIZER
-
BYTE_BUFFER_DESERIALIZER
-
AVRO_DESERIALIZER
-
DESERIALIZER_FOR_IGNORE
-
SHORT_SERIALIZER
-
INT_SERIALIZER
-
LONG_SERIALIZER
-
FLOAT_SERIALIZER
-
DOUBLE_SERIALIZER
-
BYTE_ARRAY_SERIALIZER
-
STRING_SERIALIZER
-
BYTE_BUFFER_SERIALIZER
-
AVRO_SERIALIZER
-
SERIALIZER_FOR_IGNORE
-
NESTED_FIELD_NAME_SEPARATOR
- See Also:
-
NESTED_FIELD_COLUMN_NAME_SEPARATOR
- See Also:
-
AVRO_LATEST_VERSION
- See Also:
-
SEEK_TO_BEGINNING
public static final long SEEK_TO_BEGINNING
- See Also:
-
DONT_SEEK
public static final long DONT_SEEK
- See Also:
-
SEEK_TO_END
public static final long SEEK_TO_END
- See Also:
-
ALL_PARTITIONS
-
ALL_PARTITIONS_SEEK_TO_BEGINNING
-
ALL_PARTITIONS_DONT_SEEK
-
ALL_PARTITIONS_SEEK_TO_END
-
DIRECT_MAPPING
-
FROM_PROPERTIES
The names for the key or value columns can be provided in the properties as "deephaven.key.column.name" or "deephaven.value.column.name", and otherwise default to "KafkaKey" or "KafkaValue". The types for key or value are either specified in the properties as "deephaven.key.column.type" or "deephaven.value.column.type" or deduced from the serializer classes for "key.deserializer" or "value.deserializer" in the provided Properties object.
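For example, a small sketch of the relevant properties (names and values are illustrative):
    import java.util.Properties;

    Properties props = new Properties();
    // Override the default key/value column names ("KafkaKey" / "KafkaValue").
    props.put("deephaven.key.column.name", "OrderId");
    props.put("deephaven.value.column.name", "OrderJson");
    // With FROM_PROPERTIES, column types are deduced from the configured deserializers
    // unless "deephaven.key.column.type" / "deephaven.value.column.type" are set.
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");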
-
-
Constructor Details
-
KafkaTools
public KafkaTools()
-
-
Method Details
-
getAvroSchema
public static org.apache.avro.Schema getAvroSchema(String avroSchemaAsJsonString)
Create an Avro schema object for a String containing a JSON encoded Avro schema definition.
- Parameters:
avroSchemaAsJsonString - The JSON Avro schema definition
- Returns:
An Avro schema object
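For example, a small sketch parsing an illustrative record schema supplied as JSON:
    import org.apache.avro.Schema;
    import io.deephaven.kafka.KafkaTools;

    String schemaJson = "{"
            + "\"type\": \"record\", \"name\": \"Order\", \"namespace\": \"io.example\","
            + "\"fields\": ["
            + "  {\"name\": \"Sym\", \"type\": \"string\"},"
            + "  {\"name\": \"Price\", \"type\": \"double\"}"
            + "]}";
    Schema avroSchema = KafkaTools.getAvroSchema(schemaJson);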
-
columnDefinitionsToAvroSchema
public static org.apache.avro.Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, org.apache.commons.lang3.mutable.MutableObject<Properties> colPropsOut)
-
avroSchemaToColumnDefinitions
-
avroSchemaToColumnDefinitions
-
avroSchemaToColumnDefinitions
public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName)
Convert an Avro schema to a list of column definitions.
- Parameters:
columnsOut - Column definitions for output; should be empty on entry.
schema - Avro schema
requestedFieldPathToColumnName - An optional mapping to specify selection and naming of columns from Avro fields, or null to map all fields, using the field path as the column name.
-
avroSchemaToColumnDefinitions
public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema)
Convert an Avro schema to a list of column definitions, mapping every Avro field to a column of the same name.
- Parameters:
columnsOut - Column definitions for output; should be empty on entry.
schema - Avro schema
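A small sketch, reusing the avroSchema object built in the getAvroSchema sketch above (that schema and its field names are illustrative):
    import java.util.ArrayList;
    import java.util.List;
    import io.deephaven.engine.table.ColumnDefinition;

    List<ColumnDefinition<?>> columns = new ArrayList<>();   // must be empty on entry
    KafkaTools.avroSchemaToColumnDefinitions(columns, avroSchema);
    // columns now holds one ColumnDefinition per Avro field, e.g. "Sym" and "Price".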
-
friendlyNameToTableType
@ScriptApi
public static KafkaTools.TableType friendlyNameToTableType(@NotNull String typeName)
Map "Python-friendly" table type name to a KafkaTools.TableType. Supported values are:
- "blink"
- "stream" (deprecated; use "blink")
- "append"
- "ring:<capacity>", where capacity is an integer specifying the maximum number of trailing rows to include in the result
- Parameters:
typeName - The friendly name
- Returns:
The mapped KafkaTools.TableType
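For example, a small sketch mapping two of the supported names (the ring capacity shown is illustrative):
    import io.deephaven.kafka.KafkaTools;

    // "blink" yields a blink (streaming) table type.
    KafkaTools.TableType blinkType = KafkaTools.friendlyNameToTableType("blink");
    // "ring:1024" yields a ring table type retaining at most the trailing 1024 rows.
    KafkaTools.TableType ringType = KafkaTools.friendlyNameToTableType("ring:1024");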
-
consumeToTable
public static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)
Consume from Kafka to a Deephaven Table.
- Parameters:
kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
topic - Kafka topic name
partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
keySpec - Conversion specification for Kafka record keys
valueSpec - Conversion specification for Kafka record values
tableType - KafkaTools.TableType specifying the type of the expected result
- Returns:
The result Table, containing Kafka stream data formatted according to tableType
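A minimal consumption sketch, assuming a broker at localhost:9092 and a topic named "orders" (both illustrative); key and value handling relies on the documented FROM_PROPERTIES spec, with column types deduced from the standard Kafka string deserializers:
    import java.util.Properties;

    import io.deephaven.engine.table.Table;
    import io.deephaven.kafka.KafkaTools;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // illustrative broker address
    props.put("group.id", "dh-example");                // illustrative consumer group
    // FROM_PROPERTIES deduces the key/value column types from these deserializer classes.
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    Table kafkaTable = KafkaTools.consumeToTable(
            props,
            "orders",                                    // illustrative topic
            KafkaTools.ALL_PARTITIONS,
            KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING,
            KafkaTools.FROM_PROPERTIES,
            KafkaTools.FROM_PROPERTIES,
            KafkaTools.friendlyNameToTableType("blink"));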
-
consumeToPartitionedTable
public static PartitionedTable consumeToPartitionedTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)
Consume from Kafka to a Deephaven PartitionedTable containing one constituent Table per partition.
- Parameters:
kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
topic - Kafka topic name
partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
keySpec - Conversion specification for Kafka record keys
valueSpec - Conversion specification for Kafka record values
tableType - KafkaTools.TableType specifying the type of the expected result's constituent tables
- Returns:
The result PartitionedTable, containing Kafka stream data formatted according to tableType
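A hedged sketch that consumes only two partitions into a PartitionedTable with one append-only constituent per partition (broker, topic, and partition numbers are illustrative):
    import java.util.Properties;
    import java.util.function.IntPredicate;

    import io.deephaven.engine.table.PartitionedTable;
    import io.deephaven.kafka.KafkaTools;

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Restrict consumption to partitions 0 and 1.
    IntPredicate partitions = KafkaTools.partitionFilterFromArray(new int[] {0, 1});

    PartitionedTable perPartition = KafkaTools.consumeToPartitionedTable(
            consumerProps,
            "orders",                               // illustrative topic
            partitions,
            KafkaTools.ALL_PARTITIONS_DONT_SEEK,    // do not seek; start from the consumer's current position
            KafkaTools.FROM_PROPERTIES,
            KafkaTools.FROM_PROPERTIES,
            KafkaTools.friendlyNameToTableType("append"));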
-
getTableDefinition
public static TableDefinition getTableDefinition(@NotNull Properties kafkaProperties, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec)
Construct a TableDefinition based on the input Properties and KafkaTools.Consume.KeyOrValueSpec parameters. Given the same input Properties and Consume.KeyOrValueSpec parameters, the returned TableDefinition is the same as the TableDefinition of the table produced by consumeToTable(Properties, String, IntPredicate, IntToLongFunction, Consume.KeyOrValueSpec, Consume.KeyOrValueSpec, TableType).
- Parameters:
kafkaProperties - Properties to configure this table
keySpec - Conversion specification for Kafka record keys
valueSpec - Conversion specification for Kafka record values
- Returns:
A TableDefinition derived from the input Properties and KeyOrValueSpec instances
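A small sketch that previews the resulting schema without starting a consumer; the deserializer entries are what the documented FROM_PROPERTIES spec uses to deduce column types:
    import java.util.Properties;

    import io.deephaven.engine.table.TableDefinition;
    import io.deephaven.kafka.KafkaTools;

    Properties defProps = new Properties();
    defProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    defProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Typically includes the partition/offset/timestamp columns plus the key and value
    // columns (default names "KafkaKey" and "KafkaValue").
    TableDefinition def =
            KafkaTools.getTableDefinition(defProps, KafkaTools.FROM_PROPERTIES, KafkaTools.FROM_PROPERTIES);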
-
consume
public static void consume(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull KafkaTools.InitialOffsetLookup partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, @Nullable KafkaTools.ConsumerLoopCallback consumerLoopCallback)
Consume from Kafka to stream consumers supplied by streamConsumerRegistrar.
- Parameters:
kafkaProperties - Properties to configure this table and also to be passed to create the KafkaConsumer
topic - Kafka topic name
partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
keySpec - Conversion specification for Kafka record keys
valueSpec - Conversion specification for Kafka record values
streamConsumerRegistrarProvider - A provider for a function to register StreamConsumer instances. The registered stream consumers must accept chunk types that correspond to StreamChunkUtils.chunkTypeForColumnIndex(TableDefinition, int) for the supplied TableDefinition. See single and per-partition.
consumerLoopCallback - Callback to inject logic into the ingester's consumer loop
-
produceFromTable
public static Runnable produceFromTable(@NotNull Table table, @NotNull Properties kafkaProperties, @NotNull String topic, @NotNull KafkaTools.Produce.KeyOrValueSpec keySpec, @NotNull KafkaTools.Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns)
Produce a Kafka stream from a Deephaven table.

Note that table must only change in ways that are meaningful when turned into a stream of events over Kafka.

Two primary use cases are considered:
- A stream of changes (puts and removes) to a key-value data set. In order to handle this efficiently and allow for correct reconstruction of the state at a consumer, it is assumed that the input data is the result of a Deephaven aggregation, e.g. TableOperations.aggAllBy(io.deephaven.api.agg.spec.AggSpec), TableOperations.aggBy(io.deephaven.api.agg.Aggregation), or TableOperations.lastBy(). This means that key columns (as specified by keySpec) must not be modified, and no rows should be shifted if there are any key columns. Note that specifying lastByKeyColumns=true can make it easy to satisfy this constraint if the input data is not already aggregated.
- A stream of independent log records. In this case, the input table should either be a blink table or should only ever add rows (regardless of whether the attribute is specified).

If other use cases are identified, a publication mode or extensible listener framework may be introduced at a later date.

- Parameters:
table - The table used as a source of data to be sent to Kafka.
kafkaProperties - Properties to be passed to create the associated KafkaProducer.
topic - Kafka topic name
keySpec - Conversion specification for Kafka record keys from table column data.
valueSpec - Conversion specification for Kafka record values from table column data.
lastByKeyColumns - Whether to publish only the last record for each unique key. Ignored when keySpec is IGNORE. Otherwise, if lastByKeyColumns == true, this method will internally perform a lastBy aggregation on table grouped by the input columns of keySpec and publish to Kafka from the result.
- Returns:
A callback to stop producing and shut down the associated table listener; note that a caller should keep a reference to this return value to ensure liveness.
- See Also:
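A hedged sketch of publishing the last row per key from a table; the broker, topic, column names, and the Produce.simpleSpec(...) spec factory are assumptions not guaranteed by this page, so check the KafkaTools.Produce documentation for the exact spec factories available:
    import static io.deephaven.engine.util.TableTools.doubleCol;
    import static io.deephaven.engine.util.TableTools.newTable;
    import static io.deephaven.engine.util.TableTools.stringCol;

    import java.util.Properties;
    import io.deephaven.engine.table.Table;
    import io.deephaven.kafka.KafkaTools;

    // A small static source table; any ticking table with compatible columns would work the same way.
    Table quotes = newTable(
            stringCol("Sym", "AAPL", "MSFT"),
            doubleCol("Price", 185.2, 410.7));

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");   // illustrative broker address
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.DoubleSerializer");

    Runnable stopPublishing = KafkaTools.produceFromTable(
            quotes,
            producerProps,
            "quotes-out",                                        // illustrative topic
            KafkaTools.Produce.simpleSpec("Sym"),                // assumed spec factory: key from the Sym column
            KafkaTools.Produce.simpleSpec("Price"),              // assumed spec factory: value from the Price column
            true);                                               // publish only the last record per Sym

    // Keep a reference to the returned Runnable to ensure liveness; invoke it to stop publishing.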
-
produceFromTable
public static Runnable produceFromTable(KafkaPublishOptions options)
Produce a Kafka stream from a Deephaven table.

Note that table must only change in ways that are meaningful when turned into a stream of events over Kafka.

Two primary use cases are considered:
- A stream of changes (puts and removes) to a key-value data set. In order to handle this efficiently and allow for correct reconstruction of the state at a consumer, it is assumed that the input data is the result of a Deephaven aggregation, e.g. TableOperations.aggAllBy(io.deephaven.api.agg.spec.AggSpec), TableOperations.aggBy(io.deephaven.api.agg.Aggregation), or TableOperations.lastBy(). This means that key columns (as specified by keySpec) must not be modified, and no rows should be shifted if there are any key columns. Note that specifying lastByKeyColumns=true can make it easy to satisfy this constraint if the input data is not already aggregated.
- A stream of independent log records. In this case, the input table should either be a blink table or should only ever add rows (regardless of whether the attribute is specified).

If other use cases are identified, a publication mode or extensible listener framework may be introduced at a later date.

- Parameters:
options - The options
- Returns:
A callback to stop producing and shut down the associated table listener; note that a caller should keep a reference to this return value to ensure liveness.
-
partitionFilterFromArray
public static IntPredicate partitionFilterFromArray(int[] partitions)
-
partitionToOffsetFromParallelArrays
public static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets)
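A small sketch combining the two partition helpers; partition numbers and offsets are illustrative, and using SEEK_TO_BEGINNING as an offset value assumes the special offset constants on this class are honored during consumer setup:
    import java.util.function.IntPredicate;
    import java.util.function.IntToLongFunction;
    import io.deephaven.kafka.KafkaTools;

    // Consume only partitions 0 and 3.
    IntPredicate someParts = KafkaTools.partitionFilterFromArray(new int[] {0, 3});
    // Start partition 0 at offset 100 and partition 3 at the beginning.
    IntToLongFunction startOffsets = KafkaTools.partitionToOffsetFromParallelArrays(
            new int[] {0, 3},
            new long[] {100L, KafkaTools.SEEK_TO_BEGINNING});
-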
predicateFromSet
-
topics
-
listTopics
public static String[] listTopics(@NotNull Properties kafkaProperties)
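A small sketch listing the topics visible with the given client configuration (broker address illustrative):
    import java.util.Properties;
    import io.deephaven.kafka.KafkaTools;

    Properties adminProps = new Properties();
    adminProps.put("bootstrap.servers", "localhost:9092");   // illustrative broker address
    String[] topicNames = KafkaTools.listTopics(adminProps);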
-