Class KafkaTools

java.lang.Object
io.deephaven.kafka.KafkaTools

public class KafkaTools
extends Object
  • Field Details

    • KAFKA_PARTITION_COLUMN_NAME_PROPERTY

      public static final String KAFKA_PARTITION_COLUMN_NAME_PROPERTY
      See Also:
      Constant Field Values
    • KAFKA_PARTITION_COLUMN_NAME_DEFAULT

      public static final String KAFKA_PARTITION_COLUMN_NAME_DEFAULT
      See Also:
      Constant Field Values
    • OFFSET_COLUMN_NAME_PROPERTY

      public static final String OFFSET_COLUMN_NAME_PROPERTY
      See Also:
      Constant Field Values
    • OFFSET_COLUMN_NAME_DEFAULT

      public static final String OFFSET_COLUMN_NAME_DEFAULT
      See Also:
      Constant Field Values
    • TIMESTAMP_COLUMN_NAME_PROPERTY

      public static final String TIMESTAMP_COLUMN_NAME_PROPERTY
      See Also:
      Constant Field Values
    • TIMESTAMP_COLUMN_NAME_DEFAULT

      public static final String TIMESTAMP_COLUMN_NAME_DEFAULT
      See Also:
      Constant Field Values
    • KEY_COLUMN_NAME_PROPERTY

      public static final String KEY_COLUMN_NAME_PROPERTY
      See Also:
      Constant Field Values
    • KEY_COLUMN_NAME_DEFAULT

      public static final String KEY_COLUMN_NAME_DEFAULT
      See Also:
      Constant Field Values
    • VALUE_COLUMN_NAME_PROPERTY

      public static final String VALUE_COLUMN_NAME_PROPERTY
      See Also:
      Constant Field Values
    • VALUE_COLUMN_NAME_DEFAULT

      public static final String VALUE_COLUMN_NAME_DEFAULT
      See Also:
      Constant Field Values
    • KEY_COLUMN_TYPE_PROPERTY

      public static final String KEY_COLUMN_TYPE_PROPERTY
      See Also:
      Constant Field Values
    • VALUE_COLUMN_TYPE_PROPERTY

      public static final String VALUE_COLUMN_TYPE_PROPERTY
      See Also:
      Constant Field Values
    • SCHEMA_SERVER_PROPERTY

      public static final String SCHEMA_SERVER_PROPERTY
      See Also:
      Constant Field Values
    • SHORT_DESERIALIZER

      public static final String SHORT_DESERIALIZER
    • INT_DESERIALIZER

      public static final String INT_DESERIALIZER
    • LONG_DESERIALIZER

      public static final String LONG_DESERIALIZER
    • FLOAT_DESERIALIZER

      public static final String FLOAT_DESERIALIZER
    • DOUBLE_DESERIALIZER

      public static final String DOUBLE_DESERIALIZER
    • BYTE_ARRAY_DESERIALIZER

      public static final String BYTE_ARRAY_DESERIALIZER
    • STRING_DESERIALIZER

      public static final String STRING_DESERIALIZER
    • BYTE_BUFFER_DESERIALIZER

      public static final String BYTE_BUFFER_DESERIALIZER
    • AVRO_DESERIALIZER

      public static final String AVRO_DESERIALIZER
    • DESERIALIZER_FOR_IGNORE

      public static final String DESERIALIZER_FOR_IGNORE
    • SHORT_SERIALIZER

      public static final String SHORT_SERIALIZER
    • INT_SERIALIZER

      public static final String INT_SERIALIZER
    • LONG_SERIALIZER

      public static final String LONG_SERIALIZER
    • FLOAT_SERIALIZER

      public static final String FLOAT_SERIALIZER
    • DOUBLE_SERIALIZER

      public static final String DOUBLE_SERIALIZER
    • BYTE_ARRAY_SERIALIZER

      public static final String BYTE_ARRAY_SERIALIZER
    • STRING_SERIALIZER

      public static final String STRING_SERIALIZER
    • BYTE_BUFFER_SERIALIZER

      public static final String BYTE_BUFFER_SERIALIZER
    • AVRO_SERIALIZER

      public static final String AVRO_SERIALIZER
    • SERIALIZER_FOR_IGNORE

      public static final String SERIALIZER_FOR_IGNORE
    • NESTED_FIELD_NAME_SEPARATOR

      public static final String NESTED_FIELD_NAME_SEPARATOR
      See Also:
      Constant Field Values
    • NESTED_FIELD_COLUMN_NAME_SEPARATOR

      public static final String NESTED_FIELD_COLUMN_NAME_SEPARATOR
      See Also:
      Constant Field Values
    • AVRO_LATEST_VERSION

      public static final String AVRO_LATEST_VERSION
      See Also:
      Constant Field Values
    • SEEK_TO_BEGINNING

      public static final long SEEK_TO_BEGINNING
    • DONT_SEEK

      public static final long DONT_SEEK
    • SEEK_TO_END

      public static final long SEEK_TO_END
    • ALL_PARTITIONS

      public static final IntPredicate ALL_PARTITIONS
    • ALL_PARTITIONS_SEEK_TO_BEGINNING

      public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_BEGINNING
    • ALL_PARTITIONS_DONT_SEEK

      public static final IntToLongFunction ALL_PARTITIONS_DONT_SEEK
    • ALL_PARTITIONS_SEEK_TO_END

      public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_END
    • DIRECT_MAPPING

      public static final Function<String, String> DIRECT_MAPPING
    • FROM_PROPERTIES

      public static final io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec FROM_PROPERTIES
  • Constructor Details

    • KafkaTools

      public KafkaTools()
  • Method Details

    • getAvroSchema

      public static org.apache.avro.Schema getAvroSchema(String avroSchemaAsJsonString)
      Create an Avro schema object for a String containing a JSON encoded Avro schema definition.
      Parameters:
      avroSchemaAsJsonString - The JSON Avro schema definition
      Returns:
      an Avro schema object
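      For example, a minimal sketch (the record schema shown is illustrative, not part of this API):

        // Build a JSON-encoded Avro record schema and parse it:
        String schemaJson = "{"
                + " \"type\": \"record\", \"name\": \"Order\","
                + " \"fields\": ["
                + "   {\"name\": \"symbol\", \"type\": \"string\"},"
                + "   {\"name\": \"qty\", \"type\": \"int\"}"
                + " ]}";
        org.apache.avro.Schema schema = KafkaTools.getAvroSchema(schemaJson);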
    • columnDefinitionsToAvroSchema

      public static org.apache.avro.Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, org.apache.commons.lang3.mutable.MutableObject<Properties> colPropsOut)
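      This overload carries no description here; a plausible invocation, assuming includeOnly and exclude select columns by name and colPropsOut receives the effective per-column properties, might look like:

        import org.apache.commons.lang3.mutable.MutableObject;
        import java.util.Properties;

        MutableObject<Properties> colPropsOut = new MutableObject<>();
        org.apache.avro.Schema schema = KafkaTools.columnDefinitionsToAvroSchema(
                table,                      // an existing Table (assumed)
                "Order",                    // schema name (assumed)
                "io.example",               // namespace (assumed)
                new Properties(),           // per-column properties
                columnName -> true,         // includeOnly: keep every column
                columnName -> false,        // exclude: drop none
                colPropsOut);               // receives resolved column properties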
    • avroSchemaToColumnDefinitions

      public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName)
    • avroSchemaToColumnDefinitions

      public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName)
      Convert an Avro schema to a list of column definitions.
      Parameters:
      columnsOut - Column definitions for output; should be empty on entry.
      schema - Avro schema
      requestedFieldPathToColumnName - An optional mapping to specify selection and naming of columns from Avro fields, or null to map all fields using the field path as the column name.
    • avroSchemaToColumnDefinitions

      public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema)
      Convert an Avro schema to a list of column definitions, mapping every Avro field to a column of the same name.
      Parameters:
      columnsOut - Column definitions for output; should be empty on entry.
      schema - Avro schema
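      For example, reusing the schema parsed in the getAvroSchema example above (DIRECT_MAPPING uses each field path unchanged as the column name):

        import io.deephaven.engine.table.ColumnDefinition;
        import java.util.ArrayList;
        import java.util.List;

        List<ColumnDefinition<?>> columnsOut = new ArrayList<>(); // must be empty on entry
        KafkaTools.avroSchemaToColumnDefinitions(columnsOut, schema, KafkaTools.DIRECT_MAPPING);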
    • friendlyNameToTableType

      @ScriptApi public static KafkaTools.TableType friendlyNameToTableType(@NotNull String typeName)
      Map a "Python-friendly" table type name to a KafkaTools.TableType. Supported values are:
      1. "stream"
      2. "append"
      3. "ring:<capacity>", where capacity is an integer specifying the maximum number of trailing rows to include in the result
      Parameters:
      typeName - The friendly name
      Returns:
      The mapped KafkaTools.TableType
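      For example:

        KafkaTools.TableType streaming = KafkaTools.friendlyNameToTableType("stream");
        KafkaTools.TableType appendOnly = KafkaTools.friendlyNameToTableType("append");
        KafkaTools.TableType ringOf1024 = KafkaTools.friendlyNameToTableType("ring:1024");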
    • consumeToTable

      public static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)
      Consume from Kafka to a Deephaven Table.
      Parameters:
      kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
      topic - Kafka topic name
      partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
      partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
      keySpec - Conversion specification for Kafka record keys
      valueSpec - Conversion specification for Kafka record values
      tableType - KafkaTools.TableType specifying the type of the expected result
      Returns:
      The result Table containing Kafka stream data formatted according to tableType
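      A minimal sketch; the broker address and topic name are assumptions, as is the use of the standard key.deserializer and value.deserializer property keys by FROM_PROPERTIES:

        import io.deephaven.engine.table.Table;
        import java.util.Properties;

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");              // assumed broker
        props.setProperty("key.deserializer", KafkaTools.STRING_DESERIALIZER);
        props.setProperty("value.deserializer", KafkaTools.STRING_DESERIALIZER);

        Table result = KafkaTools.consumeToTable(
                props,
                "orders",                                    // topic (assumed)
                KafkaTools.ALL_PARTITIONS,                   // consume every partition
                KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING, // start at the earliest offset
                KafkaTools.FROM_PROPERTIES,                  // key spec driven by props
                KafkaTools.FROM_PROPERTIES,                  // value spec driven by props
                KafkaTools.friendlyNameToTableType("stream"));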
    • consumeToPartitionedTable

      public static PartitionedTable consumeToPartitionedTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)
      Consume from Kafka to a Deephaven PartitionedTable containing one constituent Table per partition.
      Parameters:
      kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
      topic - Kafka topic name
      partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
      partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
      keySpec - Conversion specification for Kafka record keys
      valueSpec - Conversion specification for Kafka record values
      tableType - KafkaTools.TableType specifying the type of the expected result's constituent tables
      Returns:
      The result PartitionedTable containing Kafka stream data formatted according to tableType
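      The call shape mirrors consumeToTable; a minimal sketch reusing props from the previous example:

        import io.deephaven.engine.table.PartitionedTable;

        PartitionedTable byPartition = KafkaTools.consumeToPartitionedTable(
                props,
                "orders",                                              // topic (assumed)
                KafkaTools.partitionFilterFromArray(new int[] {0, 1}), // only partitions 0 and 1
                KafkaTools.ALL_PARTITIONS_DONT_SEEK,                   // keep current/committed offsets
                KafkaTools.FROM_PROPERTIES,
                KafkaTools.FROM_PROPERTIES,
                KafkaTools.friendlyNameToTableType("append"));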
    • consumeToResult

      public static <RESULT_TYPE> RESULT_TYPE consumeToResult(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType, @NotNull io.deephaven.kafka.KafkaTools.ResultFactory<RESULT_TYPE> resultFactory)
      Consume from Kafka to a result Table or PartitionedTable.
      Parameters:
      kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
      topic - Kafka topic name
      partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
      partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
      keySpec - Conversion specification for Kafka record keys
      valueSpec - Conversion specification for Kafka record values
      tableType - KafkaTools.TableType specifying the type of tables used in the result
      Returns:
      The result table containing Kafka stream data formatted according to tableType
    • produceFromTable

      public static Runnable produceFromTable(@NotNull Table table, @NotNull Properties kafkaProperties, @NotNull String topic, @NotNull io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec keySpec, @NotNull io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns)
      Produce a Kafka stream from a Deephaven table.

      Note that table must only change in ways that are meaningful when turned into a stream of events over Kafka.

      Two primary use cases are considered:

      1. A stream of changes (puts and removes) to a key-value data set. In order to handle this efficiently and allow for correct reconstruction of the state at a consumer, it is assumed that the input data is the result of a Deephaven aggregation, e.g. TableOperations.aggAllBy(io.deephaven.api.agg.spec.AggSpec), TableOperations.aggBy(io.deephaven.api.agg.Aggregation), or TableOperations.lastBy(). This means that key columns (as specified by keySpec) must not be modified, and no rows should be shifted if there are any key columns. Note that specifying lastByKeyColumns=true can make it easy to satisfy this constraint if the input data is not already aggregated.
      2. A stream of independent log records. In this case, the input table should either be a stream table or should only ever add rows (regardless of whether the stream-table attribute is set).

      If other use cases are identified, a publication mode or extensible listener framework may be introduced at a later date.

      Parameters:
      table - The table used as a source of data to be sent to Kafka.
      kafkaProperties - Properties to be passed to create the associated KafkaProducer.
      topic - Kafka topic name
      keySpec - Conversion specification for Kafka record keys from table column data. If not an ignore spec, this must specify a key serializer that maps each input tuple to a unique output key.
      valueSpec - Conversion specification for Kafka record values from table column data.
      lastByKeyColumns - Whether to publish only the last record for each unique key. Ignored when keySpec is an ignore spec. Otherwise, if lastByKeyColumns == true, this method will internally perform a lastBy aggregation on table grouped by the input columns of keySpec and publish to Kafka from the result.
      Returns:
      a callback to stop producing and shut down the associated table listener; note that a caller should keep a reference to this return value to ensure liveness.
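      A minimal sketch, assuming an existing ticking table prices with Symbol and Price columns, and assuming Produce.simpleSpec factory helpers (not listed in this summary) that map a single column to the record key or value:

        import io.deephaven.engine.table.Table;
        import java.util.Properties;

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092");         // assumed broker
        producerProps.setProperty("key.serializer", KafkaTools.STRING_SERIALIZER);
        producerProps.setProperty("value.serializer", KafkaTools.DOUBLE_SERIALIZER);

        Runnable stop = KafkaTools.produceFromTable(
                prices,                                  // source table (assumed)
                producerProps,
                "prices-out",                            // topic (assumed)
                KafkaTools.Produce.simpleSpec("Symbol"), // key from Symbol column (assumed helper)
                KafkaTools.Produce.simpleSpec("Price"),  // value from Price column (assumed helper)
                true);                                   // lastByKeyColumns: last row per key
        // Keep a reference to stop for liveness; invoke stop.run() to cease publishing.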
    • partitionFilterFromArray

      public static IntPredicate partitionFilterFromArray(int[] partitions)
    • partitionToOffsetFromParallelArrays

      public static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets)
    • predicateFromSet

      public static Predicate<String> predicateFromSet(Set<String> set)
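      These helpers build the functional arguments accepted elsewhere in this class, e.g. partitionFilter, partitionToInitialOffset, and the includeOnly/exclude predicates. For example:

        import java.util.Set;
        import java.util.function.IntPredicate;
        import java.util.function.IntToLongFunction;
        import java.util.function.Predicate;

        // Consume only partitions 0 and 1:
        IntPredicate firstTwo = KafkaTools.partitionFilterFromArray(new int[] {0, 1});
        // Start partition 0 at offset 100 and partition 1 at offset 250:
        IntToLongFunction initialOffsets = KafkaTools.partitionToOffsetFromParallelArrays(
                new int[] {0, 1},
                new long[] {100L, 250L});
        // Membership test over a fixed set of names:
        Predicate<String> inSet = KafkaTools.predicateFromSet(Set.of("SymbolA", "SymbolB"));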