Ingesting Kafka Data with a Core+ Worker
The Deephaven Core+ Kafka ingestion framework provides several advantages over the existing Enterprise framework. Notably:
- The Core+ Kafka ingester can both read Kafka streams into in-memory tables and persist them to disk using the Enterprise DIS integration described on this page.
- Key and Value specifications are disjoint, which is an improvement over the `io.deephaven.kafka.ingest.ConsumerRecordToTableWriterAdapter` pattern found in Enterprise.
- The Core+ `KafkaIngester` uses chunks for improved efficiency compared to row-oriented Enterprise adapters.
Setting up the ingester
The first steps for setting up a Core+ Kafka ingester (such as defining a schema and updating the data routing configuration) are identical to configuring a legacy Enterprise ingester, but the syntax for the ingestion script is different.
You may find it convenient to test ingestion by consuming the Kafka stream to an in-memory table at first. This allows you to tune your key and value specifications before creating a schema, by following the instructions in the Deephaven Core documentation. After determining the desired properties and key/value specifications and examining the in-memory table, you can proceed to persistent ingestion. You can also use the in-memory table's definition as a starting point for creating an Enterprise schema. You must edit the schema produced by this example to include a partitioning column.
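For example, here is a minimal sketch of consuming a topic into an in-memory table with the Core `KafkaTools` API. The topic name, broker address, and column definitions are placeholders, and the exact `consumeToTable` overload and `TableType` factory should be confirmed against the Deephaven Core `KafkaTools` Javadoc.
import io.deephaven.kafka.KafkaTools
import io.deephaven.engine.table.ColumnDefinition
final Properties testProps = new Properties()
testProps.put('bootstrap.servers', 'http://kafka-broker:9092')
// Consume the topic into an in-memory append-only table to validate the
// key and value specifications before creating an Enterprise schema.
kafka_result = KafkaTools.consumeToTable(
        testProps,
        'demo-topic',                                // placeholder topic name
        KafkaTools.ALL_PARTITIONS,                   // read every Kafka partition
        KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING, // start from the oldest records
        KafkaTools.Consume.IGNORE,                   // no key columns
        KafkaTools.Consume.jsonSpec(
                ColumnDefinition.ofString('Sym'),
                ColumnDefinition.ofDouble('AskPrice')),
        KafkaTools.TableType.append())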
Below is an example of creating a schema from a preexisting in-memory table; in this example, the table is stored in the `kafka_result` variable.
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import io.deephaven.enterprise.compatibility.TableDefinitionCompatibility
import static io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.TableDefinition.STORAGETYPE_NESTEDPARTITIONEDONDISK
// this assumes you have a table named "kafka_result"
edef = TableDefinitionCompatibility.convertToEnterprise(kafka_result.getDefinition())
edef.setNamespace("MyNamespace")
edef.setName("MyTable")
edef.setStorageType(STORAGETYPE_NESTEDPARTITIONEDONDISK)
schema = io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.xml.SchemaXmlFactory.getXmlSchema(edef, io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.NamespaceSet.SYSTEM)
Files.write(new File("/tmp/MyNamespace.MyTable.schema").toPath(), [io.deephaven.shadow.enterprise.io.deephaven.dhservice.SchemaFormatUtil.asString(schema)], StandardOpenOption.CREATE_NEW)
Prerequisites
Before creating the import script, follow the instructions above to define the ingester's table schema and update the data routing configuration.
Creating an import script
The import script that starts the ingester can be tested/developed in a Code Studio using a merge worker. When run for production use, the script should be run as a "Live Query - Merge Server" persistent query. In either case, it needs to run in a merge worker so that it has permission to write data to the import server's filesystem.
The import script handles:
- Configuring and starting the in-worker DIS that will use the storage and properties defined in the routing YAML.
- Setting properties for the Kafka consumer and starting the consumer.
- Configuring the key and value specifications.
The Kafka consumer used by the import script is an `org.apache.kafka.clients.consumer.KafkaConsumer`. Some of the properties that affect the `KafkaConsumer`'s operation are shown in the examples here, but there are many more relating to cases such as two-way TLS, authentication with a schema registry, and others. Please refer to the Apache documentation for `KafkaConsumer` for details and examples of the other properties that can be configured.
The first time a Deephaven ingester connects to a topic, it requests to start receiving records from offset `0`, i.e., the broker's oldest records for this topic.
As the in-worker DIS receives and processes Kafka records, it delivers the table row data to Deephaven clients that have subscribed for updates, and it also, in parallel, flushes and checkpoints row data to disk. When a checkpoint is durably committed, the ingester sends a checkpoint notification to the Kafka broker to update the latest offset that has been delivered. In this way, when an ingester is restarted, intentionally or unintentionally, it can pick up exactly where it left off in receiving records from the topic. On startup, the ingester's default behavior is to attempt to resume from the last ingested record, ensuring exactly-once delivery. This behavior can be further controlled by the KafkaTableWriter Options.
KafkaConsumer Properties
You must create a `Properties` object for the `KafkaConsumer`. Persistent ingestion requires that auto-commit be disabled (set the `enable.auto.commit` property to `false`) to ensure exactly-once delivery. This setting allows the in-worker DIS, which is actually writing rows to disk, to also manage commits back to the Kafka broker. The broker associates the offset for the ingester with the supplied consumer group (`props.put("group.id", "dhdis")`). Any not-already-used name can be used for the Deephaven consumer group, but it may be necessary to update Kafka permissions on the broker side to allow the Deephaven consumer to manage its own consumer groups.
Below is an example of how to create a typical `Properties` object for a `KafkaConsumer`:
final Properties props = new Properties()
props.put('bootstrap.servers', 'http://kafka-broker:9092')
props.put('schema.registry.url', 'http://kafka-broker:8081')
props.put("fetch.min.bytes", "65000")
props.put("fetch.max.wait.ms", "200")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
KafkaTableWriter Options
After creating suitable properties, the next step is to create an `Options` builder object for the ingestion and pass it to the `KafkaTableWriter.consumeToDis` function. The `KafkaTableWriter.Options` Javadoc provides a complete list of options.
Example `Options` for both fixed partitions and dynamic partitions are provided below.
Specifying record type and columns
For JSON-formatted Kafka records, a `valueSpec` is needed to indicate the names and types of columns to map from the record to the target table. Note the import of `ColumnDefinition` for the column type specification methods (`ofString`, `ofLong`, `ofDouble`, etc.).
import io.deephaven.engine.table.ColumnDefinition
// .... //
opts.keySpec(io.deephaven.kafka.KafkaTools.Consume.IGNORE)
opts.valueSpec(io.deephaven.kafka.KafkaTools.Consume.jsonSpec(
        ColumnDefinition.ofString('Sym'),
        ColumnDefinition.ofLong('AskSize'),
        ColumnDefinition.ofDouble('AskPrice'),
        ColumnDefinition.ofLong('BidSize'),
        ColumnDefinition.ofDouble('BidPrice'),
        ColumnDefinition.ofString('AskExchange'),
        ColumnDefinition.ofString('BidExchange'),
        ColumnDefinition.ofLong('AskTime'),
        ColumnDefinition.ofLong('BidTime')))
`Consume` also provides record specification methods for Avro, Protobuf, and raw Kafka records.
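For example, a value specification that resolves an Avro schema from the configured schema registry, and one that maps the raw record value to a single column, might look like the following sketch. The subject name `demo-value` matches the fixed-partition example later on this page, while the column name `Value` and the exact `simpleSpec` overload are illustrative; check the Core `KafkaTools.Consume` Javadoc for the available methods. Only one `valueSpec` is used per ingester.
// Avro: fetch the value schema from the schema registry by subject name.
opts.valueSpec(io.deephaven.kafka.KafkaTools.Consume.avroSpec("demo-value"))
// Raw: map the record value to a single String column named "Value"
// (column name and overload are illustrative).
opts.valueSpec(io.deephaven.kafka.KafkaTools.Consume.simpleSpec("Value", String))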
Specifying the Data Import Server
The `Options` structure determines which Data Import Server configuration is used. You can either configure a Data Import Server in the `routing.yaml` file, or you can use `dhconfig dis` to create an ad-hoc DIS.
You can specify the Data Import Server in three ways:
- Pass in the name of a Data Import Server using `disName`. The referenced DIS must have storage defined in the routing configuration.
- Pass in the name of a Data Import Server and the path to its storage root using `disNameWithStorage`. The referenced DIS must have "private" storage defined in the routing configuration.
- Pass in a `DataImportServer` object, typically retrieved with `DataImportServerTools` (see the sketch after this list). If you are using the DataImportServer for multiple ingesters, then you must choose this method.
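For the third option, here is a minimal sketch of retrieving a DIS by name and handing it to the ingestion options. The DIS name `Ingester1`, its storage path, the `DataImportServerTools` package, and the `Options` method used to attach the object are assumptions for illustration; confirm them against your routing configuration and the `DataImportServerTools` and `KafkaTableWriter.Options` Javadoc.
// Package path and method names assumed; see the DataImportServerTools Javadoc.
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
// Retrieve the DIS named "Ingester1" with private storage at the given path
// (both values are placeholders from your routing configuration).
dis = DataImportServerTools.getDisByNameWithStorage("Ingester1", "/db/dataImportServers/Ingester1")
// Hand the DataImportServer object to the ingestion options
// (method name assumed; check the KafkaTableWriter.Options Javadoc).
opts.dataImportServer(dis)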
LastBy Tables
Often queries can be well-served by having the most recent (last) row for a given key immediately available. When dealing with large tables, such as a table of quotes, initializing the state for a `lastBy` requires reading the entire table; particularly at the end of the day, this can take significant time. A Deephaven ingestion server can maintain an in-memory view of the last row for each key, with the last-row state persisted to the checkpoint record. The in-memory table can then be shared with other workers over Barrage. To enable a `lastBy` view of a Kafka table, invoke the `.lastBy(KeyColumns)` function on the `Options` object. The method for exporting lastBy tables from a Kafka ingester is the same as that for a Core+ binary log ingester.
For example, this adds a lastBy view for the `instrument` column to the ingester:
opts.lastBy("instrument")
The `Options` structure lets you create multiple lastBy views, each with a different name, by specifying `.lastBy` multiple times.
After calling the `consumeToDis` method, create a lastBy view of the table in memory with the `LastByPartitionedTableFactory.forDataImportServer` method. The resulting table can then be retrieved in other workers via Barrage or examined in the Deephaven web UI.
import io.deephaven.enterprise.lastbystate.LastByPartitionedTableFactory
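// "dis" is the DataImportServer object used by the ingester
// (see "Specifying the Data Import Server" above)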
lbf = LastByPartitionedTableFactory.forDataImportServer(dis)
// resultPartitionedTable contains one constituent for each internal partition; you must specify a column partition (in this example `today()`)
resultPartitionedTable = lbf.createPartitionedTable("Namespace", "TableName", today())
// merging the partitioned table produces a table that can be exported via Barrage
resultTable = resultPartitionedTable.merge()
Function Transformations
Before writing the stream received from Kafka to disk, you may need to transform the received rows. For example, you might need to:
- Convert a timestamp with a custom format to a Java `Instant`.
- Create a composite column from several other columns.
- Split a column into more meaningful fields.
- Rename columns.
To express these transformations, you must provide a `Function` from one `Table` to another with the `transformation` option.
In the following example, the `Instant` column is created from the `Timestamp` column, and the column named `NegateBool` inverts the `BoolValue` column. We use a Groovy closure named `xf` as the implementation of the function.
xf = { res -> res.updateView("Instant=io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.utils.DBTimeUtils.convertDateTime(Timestamp).getInstant()", "NegateBool=!BoolValue") }
opts.transformation(xf)
The function must not change the number of rows in the table, and the values in the offset column must not be changed by the transformation.
When using dynamic partitions, the transformation function is applied before the partition for a row is determined, so the transformed value can be used as input to the partition function.
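For instance, a transformation that only renames columns trivially satisfies these constraints. A minimal sketch, with hypothetical column names:
// Rename raw Kafka column names to friendlier ones; the row count and the
// offset column are left untouched, as required.
renameXf = { t -> t.renameColumns("Symbol=sym_raw", "Price=px_raw") }
opts.transformation(renameXf)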
Deephaven table partitions
Deephaven tables are partitioned in two ways. The first type of partitioning is that each table has a column partition, which is user-visible and can be used from queries. (Typically, the column partition is a "Date" column.) The second type of partitioning is an internal partition. The internal partition is not generally visible to users, and represents a single stream of data. For tables ingested from Kafka, the Deephaven internal partitions are automatically assigned with the format `<topic>-<numeric kafka partition>`.
The Kafka ingester supports two ways of determining column partitions:
- Fixed partitions are assigned for the life of the ingester.
- Dynamic partitions are determined as a function of the data.
Fixed partitions
To use a fixed partition, call the `Options.partitionValue` function. On startup, the ingester determines the Kafka offset to resume from using the following process:
- If the column partition already has data ingested, then the checkpoint record provides the offset of the next message. This step can be skipped by calling `Options.ignoreOffsetFromCheckpoints(true)` when creating the ingestion options.
- If `Options.resumeFrom` is specified, the prior partition is determined by invoking the `resumeFrom` function with the current column partition. If a matching internal partition for the previous column partition is found with a checkpoint record, then the ingestion is resumed from that offset.
- The Kafka broker is queried for committed values. If there is a committed value, ingestion is resumed from that offset. This step can be skipped by calling `Options.ignoreOffsetFromBroker(true)` when creating the ingestion options.
- Finally, the fallback function specified by `Options.partitionToInitialOffsetFallback` is called, and ingestion is resumed from the returned offset. The fallback function defaults to reading all data from the beginning of the topic, but can be configured to ingest only new data (or an arbitrary offset by providing your own function); see the sketch after this list.
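A minimal sketch of configuring the fallback to ingest only new data is shown below. It assumes `partitionToInitialOffsetFallback` accepts the same partition-to-offset function shape as the constants in Core's `KafkaTools`; confirm the expected type in the `KafkaTableWriter.Options` Javadoc.
// Assumption: the fallback takes a function from Kafka partition to starting offset,
// so the Core constant that seeks to the end of each partition can be reused here.
opts.partitionToInitialOffsetFallback(io.deephaven.kafka.KafkaTools.ALL_PARTITIONS_SEEK_TO_END)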
The following example ingestion script uses a `long` key and an Avro message from a schema registry, and writes to the `MyNamespace.MyTable` partition for `today()`.
import io.deephaven.kafka.KafkaTools
import io.deephaven.enterprise.kafkawriter.KafkaTableWriter
final Properties props = new Properties()
props.put('bootstrap.servers', 'http://kafka-broker:9092')
props.put('schema.registry.url', 'http://kafka-broker:8081')
props.put("fetch.min.bytes", "65000")
props.put("fetch.max.wait.ms", "200")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
// note that these properties control the behavior of the FROM_PROPERTIES keySpec
props.put("deephaven.key.column.name", "Key")
props.put("deephaven.key.column.type", "long")
final KafkaTableWriter.Options opts = new io.deephaven.enterprise.kafkawriter.KafkaTableWriter.Options()
opts.disName("KafkaIngester")
opts.namespace("MyNamespace")
opts.tableName("MyTable")
opts.topic("demo-topic")
opts.kafkaProperties(props)
opts.keySpec(io.deephaven.kafka.KafkaTools.FROM_PROPERTIES)
opts.valueSpec(io.deephaven.kafka.KafkaTools.Consume.avroSpec("demo-value"))
// Configure fixed partitioning:
opts.partitionValue(today())
KafkaTableWriter.consumeToDis(opts)
Dynamic partitions
Dynamic partitions determine the column partition from a `long` column in the input data and a function passed to the `Options.dynamicPartitionFunction` method. This is suitable, for example, for turning a `KafkaTimestamp` column into a daily or hourly column partition value.
As the ingester starts up, it determines the Kafka offset to resume from using the following process:
- Existing column partitions for each internal partition are enumerated and sorted in lexicographical order. The most recent (i.e., highest sorted value) column partition's checkpoint record is read, and ingestion resumes from the Kafka offset found in that checkpoint record. This step can be skipped by calling `Options.ignoreOffsetFromCheckpoints(true)` when creating the ingestion options.
- The Kafka broker is queried for committed values. If there is a committed value, ingestion is resumed from that offset. This step can be skipped by calling `Options.ignoreOffsetFromBroker(true)` when creating the ingestion options.
- Finally, the fallback function specified by `Options.partitionToInitialOffsetFallback` is called, and ingestion is resumed from the returned offset. The fallback function defaults to reading all data from the beginning of the topic, but can be configured to ingest only new data (or an arbitrary offset by providing your own function).
When messages are consumed from the broker and parsed into Deephaven table rows, the column partition is computed for each row. Only one column partition is active at any time. When a row with a new column partition is ingested, the active column partition is durably committed to disk and the new column partition is opened. Because the existing partition is committed before the next partition is written (and on startup, the partitions are sorted in lexicographical order), the output of the dynamic partition function should be non-descending to avoid thrashing partitions or possibly missing messages on startup.
The `TimePartitionRotation` class provides convenience functions to map nanos since the epoch to either daily or hourly partitions. By default, a "slack" parameter is set to 30 days: times that are more than 30 days in the past or future are considered invalid, and an exception is raised from the consumer.
Ingestion scripts for dynamic partitioning are identical to those for fixed partitioning, except that the `dynamicPartitionFunction` option is specified instead of a `partitionValue`. In this example, the `KafkaTimestamp` column is used to create daily partitions in the London time zone. Timestamps more than one week (168 hours) from the current time are considered a data error.
import io.deephaven.enterprise.kafkawriter.TimePartitionRotation;
import java.time.ZoneId;
// Configure dynamic partitioning:
opts.dynamicPartitionFunction("KafkaTimestamp", TimePartitionRotation.daily(ZoneId.of("Europe/London"), 7 * 24))
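`TimePartitionRotation` also provides an hourly variant. A minimal sketch, assuming it takes the same zone-and-slack arguments as `daily` (confirm the signature in the `TimePartitionRotation` Javadoc):
// Hourly column partitions in UTC, with the same one-week (168 hour) slack as above.
// The argument shape is assumed to mirror TimePartitionRotation.daily.
opts.dynamicPartitionFunction("KafkaTimestamp", TimePartitionRotation.hourly(ZoneId.of("UTC"), 7 * 24))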