Package io.deephaven.kafka
Class KafkaTools
java.lang.Object
io.deephaven.kafka.KafkaTools
public class KafkaTools extends Object
-
Nested Class Summary
Nested Classes

static class      KafkaTools.Consume
static class      KafkaTools.DataFormat
    Enum to specify the expected processing (format) for Kafka KEY or VALUE fields.
static class      KafkaTools.KeyOrValue
    Enum to specify operations that may apply to either of Kafka KEY or VALUE fields.
static class      KafkaTools.Produce
static interface  KafkaTools.TableType
    Type for the result Table returned by Kafka consumers.
-
Field Summary
-
Constructor Summary
Constructors

KafkaTools()
-
Method Summary
static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String,String> fieldPathToColumnNameOut, org.apache.avro.Schema schema, Function<String,String> requestedFieldPathToColumnName)

static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema)
    Convert an Avro schema to a list of column definitions, mapping every Avro field to a column of the same name.

static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema, Function<String,String> requestedFieldPathToColumnName)
    Convert an Avro schema to a list of column definitions.

static org.apache.avro.Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, org.apache.commons.lang3.mutable.MutableObject<Properties> colPropsOut)

static PartitionedTable consumeToPartitionedTable(Properties kafkaProperties, String topic, IntPredicate partitionFilter, IntToLongFunction partitionToInitialOffset, io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, KafkaTools.TableType tableType)
    Consume from Kafka to a Deephaven PartitionedTable containing one constituent Table per partition.

static <RESULT_TYPE> RESULT_TYPE consumeToResult(Properties kafkaProperties, String topic, IntPredicate partitionFilter, IntToLongFunction partitionToInitialOffset, io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, KafkaTools.TableType tableType, io.deephaven.kafka.KafkaTools.ResultFactory<RESULT_TYPE> resultFactory)
    Consume from Kafka to a result Table or PartitionedTable.

static Table consumeToTable(Properties kafkaProperties, String topic, IntPredicate partitionFilter, IntToLongFunction partitionToInitialOffset, io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, KafkaTools.TableType tableType)
    Consume from Kafka to a Deephaven Table.

static KafkaTools.TableType friendlyNameToTableType(String typeName)
    Map a "Python-friendly" table type name to a KafkaTools.TableType.

static org.apache.avro.Schema getAvroSchema(String avroSchemaAsJsonString)
    Create an Avro schema object for a String containing a JSON-encoded Avro schema definition.

static IntPredicate partitionFilterFromArray(int[] partitions)

static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets)

static Predicate<String> predicateFromSet(Set<String> set)

static Runnable produceFromTable(Table table, Properties kafkaProperties, String topic, io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec keySpec, io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns)
    Produce a Kafka stream from a Deephaven table.
-
Field Details
-
KAFKA_PARTITION_COLUMN_NAME_PROPERTY
- See Also:
- Constant Field Values
-
KAFKA_PARTITION_COLUMN_NAME_DEFAULT
- See Also:
- Constant Field Values
-
OFFSET_COLUMN_NAME_PROPERTY
- See Also:
- Constant Field Values
-
OFFSET_COLUMN_NAME_DEFAULT
- See Also:
- Constant Field Values
-
TIMESTAMP_COLUMN_NAME_PROPERTY
- See Also:
- Constant Field Values
-
TIMESTAMP_COLUMN_NAME_DEFAULT
- See Also:
- Constant Field Values
-
KEY_COLUMN_NAME_PROPERTY
- See Also:
- Constant Field Values
-
KEY_COLUMN_NAME_DEFAULT
- See Also:
- Constant Field Values
-
VALUE_COLUMN_NAME_PROPERTY
- See Also:
- Constant Field Values
-
VALUE_COLUMN_NAME_DEFAULT
- See Also:
- Constant Field Values
-
KEY_COLUMN_TYPE_PROPERTY
- See Also:
- Constant Field Values
-
VALUE_COLUMN_TYPE_PROPERTY
- See Also:
- Constant Field Values
-
SCHEMA_SERVER_PROPERTY
- See Also:
- Constant Field Values
-
SHORT_DESERIALIZER
-
INT_DESERIALIZER
-
LONG_DESERIALIZER
-
FLOAT_DESERIALIZER
-
DOUBLE_DESERIALIZER
-
BYTE_ARRAY_DESERIALIZER
-
STRING_DESERIALIZER
-
BYTE_BUFFER_DESERIALIZER
-
AVRO_DESERIALIZER
-
DESERIALIZER_FOR_IGNORE
-
SHORT_SERIALIZER
-
INT_SERIALIZER
-
LONG_SERIALIZER
-
FLOAT_SERIALIZER
-
DOUBLE_SERIALIZER
-
BYTE_ARRAY_SERIALIZER
-
STRING_SERIALIZER
-
BYTE_BUFFER_SERIALIZER
-
AVRO_SERIALIZER
-
SERIALIZER_FOR_IGNORE
-
NESTED_FIELD_NAME_SEPARATOR
- See Also:
- Constant Field Values
-
NESTED_FIELD_COLUMN_NAME_SEPARATOR
- See Also:
- Constant Field Values
-
AVRO_LATEST_VERSION
- See Also:
- Constant Field Values
-
SEEK_TO_BEGINNING
public static final long SEEK_TO_BEGINNING
-
DONT_SEEK
public static final long DONT_SEEK
-
SEEK_TO_END
public static final long SEEK_TO_END
ALL_PARTITIONS
-
ALL_PARTITIONS_SEEK_TO_BEGINNING
-
ALL_PARTITIONS_DONT_SEEK
-
ALL_PARTITIONS_SEEK_TO_END
-
DIRECT_MAPPING
-
FROM_PROPERTIES
public static final io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec FROM_PROPERTIES
-
-
Constructor Details
-
KafkaTools
public KafkaTools()
-
-
Method Details
-
getAvroSchema
public static org.apache.avro.Schema getAvroSchema(String avroSchemaAsJsonString)

Create an Avro schema object for a String containing a JSON-encoded Avro schema definition.

Parameters:
    avroSchemaAsJsonString - The JSON Avro schema definition
Returns:
    An Avro schema object
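For example, a minimal sketch; the record and field names in the JSON below are placeholders, not part of this API.

    // Illustrative JSON Avro schema; the record and field names are placeholders.
    String schemaJson = String.join("\n",
            "{",
            "  \"type\": \"record\", \"name\": \"Quote\", \"namespace\": \"io.example\",",
            "  \"fields\": [",
            "    {\"name\": \"Sym\", \"type\": \"string\"},",
            "    {\"name\": \"Price\", \"type\": \"double\"}",
            "  ]",
            "}");
    org.apache.avro.Schema quoteSchema = KafkaTools.getAvroSchema(schemaJson);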
-
columnDefinitionsToAvroSchema
public static org.apache.avro.Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, org.apache.commons.lang3.mutable.MutableObject<Properties> colPropsOut)
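A hedged sketch of one way this might be called. The parameters are undocumented here, so the source table `quotes`, the use of predicateFromSet to build the include/exclude predicates, and the treatment of colPropsOut are assumptions for illustration only.

    // Assumed: `quotes` is an existing Deephaven Table with columns Sym and Price.
    java.util.function.Predicate<String> includeOnly =
            KafkaTools.predicateFromSet(java.util.Set.of("Sym", "Price")); // assumption: include only these columns
    java.util.function.Predicate<String> exclude =
            KafkaTools.predicateFromSet(java.util.Set.of());               // assumption: exclude nothing
    org.apache.commons.lang3.mutable.MutableObject<java.util.Properties> colPropsOut =
            new org.apache.commons.lang3.mutable.MutableObject<>();

    org.apache.avro.Schema derived = KafkaTools.columnDefinitionsToAvroSchema(
            quotes, "Quote", "io.example", new java.util.Properties(), includeOnly, exclude, colPropsOut);
    java.util.Properties effectiveColProps = colPropsOut.getValue(); // per-column properties as resolved by the call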
avroSchemaToColumnDefinitions
-
avroSchemaToColumnDefinitions
public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema, Function<String,String> requestedFieldPathToColumnName)

Convert an Avro schema to a list of column definitions.

Parameters:
    columnsOut - Column definitions for output; should be empty on entry.
    schema - Avro schema
    requestedFieldPathToColumnName - An optional mapping specifying the selection and naming of columns from Avro fields, or null to map all fields using the field path as the column name.
-
avroSchemaToColumnDefinitions
public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema)

Convert an Avro schema to a list of column definitions, mapping every Avro field to a column of the same name.

Parameters:
    columnsOut - Column definitions for output; should be empty on entry.
    schema - Avro schema
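A minimal sketch of the two-argument overload, reusing the quoteSchema built in the getAvroSchema example above; the io.deephaven.engine.table.ColumnDefinition import path is an assumption.

    import java.util.ArrayList;
    import java.util.List;
    import io.deephaven.engine.table.ColumnDefinition; // assumed import path

    List<ColumnDefinition<?>> columns = new ArrayList<>(); // must be empty on entry
    KafkaTools.avroSchemaToColumnDefinitions(columns, quoteSchema);
    // columns now holds one definition per Avro field, e.g. Sym (String) and Price (double).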
-
friendlyNameToTableType
Map "Python-friendly" table type name to aKafkaTools.TableType
. Supported values are:"stream"
"append"
"ring:<capacity>"
where capacity is a integer number specifying the maximum number of trailing rows to include in the result
- Parameters:
typeName
- The friendly name- Returns:
- The mapped
KafkaTools.TableType
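For instance, a small sketch; the variable names are illustrative.

    // "append" keeps all rows; "ring:1024" keeps at most the trailing 1024 rows.
    KafkaTools.TableType appendType = KafkaTools.friendlyNameToTableType("append");
    KafkaTools.TableType ringType = KafkaTools.friendlyNameToTableType("ring:1024");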
-
consumeToTable
public static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)

Consume from Kafka to a Deephaven Table.

Parameters:
    kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
    topic - Kafka topic name
    partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
    partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
    keySpec - Conversion specification for Kafka record keys
    valueSpec - Conversion specification for Kafka record values
    tableType - KafkaTools.TableType specifying the type of the expected result
Returns:
    The result Table containing Kafka stream data formatted according to tableType
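A minimal sketch of a call. The broker address, group id, and topic name are placeholders; the key and value handling simply reuses the FROM_PROPERTIES spec defined on this class; the io.deephaven.engine.table.Table import path, and the assumption that ALL_PARTITIONS and ALL_PARTITIONS_SEEK_TO_BEGINNING satisfy the partitionFilter and partitionToInitialOffset parameters, follow from the names on this page rather than from documented behavior.

    import java.util.Properties;
    import io.deephaven.engine.table.Table; // assumed import path
    import io.deephaven.kafka.KafkaTools;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "deephaven-demo");          // placeholder consumer group

    // Consume every partition of "orders" from the beginning into an append-only table;
    // key and value specs are derived from the properties via FROM_PROPERTIES.
    Table orders = KafkaTools.consumeToTable(
            props,
            "orders",
            KafkaTools.ALL_PARTITIONS,
            KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING,
            KafkaTools.FROM_PROPERTIES,
            KafkaTools.FROM_PROPERTIES,
            KafkaTools.friendlyNameToTableType("append"));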
-
consumeToPartitionedTable
public static PartitionedTable consumeToPartitionedTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)

Consume from Kafka to a Deephaven PartitionedTable containing one constituent Table per partition.

Parameters:
    kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
    topic - Kafka topic name
    partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
    partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
    keySpec - Conversion specification for Kafka record keys
    valueSpec - Conversion specification for Kafka record values
    tableType - KafkaTools.TableType specifying the type of the expected result's constituent tables
Returns:
    The result PartitionedTable containing Kafka stream data formatted according to tableType
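A sketch mirroring the consumeToTable example above (reusing its props), but restricted to partitions 0 through 2 and yielding one constituent table per partition. The io.deephaven.engine.table.PartitionedTable import path and the reading of ALL_PARTITIONS_DONT_SEEK are assumptions based on the names documented here.

    import io.deephaven.engine.table.PartitionedTable; // assumed import path

    PartitionedTable ordersByPartition = KafkaTools.consumeToPartitionedTable(
            props,                                               // same Properties as in the example above
            "orders",
            KafkaTools.partitionFilterFromArray(new int[] {0, 1, 2}),
            KafkaTools.ALL_PARTITIONS_DONT_SEEK,                 // assumption: leave initial offsets to the consumer
            KafkaTools.FROM_PROPERTIES,
            KafkaTools.FROM_PROPERTIES,
            KafkaTools.friendlyNameToTableType("stream"));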
-
consumeToResult
public static <RESULT_TYPE> RESULT_TYPE consumeToResult(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType, @NotNull io.deephaven.kafka.KafkaTools.ResultFactory<RESULT_TYPE> resultFactory)

Consume from Kafka to a result Table or PartitionedTable.

Parameters:
    kafkaProperties - Properties to configure this table and also to be passed to create the KafkaConsumer
    topic - Kafka topic name
    partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
    partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
    keySpec - Conversion specification for Kafka record keys
    valueSpec - Conversion specification for Kafka record values
    tableType - KafkaTools.TableType specifying the type of tables used in the result
Returns:
    The result table containing Kafka stream data formatted according to tableType
-
produceFromTable
public static Runnable produceFromTable(@NotNull Table table, @NotNull Properties kafkaProperties, @NotNull String topic, @NotNull io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec keySpec, @NotNull io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns)

Produce a Kafka stream from a Deephaven table.

Note that table must only change in ways that are meaningful when turned into a stream of events over Kafka.

Two primary use cases are considered:
- A stream of changes (puts and removes) to a key-value data set. In order to handle this efficiently and allow for correct reconstruction of the state at a consumer, it is assumed that the input data is the result of a Deephaven aggregation, e.g. TableOperations.aggAllBy(io.deephaven.api.agg.spec.AggSpec), TableOperations.aggBy(io.deephaven.api.agg.Aggregation), or TableOperations.lastBy(). This means that key columns (as specified by keySpec) must not be modified, and no rows should be shifted if there are any key columns. Note that specifying lastByKeyColumns=true can make it easy to satisfy this constraint if the input data is not already aggregated.
- A stream of independent log records. In this case, the input table should either be a stream table or should only ever add rows (regardless of whether the attribute is specified).

If other use cases are identified, a publication mode or extensible listener framework may be introduced at a later date.

Parameters:
    table - The table used as a source of data to be sent to Kafka.
    kafkaProperties - Properties to be passed to create the associated KafkaProducer.
    topic - Kafka topic name
    keySpec - Conversion specification for Kafka record keys from table column data. If not Ignore, must specify a key serializer that maps each input tuple to a unique output key.
    valueSpec - Conversion specification for Kafka record values from table column data.
    lastByKeyColumns - Whether to publish only the last record for each unique key. Ignored when keySpec is IGNORE. Otherwise, if lastByKeyColumns == true this method will internally perform a lastBy aggregation on table grouped by the input columns of keySpec and publish to Kafka from the result.
Returns:
    A callback to stop producing and shut down the associated table listener; note that a caller should keep a reference to this return value to ensure liveness.
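A hedged sketch of publishing an aggregated table back to Kafka. The spec factory used below (Produce.simpleSpec) is an assumption about the KafkaTools.Produce nested class, which is not documented on this page; check that class for the exact factory methods. The source table, topic, and column names are placeholders.

    // Assumed: `latestQuotes` is a Deephaven Table keyed by Sym with a Price column.
    java.util.Properties producerProps = new java.util.Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");   // placeholder broker

    Runnable stopPublishing = KafkaTools.produceFromTable(
            latestQuotes,
            producerProps,                                       // Properties for the underlying KafkaProducer
            "quotes-out",                                        // placeholder topic
            KafkaTools.Produce.simpleSpec("Sym"),                // assumption: single-column key spec factory
            KafkaTools.Produce.simpleSpec("Price"),              // assumption: single-column value spec factory
            true);                                               // lastByKeyColumns: publish only the last row per key

    // Keep a reference to the returned Runnable to ensure liveness; invoke it to stop producing.
    // stopPublishing.run();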
-
partitionFilterFromArray
-
partitionToOffsetFromParallelArrays
public static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets)
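A small sketch combining this helper with partitionFilterFromArray and the seek constants defined on this class. The partition numbers and offsets are illustrative, and using SEEK_TO_BEGINNING as a sentinel value in the offsets array is an assumption based on its name.

    import java.util.function.IntPredicate;
    import java.util.function.IntToLongFunction;

    // Consume only partitions 0 and 1; start partition 0 at offset 100 and partition 1 at the beginning.
    IntPredicate partitionFilter = KafkaTools.partitionFilterFromArray(new int[] {0, 1});
    IntToLongFunction initialOffsets = KafkaTools.partitionToOffsetFromParallelArrays(
            new int[] {0, 1},
            new long[] {100L, KafkaTools.SEEK_TO_BEGINNING});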
predicateFromSet
-