Class KafkaTools

java.lang.Object
io.deephaven.kafka.KafkaTools

public class KafkaTools extends Object
  • Field Details

    • KAFKA_PARTITION_COLUMN_NAME_PROPERTY

      public static final String KAFKA_PARTITION_COLUMN_NAME_PROPERTY
    • KAFKA_PARTITION_COLUMN_NAME_DEFAULT

      public static final String KAFKA_PARTITION_COLUMN_NAME_DEFAULT
    • OFFSET_COLUMN_NAME_PROPERTY

      public static final String OFFSET_COLUMN_NAME_PROPERTY
    • OFFSET_COLUMN_NAME_DEFAULT

      public static final String OFFSET_COLUMN_NAME_DEFAULT
    • TIMESTAMP_COLUMN_NAME_PROPERTY

      public static final String TIMESTAMP_COLUMN_NAME_PROPERTY
    • TIMESTAMP_COLUMN_NAME_DEFAULT

      public static final String TIMESTAMP_COLUMN_NAME_DEFAULT
    • RECEIVE_TIME_COLUMN_NAME_PROPERTY

      public static final String RECEIVE_TIME_COLUMN_NAME_PROPERTY
    • RECEIVE_TIME_COLUMN_NAME_DEFAULT

      public static final String RECEIVE_TIME_COLUMN_NAME_DEFAULT
    • KEY_BYTES_COLUMN_NAME_PROPERTY

      public static final String KEY_BYTES_COLUMN_NAME_PROPERTY
    • KEY_BYTES_COLUMN_NAME_DEFAULT

      public static final String KEY_BYTES_COLUMN_NAME_DEFAULT
    • VALUE_BYTES_COLUMN_NAME_PROPERTY

      public static final String VALUE_BYTES_COLUMN_NAME_PROPERTY
    • VALUE_BYTES_COLUMN_NAME_DEFAULT

      public static final String VALUE_BYTES_COLUMN_NAME_DEFAULT
    • KEY_COLUMN_NAME_PROPERTY

      public static final String KEY_COLUMN_NAME_PROPERTY
    • KEY_COLUMN_NAME_DEFAULT

      public static final String KEY_COLUMN_NAME_DEFAULT
    • VALUE_COLUMN_NAME_PROPERTY

      public static final String VALUE_COLUMN_NAME_PROPERTY
    • VALUE_COLUMN_NAME_DEFAULT

      public static final String VALUE_COLUMN_NAME_DEFAULT
    • KEY_COLUMN_TYPE_PROPERTY

      public static final String KEY_COLUMN_TYPE_PROPERTY
    • VALUE_COLUMN_TYPE_PROPERTY

      public static final String VALUE_COLUMN_TYPE_PROPERTY
    • SCHEMA_SERVER_PROPERTY

      public static final String SCHEMA_SERVER_PROPERTY
    • SHORT_DESERIALIZER

      public static final String SHORT_DESERIALIZER
    • INT_DESERIALIZER

      public static final String INT_DESERIALIZER
    • LONG_DESERIALIZER

      public static final String LONG_DESERIALIZER
    • FLOAT_DESERIALIZER

      public static final String FLOAT_DESERIALIZER
    • DOUBLE_DESERIALIZER

      public static final String DOUBLE_DESERIALIZER
    • BYTE_ARRAY_DESERIALIZER

      public static final String BYTE_ARRAY_DESERIALIZER
    • STRING_DESERIALIZER

      public static final String STRING_DESERIALIZER
    • BYTE_BUFFER_DESERIALIZER

      public static final String BYTE_BUFFER_DESERIALIZER
    • AVRO_DESERIALIZER

      public static final String AVRO_DESERIALIZER
    • DESERIALIZER_FOR_IGNORE

      public static final String DESERIALIZER_FOR_IGNORE
    • SHORT_SERIALIZER

      public static final String SHORT_SERIALIZER
    • INT_SERIALIZER

      public static final String INT_SERIALIZER
    • LONG_SERIALIZER

      public static final String LONG_SERIALIZER
    • FLOAT_SERIALIZER

      public static final String FLOAT_SERIALIZER
    • DOUBLE_SERIALIZER

      public static final String DOUBLE_SERIALIZER
    • BYTE_ARRAY_SERIALIZER

      public static final String BYTE_ARRAY_SERIALIZER
    • STRING_SERIALIZER

      public static final String STRING_SERIALIZER
    • BYTE_BUFFER_SERIALIZER

      public static final String BYTE_BUFFER_SERIALIZER
    • AVRO_SERIALIZER

      public static final String AVRO_SERIALIZER
    • SERIALIZER_FOR_IGNORE

      public static final String SERIALIZER_FOR_IGNORE
    • NESTED_FIELD_NAME_SEPARATOR

      public static final String NESTED_FIELD_NAME_SEPARATOR
    • NESTED_FIELD_COLUMN_NAME_SEPARATOR

      public static final String NESTED_FIELD_COLUMN_NAME_SEPARATOR
    • AVRO_LATEST_VERSION

      public static final String AVRO_LATEST_VERSION
    • SEEK_TO_BEGINNING

      public static final long SEEK_TO_BEGINNING
    • DONT_SEEK

      public static final long DONT_SEEK
    • SEEK_TO_END

      public static final long SEEK_TO_END
    • ALL_PARTITIONS

      public static final IntPredicate ALL_PARTITIONS
    • ALL_PARTITIONS_SEEK_TO_BEGINNING

      public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_BEGINNING
    • ALL_PARTITIONS_DONT_SEEK

      public static final IntToLongFunction ALL_PARTITIONS_DONT_SEEK
    • ALL_PARTITIONS_SEEK_TO_END

      public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_END
    • DIRECT_MAPPING

      public static final Function<String,String> DIRECT_MAPPING
    • FROM_PROPERTIES

      public static final KafkaTools.Consume.KeyOrValueSpec FROM_PROPERTIES
      The names for the key or value columns can be provided in the properties as "deephaven.key.column.name" or "deephaven.value.column.name", and otherwise default to "KafkaKey" or "KafkaValue". The types for key or value are either specified in the properties as "deephaven.key.column.type" or "deephaven.value.column.type", or deduced from the deserializer classes given for "key.deserializer" or "value.deserializer" in the provided Properties object.
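      For example, a minimal sketch of a Properties object that FROM_PROPERTIES can interpret (the broker address and column names are illustrative):

          Properties props = new Properties();
          props.setProperty("bootstrap.servers", "localhost:9092");        // hypothetical broker
          props.setProperty("key.deserializer", KafkaTools.STRING_DESERIALIZER);
          props.setProperty("value.deserializer", KafkaTools.DOUBLE_DESERIALIZER);
          props.setProperty("deephaven.key.column.name", "Symbol");        // otherwise defaults to "KafkaKey"
          props.setProperty("deephaven.value.column.name", "Price");       // otherwise defaults to "KafkaValue"
          // Column types are deduced from the deserializer classes configured above.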
  • Constructor Details

    • KafkaTools

      public KafkaTools()
  • Method Details

    • getAvroSchema

      public static org.apache.avro.Schema getAvroSchema(String avroSchemaAsJsonString)
      Create an Avro schema object from a String containing a JSON-encoded Avro schema definition.
      Parameters:
      avroSchemaAsJsonString - The JSON Avro schema definition
      Returns:
      an Avro schema object
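      For example, a minimal sketch (the schema JSON is illustrative):

          String schemaJson = "{\"type\": \"record\", \"name\": \"Quote\", \"fields\": ["
                  + "{\"name\": \"Symbol\", \"type\": \"string\"},"
                  + "{\"name\": \"Price\", \"type\": \"double\"}]}";
          org.apache.avro.Schema schema = KafkaTools.getAvroSchema(schemaJson);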
    • columnDefinitionsToAvroSchema

      public static org.apache.avro.Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, org.apache.commons.lang3.mutable.MutableObject<Properties> colPropsOut)
    • avroSchemaToColumnDefinitions

      public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String,String> fieldPathToColumnNameOut, org.apache.avro.Schema schema, Function<String,String> requestedFieldPathToColumnName)
    • avroSchemaToColumnDefinitions

      public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String,String> fieldPathToColumnNameOut, org.apache.avro.Schema schema, Function<String,String> requestedFieldPathToColumnName, boolean useUTF8Strings)
    • avroSchemaToColumnDefinitions

      public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema, Function<String,String> requestedFieldPathToColumnName)
      Convert an Avro schema to a list of column definitions.
      Parameters:
      columnsOut - Column definitions for output; should be empty on entry.
      schema - Avro schema
      requestedFieldPathToColumnName - An optional mapping specifying the selection and naming of columns from Avro fields, or null to map all fields, using the field path as the column name.
    • avroSchemaToColumnDefinitions

      public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema)
      Convert an Avro schema to a list of column definitions, mapping every Avro field to a column of the same name.
      Parameters:
      columnsOut - Column definitions for output; should be empty on entry.
      schema - Avro schema
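      For example, a minimal sketch that builds column definitions from the schema parsed in the getAvroSchema example above (assuming the relevant List, ArrayList, and ColumnDefinition imports):

          List<ColumnDefinition<?>> columns = new ArrayList<>();
          KafkaTools.avroSchemaToColumnDefinitions(columns, schema);
          // columns now holds one ColumnDefinition per Avro field, e.g. "Symbol" and "Price".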
    • friendlyNameToTableType

      @ScriptApi public static KafkaTools.TableType friendlyNameToTableType(@NotNull @NotNull String typeName)
      Map "Python-friendly" table type name to a KafkaTools.TableType. Supported values are:
      1. "blink"
      2. "stream" (deprecated; use "blink")
      3. "append"
      4. "ring:<capacity>" where capacity is a integer number specifying the maximum number of trailing rows to include in the result
      Parameters:
      typeName - The friendly name
      Returns:
      The mapped KafkaTools.TableType
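      For example:

          KafkaTools.TableType blink  = KafkaTools.friendlyNameToTableType("blink");
          KafkaTools.TableType append = KafkaTools.friendlyNameToTableType("append");
          KafkaTools.TableType ring   = KafkaTools.friendlyNameToTableType("ring:1024"); // keep at most 1024 trailing rows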
    • consumeToTable

      public static Table consumeToTable(@NotNull @NotNull Properties kafkaProperties, @NotNull @NotNull String topic, @NotNull @NotNull IntPredicate partitionFilter, @NotNull @NotNull IntToLongFunction partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull @NotNull KafkaTools.TableType tableType)
      Consume from Kafka to a Deephaven Table.
      Parameters:
      kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
      topic - Kafka topic name
      partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
      partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
      keySpec - Conversion specification for Kafka record keys
      valueSpec - Conversion specification for Kafka record values
      tableType - KafkaTools.TableType specifying the type of the expected result
      Returns:
      The result Table containing Kafka stream data formatted according to tableType
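      For example, a minimal sketch using the constants documented above (the broker address and topic name are illustrative):

          Properties props = new Properties();
          props.setProperty("bootstrap.servers", "localhost:9092");
          props.setProperty("key.deserializer", KafkaTools.STRING_DESERIALIZER);
          props.setProperty("value.deserializer", KafkaTools.STRING_DESERIALIZER);

          Table result = KafkaTools.consumeToTable(
                  props,
                  "orders",                              // topic name (hypothetical)
                  KafkaTools.ALL_PARTITIONS,             // consume every partition
                  KafkaTools.ALL_PARTITIONS_DONT_SEEK,   // start from the existing/committed offsets
                  KafkaTools.FROM_PROPERTIES,            // key column name/type taken from props
                  KafkaTools.FROM_PROPERTIES,            // value column name/type taken from props
                  KafkaTools.friendlyNameToTableType("blink"));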
    • consumeToPartitionedTable

      public static PartitionedTable consumeToPartitionedTable(@NotNull @NotNull Properties kafkaProperties, @NotNull @NotNull String topic, @NotNull @NotNull IntPredicate partitionFilter, @NotNull @NotNull IntToLongFunction partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull @NotNull KafkaTools.TableType tableType)
      Consume from Kafka to a Deephaven PartitionedTable containing one constituent Table per partition.
      Parameters:
      kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
      topic - Kafka topic name
      partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
      partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
      keySpec - Conversion specification for Kafka record keys
      valueSpec - Conversion specification for Kafka record values
      tableType - KafkaTools.TableType specifying the type of the expected result's constituent tables
      Returns:
      The result PartitionedTable containing Kafka stream data formatted according to tableType
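      For example, a minimal sketch reusing the properties from the consumeToTable example above, producing one constituent table per partition:

          PartitionedTable byPartition = KafkaTools.consumeToPartitionedTable(
                  props,
                  "orders",                                    // topic name (hypothetical)
                  KafkaTools.ALL_PARTITIONS,
                  KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING, // replay each partition from the start
                  KafkaTools.FROM_PROPERTIES,
                  KafkaTools.FROM_PROPERTIES,
                  KafkaTools.friendlyNameToTableType("append"));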
    • getTableDefinition

      public static TableDefinition getTableDefinition(@NotNull @NotNull Properties kafkaProperties, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec)
      Construct a TableDefinition based on the input Properties and KafkaTools.Consume.KeyOrValueSpec parameters. Given the same input Properties and Consume.KeyOrValueSpec parameters, the returned TableDefinition is the same as the TableDefinition of the table produced by consumeToTable(Properties, String, IntPredicate, IntToLongFunction, Consume.KeyOrValueSpec, Consume.KeyOrValueSpec, TableType)
      Parameters:
      kafkaProperties - Properties to configure this table
      keySpec - Conversion specification for Kafka record keys
      valueSpec - Conversion specification for Kafka record values
      Returns:
      A TableDefinition derived from the input Properties and KeyOrValueSpec instances
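      For example, a minimal sketch that inspects the column layout a subsequent consumeToTable call would produce for the same properties:

          TableDefinition definition = KafkaTools.getTableDefinition(
                  props, KafkaTools.FROM_PROPERTIES, KafkaTools.FROM_PROPERTIES);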
    • consume

      public static void consume(@NotNull @NotNull Properties kafkaProperties, @NotNull @NotNull String topic, @NotNull @NotNull IntPredicate partitionFilter, @NotNull @NotNull KafkaTools.InitialOffsetLookup partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull @NotNull KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, @Nullable @Nullable KafkaTools.ConsumerLoopCallback consumerLoopCallback)
      Consume from Kafka to stream consumers supplied by streamConsumerRegistrarProvider.
      Parameters:
      kafkaProperties - Properties to configure this table and also to be passed to create the KafkaConsumer
      topic - Kafka topic name
      partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
      partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
      keySpec - Conversion specification for Kafka record keys
      valueSpec - Conversion specification for Kafka record values
      streamConsumerRegistrarProvider - A provider for a function to register StreamConsumer instances. The registered stream consumers must accept chunk types that correspond to StreamChunkUtils.chunkTypeForColumnIndex(TableDefinition, int) for the supplied TableDefinition. See single and per-partition.
      consumerLoopCallback - An optional callback to inject logic into the ingester's consumer loop
    • produceFromTable

      public static Runnable produceFromTable(@NotNull @NotNull Table table, @NotNull @NotNull Properties kafkaProperties, @NotNull @NotNull String topic, @NotNull KafkaTools.Produce.KeyOrValueSpec keySpec, @NotNull KafkaTools.Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns)
      Produce a Kafka stream from a Deephaven table.

      Note that table must only change in ways that are meaningful when turned into a stream of events over Kafka.

      Two primary use cases are considered:

      1. A stream of changes (puts and removes) to a key-value data set. In order to handle this efficiently and allow for correct reconstruction of the state at a consumer, it is assumed that the input data is the result of a Deephaven aggregation, e.g. TableOperations.aggAllBy(io.deephaven.api.agg.spec.AggSpec), TableOperations.aggBy(io.deephaven.api.agg.Aggregation), or TableOperations.lastBy(). This means that key columns (as specified by keySpec) must not be modified, and no rows should be shifted if there are any key columns. Note that specifying lastByKeyColumns=true can make it easy to satisfy this constraint if the input data is not already aggregated.
      2. A stream of independent log records. In this case, the input table should either be a blink table or should only ever add rows (regardless of whether the attribute is specified).

      If other use cases are identified, a publication mode or extensible listener framework may be introduced at a later date.

      Parameters:
      table - The table used as a source of data to be sent to Kafka.
      kafkaProperties - Properties to be passed to create the associated KafkaProducer.
      topic - Kafka topic name
      keySpec - Conversion specification for Kafka record keys from table column data.
      valueSpec - Conversion specification for Kafka record values from table column data.
      lastByKeyColumns - Whether to publish only the last record for each unique key. Ignored when keySpec is IGNORE. Otherwise, if lastByKeyColumns == true, this method will internally perform a lastBy aggregation on table grouped by the input columns of keySpec and publish to Kafka from the result.
      Returns:
      a callback to stop producing and shut down the associated table listener; note that a caller should keep a reference to this return value to ensure liveness.
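      For example, a minimal sketch, assuming a source table lastQuoteBySymbol that is aggregated by a Symbol column, and assuming the KafkaTools.Produce.simpleSpec(String) factory for the key and value specs (table, topic, broker address, and column names are all illustrative):

          Properties producerProps = new Properties();
          producerProps.setProperty("bootstrap.servers", "localhost:9092");      // hypothetical broker
          producerProps.setProperty("key.serializer", KafkaTools.STRING_SERIALIZER);
          producerProps.setProperty("value.serializer", KafkaTools.DOUBLE_SERIALIZER);

          Runnable stopPublishing = KafkaTools.produceFromTable(
                  lastQuoteBySymbol,                         // aggregated source table (hypothetical)
                  producerProps,
                  "quotes",                                  // topic name (hypothetical)
                  KafkaTools.Produce.simpleSpec("Symbol"),   // key from the Symbol column (assumed factory)
                  KafkaTools.Produce.simpleSpec("Price"),    // value from the Price column (assumed factory)
                  true);                                     // lastByKeyColumns: publish only the last record per key
          // Keep a reference to the returned Runnable; invoke it to stop producing and release the listener.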
    • produceFromTable

      public static Runnable produceFromTable(KafkaPublishOptions options)
      Produce a Kafka stream from a Deephaven table.

      Note that table must only change in ways that are meaningful when turned into a stream of events over Kafka.

      Two primary use cases are considered:

      1. A stream of changes (puts and removes) to a key-value data set. In order to handle this efficiently and allow for correct reconstruction of the state at a consumer, it is assumed that the input data is the result of a Deephaven aggregation, e.g. TableOperations.aggAllBy(io.deephaven.api.agg.spec.AggSpec), TableOperations.aggBy(io.deephaven.api.agg.Aggregation), or TableOperations.lastBy(). This means that key columns (as specified by keySpec) must not be modified, and no rows should be shifted if there are any key columns. Note that specifying lastByKeyColumns=true can make it easy to satisfy this constraint if the input data is not already aggregated.
      2. A stream of independent log records. In this case, the input table should either be a blink table or should only ever add rows (regardless of whether the attribute is specified).

      If other use cases are identified, a publication mode or extensible listener framework may be introduced at a later date.

      Parameters:
      options - the options
      Returns:
      a callback to stop producing and shut down the associated table listener; note that a caller should keep a reference to this return value to ensure liveness.
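      For example, a minimal sketch, assuming KafkaPublishOptions exposes a builder with table, topic, config, keySpec, valueSpec, and lastBy setters (consult KafkaPublishOptions for its actual API; the table, topic, and specs reuse the illustrative names from the previous example):

          KafkaPublishOptions options = KafkaPublishOptions.builder()    // assumed builder API
                  .table(lastQuoteBySymbol)
                  .topic("quotes")
                  .config(producerProps)
                  .keySpec(KafkaTools.Produce.simpleSpec("Symbol"))
                  .valueSpec(KafkaTools.Produce.simpleSpec("Price"))
                  .lastBy(true)
                  .build();
          Runnable stopPublishing = KafkaTools.produceFromTable(options);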
    • partitionFilterFromArray

      public static IntPredicate partitionFilterFromArray(int[] partitions)
    • partitionToOffsetFromParallelArrays

      public static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets)
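      For example, a sketch restricting consumption to partitions 0 and 3 with explicit starting offsets (the partition numbers and offsets are illustrative); the results can be passed as the partitionFilter and partitionToInitialOffset arguments of the consume methods above:

          IntPredicate onlySomePartitions = KafkaTools.partitionFilterFromArray(new int[] {0, 3});
          IntToLongFunction startingOffsets = KafkaTools.partitionToOffsetFromParallelArrays(
                  new int[] {0, 3},            // partitions
                  new long[] {100L, 2500L});   // matching initial offsets, one per partition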
    • predicateFromSet

      public static Predicate<String> predicateFromSet(Set<String> set)
    • topics

      public static Set<String> topics(@NotNull @NotNull Properties kafkaProperties)
    • listTopics

      public static String[] listTopics(@NotNull @NotNull Properties kafkaProperties)
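      For example, a sketch that lists the topics visible with the given connection properties (the broker address is illustrative):

          Properties adminProps = new Properties();
          adminProps.setProperty("bootstrap.servers", "localhost:9092");
          Set<String> topicNames = KafkaTools.topics(adminProps);
          String[] topicArray = KafkaTools.listTopics(adminProps);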