Legacy Kafka integration
The Deephaven Kafka integration operates similarly to an Import-driven lastBy Query. A separate Data Import Server (DIS) instance runs within the query's script. The Deephaven Kafka ingester subscribes to one or more Kafka topics and replicates them to streaming Deephaven tables. The query additionally serves those tables to other workers through a Table Data Protocol (TDP) connection, as described by the routing configuration.
The Data Import Server stores the replicated Kafka offsets inside a Deephaven checkpoint record, which is updated atomically with the size of the replicated Deephaven table. This allows Deephaven to resume the Kafka stream and provide exactly-once replication of the Kafka stream even in the face of restarts. Note: a Kafka broker itself may provide different guarantees to its producers depending on the selected configuration (e.g., at-least-once delivery or best-effort delivery). Deephaven does not change those guarantees but rather preserves the same level of guarantee.
The simplest example of configuring a Deephaven Kafka ingester is a topic that does not have a schema defined, but rather deserializes objects that directly map to Deephaven columns. This is not the typical use case, but it is simple to get up and running quickly. The Apache Kafka quick start guide is one such example. If you are able to run the kafka-console-consumer.sh tool, you can replicate the “test” topic to Deephaven.
The first step to importing Kafka data is to configure your storage location and data routing service. Each instance of a Data Import Server must have exclusive control over the directories that it is writing to. Deephaven recommends that each DIS has a unique directory using a path such as /db/dataImportServers/[DIS_Name].
For example, to create a directory for KafkaImport1:
sudo mkdir -p /db/dataImportServers/KafkaImport1
sudo chown dbmerge.dbmergegrp /db/dataImportServers/KafkaImport1
The storage directory must exist before the ingester is started and before updating the data routing configuration that references it. When the script is run as a persistent query on a merge server (preferred for production use), the dbmerge account will need read and write permissions on that path. If the script will be run in a regular query or console, the dbquery user needs read and write permissions on that path.
Configure storage and data routing
The system data routing configuration needs to be changed so that data for the Kafka tables is sent to and read from the appropriate processes and locations.
See dhconfig routing for help modifying the data routing file.
Configure a storage location
Add the location where the Kafka DIS process will store its intraday data. In this section, "KafkaImport1" is an arbitrary identifier, which will be referenced later in the configuration file. The dbRoot path "/db/dataImportServers/KafkaImport1" is also arbitrary and not directly related to the identifier.
storage:
  - name: default
    dbRoot: /db
  - name: KafkaImport1
    dbRoot: /db/dataImportServers/KafkaImport1
Instead of specifying a storage location in the data routing file, you may instead use "private" storage and configure the location in your ingestion script.
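As a minimal sketch (the full DataImportServer setup appears in the ingestion scripts later in this guide), the storage value passed to DataImportServer.getDataImportServer changes depending on which approach you choose:
// Storage defined in the routing file: pass null and the DIS uses its configured storage entry
storage = null
// "Private" storage: the routing file does not define a location, so the script must supply the path
// storage = "/db/dataImportServers/KafkaImport1"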
Configure the new in-worker Data Import Server
Create a new entry in the dataImportServers section. Add claims for the tables that this DIS will handle. Deephaven recommends using dynamic endpoints for in-worker data import servers.
Note
See also: Add a Data Import Server to the data routing configuration
In this example, the KafkaImport1 DIS will handle all tables in the “Kafka” namespace.
dataImportServers:
  ...
  KafkaImport1:
    endpoint:
      serviceRegistry: registry
    # Handle Kafka tables only
    claims: {namespace: Kafka}
    storage: KafkaImport1
Note
The tailer port is not needed for data ingestion from Kafka. However, the tailer service is also used to send commands to the DIS, so disabling the tailer port also disables that control functionality.
See our Importing Data guide for instructions to delete data directly from disk.
Change the table data service configuration(s)
The tableDataServices section defines how data is routed. One or more entries in this section may need to be adjusted so that query and merge workers will source data appropriately. In this example, the table data cache proxy is configured in a way that does not require changes.
db_tdcp:
  endpoint:
    serviceRegistry: none
    host: localhost
    port: *default-tableDataCacheProxyPort
  sources:
    - name: dataImportServers
Configure Local Storage Routing
Local storage (reading directly from the disk where the process is running) is the default method for accessing historical tables' data files. It is also the default method when reading data to be merged from intraday to historical. The data routing table normally contains an entry called "local" that is used for these purposes. Since the in-worker DIS process used to consume Kafka topics has its own storage path, it needs its own local definition in the routing file. This can be combined with the original "local" or defined independently.
The first option combines them (local includes the original local and KafkaImport1):
tableDataServices:
  ...
  kafka:
    storage: KafkaImport1
  local1:
    storage: default
  local:
    sources:
      - name: local1
      - name: kafka
The second option defines them independently (local is the original local, and KafkaImport1 is referenced by kafka):
tableDataServices:
  ...
  kafka:
    storage: KafkaImport1
  local:
    storage: default
Any entries defined here will be available in the TableDataService selection dialogs when creating merge jobs. The above example corresponds to the routing entries that include the original local (local1) and kafka under local. If desired, tags can be used to restrict which TableDataService entries are shown in the UI.
Create a Schema
Each table you import must have a schema defined. You can create a Schema using the Schema Editor from the Deephaven Classic client. The schema below is suitable for the Kafka test topic from the quick start guide. However, you must create a schema with suitable data types for the records in your topic.
<Table name="Test" namespace="Kafka" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
<Partitions keyFormula="${autobalance_by_first_grouping_column}" />
<Column name="Date" dataType="String" columnType="Partitioning" />
<Column name="KafkaPartition" dataType="Int" />
<Column name="Offset" dataType="Long" />
<Column name="Timestamp" dataType="DateTime" />
<Column name="Key" dataType="String" columnType="Grouping" />
<Column name="Value" dataType="String" />
</Table>
Discovering a Deephaven Schema from an Avro Schema
Often Kafka topics use Apache Avro serialization. Avro schemas are defined using JSON, and Deephaven can generate a Deephaven XML schema from an Avro schema. The Avro schema is discovered using a SchemaDiscovery builder from a Deephaven Groovy console.
The simplest case is creating a schema with only the columnPartition() option, which will take the namespace and table name from the namespace and name in the Avro schema:
import com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.kafka.ingest.SchemaDiscovery
// Note that .columnPartition(...) is required so that an in-worker DIS can write to the table
ad = SchemaDiscovery.avroFactory(new File("pageviews.avsc"))
.columnPartition("Date")
schema = ad.generateDeephavenSchema()
schemaService = SchemaServiceFactory.getDefault()
// Create the namespace if it doesn't already exist
schemaService.createNamespace("System", schema.getNamespace())
schemaService.addSchema(schema)
Caution
You must be a member of the group iris-schemamanagers to manipulate System schemas.
The SchemaDiscovery object provides methods for setting the table name, namespace, behavior for unmappable types, and column names for the internal partition, offset, and timestamp ConsumerRecord fields. For example, to set the table name and namespace:
import io.deephaven.kafka.ingest.SchemaDiscovery
// Note that .columnPartition(...) is required so that an in-worker DIS can write to the table
ad = SchemaDiscovery.avroFactory(new File("pageviews.avsc"))
.columnPartition("Date")
.namespace("Kafka")
.tableName("PageViews")
schema = ad.generateDeephavenSchema()
// Nested Avro schemas may be used for SchemaDiscovery by including prerequisite Avro schemas in the parser
parser = new org.apache.avro.Schema.Parser()
positionSchema = parser.parse(new File("position.avsc"))
playerSchema = parser.parse(new File("player.avsc"))
nad = SchemaDiscovery.avroFactory(playerSchema)
.columnPartition("Date")
.unmappableBehavior(SchemaDiscovery.UnmappableBehavior.Ignore)
nestedSchema = nad.generateDeephavenSchema()
Note
See also: The Javadoc contains a complete description of the AvroDiscovery options.
Discovering a Deephaven Schema from a Protocol Buffer Descriptor
Similar to Avro, a Deephaven schema may be generated from a Google Protocol Buffer descriptor set. This schema detection does not currently handle inner protobufs, "repeated" fields, or map fields.
import io.deephaven.kafka.ingest.SchemaDiscovery
// Note that .columnPartition(...) is required so that an in-worker DIS can write to the table
pbd = SchemaDiscovery.protobufFactory("Person.desc")
.columnPartition("Date")
.namespace("Kafka")
.tableName("Person")
schema = pbd.generateDeephavenSchema()
Import Script components and ingester functionality
Note
The remainder of this document describes configuring a Kafka ingestion script using a Legacy worker. You may instead configure a Kafka ingestion script using a Core+ worker.
The import script can be tested/developed in a Code Studio or Console using a merge worker. When run for production use, it will be run as a "Live Query - Merge Server" persistent query. In either case, it needs to run in a merge worker so that it will have rights to write data to the import server's filesystem.
The import script will handle:
- configuring and starting the in-worker DIS that will use the storage and properties defined in the routing YAML
- setting properties for the Kafka consumer and starting the consumer
- creating and configuring a table writer adapter using a builder class
The Kafka consumer used by the import script is an org.apache.kafka.clients.consumer.KafkaConsumer. Some of its properties are shown in the examples here, but there are many more, relating to cases such as two-way TLS, authentication with a schema service, and others. Please refer to the Apache documentation for this consumer for details and examples of other properties that can be configured.
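For instance, a consumer connecting to a TLS-secured broker would typically add the standard Kafka SSL client properties; the values below are placeholders, and the exact settings depend on your broker configuration:
// Illustrative TLS settings for the Kafka consumer (placeholder paths and passwords)
props.put("security.protocol", "SSL")
props.put("ssl.truststore.location", "/etc/ssl/kafka/client.truststore.jks")
props.put("ssl.truststore.password", "truststore-password")
// For two-way TLS (mutual authentication), a client keystore is also required
props.put("ssl.keystore.location", "/etc/ssl/kafka/client.keystore.jks")
props.put("ssl.keystore.password", "keystore-password")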
Deephaven provides several table writer adapters. A table writer adapter receives a message from the Kafka topic and extracts field values from it, which are then written to Deephaven table columns. Depending on how the data is organized in the source topic, a table writer adapter may be able to map the values into Deephaven columns directly and automatically, or it may be necessary to add transformation functions to convert or interpret values between Kafka and Deephaven.
props.put("enable.auto.commit", "false")
is required when configuring the Kafka consumer for use by a Deephaven ingester. This setting allows the in-worker DIS, which is actually writing rows to disk, to also manage commits back to the Kafka broker. The first time a Deephaven ingester connects to a topic, it will request to start receiving records from offset 0 - i.e., the oldest records the broker has for this topic. The broker will associate the offset for the ingester based on the supplied consumer group (props.put("group.id", "dhdis")
). Any not-already-used name can be used for the Deephaven consumer group, but it may be necessary to update Kafka permissions on the broker side to allow the Deephaven consumer to manage its own consumer groups. As the in-worker DIS receives and processes Kafka records, it immediately delivers the table row data to Deephaven clients that have subscribed for updates, and it also, in parallel, flushes and checkpoints row data to disk. When a checkpoint is complete, the ingester sends a checkpoint notification to the Kafka broker to update the latest offset that has been delivered. In this way, when an ingester is restarted, whether intentionally or unintentionally, it can pick up exactly where it left off in receiving records from the topic.
The method io.deephaven.kafka.ingest.ResumeImportFrom#resumeAtOffset can be used to have a new ingester start from a specified offset rather than 0, and can also be used to reset an ingester back to 0, if needed.
Individual columns that need special handling are added to the table writer adapter builder arguments. Each is specified as a column name together with a Groovy closure that receives the Kafka record and can perform conditional processing to return the value to be written into that Deephaven column.
// Add a field handler which will return null if a value doesn't
// exist in the current record, rather than throwing an exception.
.addColumnToValueFunction("maybe_null", record -> {
return record.hasField("maybe_null") ?
(String)(record.get("maybe_null")) : null;
})
// Add a field handler which will convert a long
// epoch offset value to a DBDateTime with a best
// guess as to what units are used (milliseconds,
// microseconds, or nanoseconds).
.addColumnToValueFunction("date_time", record -> {
return record.hasField("date_time") ?
com.illumon.iris.db.tables.utils.DBTimeUtils.autoEpochToTime(Long.parseLong(record.get("date_time"))) : null;
})
Create an Import Script
Create a new persistent query for your Kafka ingestion DIS. This example uses the Kafka string deserializer and a SimpleConsumerRecordToTableWriterAdapter.
// Set up the kafka topic to match the topic that is being published.
// e.g. kafkaTopicName="quickstart-events"
// kafkaTopicName="test"
kafkaTopicName="quickstart-events"
// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import io.deephaven.kafka.ingest.SimpleConsumerRecordToTableWriterAdapter
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.util.logging.IrisLogCreator
import com.illumon.iris.db.v2.routing.DataRoutingServiceFactory
// the props object is used to configure the Kafka consumer
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault()
// if the dis configuration specifies "private" storage, then you must provide a storage location, otherwise set this to null
storage = "/db/dataImportServers/KafkaImport1"
disName = "KafkaImport1"
dis = DataImportServer.getDataImportServer(null, disName, Configuration.getInstance(), db.getSchemaService(), routingService, storage)
dis.startInWorker()
ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Test", kafkaTopicName, SimpleConsumerRecordToTableWriterAdapter.makeFactory(null, "Offset", "Timestamp", "Key", "Value"))
ki.start()
// The table is now available from this and other workers
kt = db.i("Kafka", "Test").where("Date=currentDateNy()")
Note
When no information about which Kafka offset to read from is available, Deephaven begins reading at the beginning of each Kafka partition. You can configure your ingester to resume from the last offset that was ingested on a prior day (see the Javadoc for KafkaIngester). You may also manually copy the offset from one partition to another using functions available in the ResumeImportFrom class.
Note
See also: For more information on usage of the KafkaIngester and SimpleConsumerRecordToTableWriterAdapter classes, see the Deephaven Javadoc.
Generic Record Adapter
Rather than a simple one-to-one mapping between Kafka keys and values and Deephaven columns, each Kafka key or value typically contains multiple fields that are mapped to individual Deephaven columns. One popular serialization format is Avro. When using Avro, it is possible to deserialize without code generation using GenericRecords, or with code generation using SpecificRecords. SpecificRecords are a special case of the POJO adapter described in the next section.
To deserialize Avro records, you will need to add the Confluent deserializer packages to the class path of your Deephaven worker (RemoteQueryProcessor) by placing them in /etc/sysconfig/illumon.d/java_lib. These packages can be obtained from https://packages.confluent.io/maven/. The Avro serializer packages are available at:
- https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/7.4.0/kafka-avro-serializer-7.4.0.jar
- https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/7.4.0/kafka-schema-serializer-7.4.0.jar
If your organization also uses the Confluent schema registry, you should additionally add the schema registry client package, also available from the Confluent Maven repository.
The schema registry client also has dependencies on:
- https://repo1.maven.org/maven2/com/google/guava/guava/31.1-jre/guava-31.1-jre.jar
- https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.1/failureaccess-1.0.1.jar
so these will also need to be placed in /etc/sysconfig/illumon.d/java_lib if the schema registry client is being used.
In this example, we will use the pageviews and users streams from the Confluent Quick Start using Community Components Guide to produce Avro data.
You can download the schemas generated by the Confluent datagen plugin by retrieving the latest version of each subject from the schema registry:
curl --silent -X GET http://localhost:8081/subjects/users-value/versions/latest | jq -r '.schema' > /tmp/users.avsc
curl --silent -X GET http://localhost:8081/subjects/pageviews-value/versions/latest | jq -r '.schema' > /tmp/pageviews.avsc
Using the Avro discovery tool, we can generate the equivalent Deephaven schemas:
import com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.kafka.ingest.SchemaDiscovery
ad = SchemaDiscovery.avroFactory(new File("/tmp/pageviews.avsc"))
.columnPartition("Date")
.kafkaPartitionColumn("Partition")
.kafkaOffsetColumn("Offset")
.namespace("Kafka")
.tableName("Pageview")
SchemaServiceFactory.getDefault().addSchema(ad.generateDeephavenSchema())
ad = SchemaDiscovery.avroFactory(new File("/tmp/users.avsc"))
.columnPartition("Date")
.kafkaTimestampColumn("Timestamp")
.kafkaPartitionColumn("Partition")
.kafkaOffsetColumn("Offset")
.namespace("Kafka")
.tableName("Users")
SchemaServiceFactory.getDefault().addSchema(ad.generateDeephavenSchema())
We can verify that the schemas have been successfully deployed by using the getMeta() operation on the new tables.
pvm = db.getTable("Kafka", "Pageview").getMeta()
um = db.getTable("Kafka", "Users").getMeta()
The following script will import the topics into the Kafka.Pageview and Kafka.Users tables.
// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.util.logging.IrisLogCreator
import com.illumon.iris.db.v2.routing.DataRoutingServiceFactory
// Confluent imports
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false)
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
props.put("schema.registry.url", "http://localhost:8081/")
routingService = DataRoutingServiceFactory.getDefault()
// if the dis configuration specifies "private" storage, then you must provide a storage location, otherwise set this to null
storage = "/db/dataImportServers/KafkaImport1"
disName = "KafkaImport1"
dis = DataImportServer.getDataImportServer(null, disName, Configuration.getInstance(), db.getSchemaService(), routingService, storage)
dis.startInWorker()
pvrca = new io.deephaven.kafka.ingest.GenericRecordConsumerRecordToTableWriterAdapter.Builder()
.offsetColumnName("Offset")
.kafkaPartitionColumnName("Partition")
.addColumnToValueField("userid", "userid")
.addColumnToValueField("pageid", "pageid")
.addColumnToValueField("viewtime", "viewtime")
.buildFactory()
kipv = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Pageview", "pageviews", pvrca)
kipv.start()
urca = new io.deephaven.kafka.ingest.GenericRecordConsumerRecordToTableWriterAdapter.Builder()
.offsetColumnName("Partition")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
.autoValueMapping(true)
.buildFactory()
kiu = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Users", "users", urca)
kiu.start()
// the tables can be viewed from either the DIS worker or other workers
kp = db.i("Kafka", "Pageview").where("Date=currentDateNy()")
ku = db.i("Kafka", "Users").where("Date=currentDateNy()")
POJO Adapter
The io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter converts a Java object to a Deephaven row using reflection and code generation. You must add your objects to the classpath of the Deephaven worker (RemoteQueryProcessor) by adding your jar to the /etc/sysconfig/illumon.d/java_lib directory.
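As an illustrative sketch only (this class is hypothetical and not part of the quick start example), a simple value class placed on the worker classpath might look like the following; the adapter is assumed to match its properties to Deephaven columns by name, which is why the caseInsensitiveSearch(true) builder option shown below matters for camelCase property names:
// Hypothetical value class for the POJO adapter; properties are assumed to map to
// Deephaven columns of the same name (e.g., Symbol, Price, Size with case-insensitive search)
class TradeEvent implements Serializable {
    String symbol
    double price
    int size
}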
If you are using Avro Specific records, you must compile your Avro schema into Java class files, as described in the Avro documentation. Using the same Confluent Quick Start example, we can compile the Avro schema into Java classes using the downloaded Avro tools jar:
java -jar avro-tools-1.9.2.jar compile schema /tmp/users.avsc /tmp/pageviews.avsc .
This command creates a Java source file for users and pageviews in the “ksql” directory, which then must be compiled into a jar.
javac -cp /usr/illumon/latest/java_lib/avro-1.9.2.jar:/usr/illumon/latest/java_lib/jackson-core-2.10.2.jar ksql/*.java
jar cf ksql.jar ksql/*.class
sudo cp ksql.jar /etc/sysconfig/illumon.d/java_lib/
After the jar has been copied to the /etc/sysconfig/illumon.d/java_lib directory, you can then use it from a Groovy script in a worker. In this example, we use the POJO adapter with the Avro-generated ksql.pageviews and ksql.users classes.
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true)
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
props.put("schema.registry.url", "http://localhost:8081/")
import io.deephaven.kafka.ingest.KafkaIngester
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.util.logging.IrisLogCreator
import com.illumon.iris.db.v2.routing.DataRoutingServiceFactory
routingService = DataRoutingServiceFactory.getDefault()
// if the dis configuration specifies "private" storage, then you must provide a storage location, otherwise set this to null
storage = "/db/dataImportServers/KafkaImport1"
disName = "KafkaImport1"
dis = DataImportServer.getDataImportServer(null, disName, Configuration.getInstance(), db.getSchemaService(), routingService, storage)
dis.startInWorker()
pojopv = new io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter.Builder()
.offsetColumnName("Offset")
.kafkaPartitionColumnName("Partition")
.valueClass(Class.forName("ksql.pageviews"))
.caseInsensitiveSearch(true)
.buildFactory()
pojou = new io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter.Builder()
.kafkaPartitionColumnName("Partition")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
.valueClass(Class.forName("ksql.users"))
.caseInsensitiveSearch(true)
.buildFactory()
kipv = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Pageview", "pageviews", pojopv)
kipv.start()
kiu = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Users", "users", pojou)
kiu.start()
Protocol Buffer Adapter
The io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter converts a Protocol Buffer message to a Deephaven row (or multiple rows). There are two options for parsing messages: via a descriptor set or via a function. Descriptor set parsing is limited in its abilities, since only simple/flat messages will be properly handled. If you have a jar with classes from the output of ./bin/protoc --java_out=. ${Proto}.proto, it may be added to the classpath for message parsing by placing the jar in the /etc/sysconfig/illumon.d/java_lib directory.
// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.illumon.iris.db.util.logging.IrisLogCreator
import com.illumon.iris.db.v2.routing.DataRoutingServiceFactory
// the props object is used to configure the Kafka consumer
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault()
// if the dis configuration specifies "private" storage, then you must provide a storage location, otherwise set this to null
storage = "/db/dataImportServers/KafkaImport1"
disName = "KafkaImport1"
dis = DataImportServer.getDataImportServer(null, disName, Configuration.getInstance(), db.getSchemaService(), routingService, storage)
dis.startInWorker()
// 'Person.desc' is generated by `protoc --descriptor_set_out=Person.desc ${proto}`
pbWriter = new ProtobufConsumerRecordToTableWriterAdapter.Builder("Person.desc")
.kafkaPartitionColumnName("Partition")
.kafkaKeyColumnName("Key")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
.autoValueMapping(true)
// Note: column names are generally capitalized, but the protobuf convention is that field names are camelCase. In
// order for the column called "Email" to be populated by a field called "email", we need a case-insensitive search
.caseInsensitiveSearch(true)
.buildFactory()
ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Person", "person-topic", pbWriter)
ki.start()
// The table is now available from this and other workers
Person = db.i("Kafka", "Person")
.where("Date=currentDateNy()")
If the protoc-generated class(es) for the message are known, the following could instead be used for the io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter builder. This example is based on a message type of "ContactList", which also includes a repeated field of "Person".
// These classes come from a jar in /etc/sysconfig/illumon.d/java_lib, and are generated by `protoc --java_out=. ${proto}`.
// Class-compilation of the resultant java is done via `javac`, and the jar is created with `jar cf` against those classes
import io.deephaven.kafka.ingest.proto.Identity
import io.deephaven.kafka.ingest.proto.ContactList
pbWriter = new ProtobufConsumerRecordToTableWriterAdapter.Builder(pBufBytes -> {
// Given the bytes from the kafka stream, we are able to construct a "Contacts" message. This allows us to cast
// to the known type within each of our column-to-value functions
return ContactList.Contacts.parseFrom(pBufBytes)
})
.kafkaPartitionColumnName("Partition")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
// We expect "person" to be a repeating-field. For each element within "person", we will add a row. The
// `record.getParallelIdx()` within the column-to-value-functions is incremented for each element
.withParallelValueField("person")
// The value of "source" will be mapped to the "Source" column for each of the iterations
.addColumnToValueField("Source", "source")
// The key from the record will be mapped to the "Key" column for each of the iterations
.addColumnToValueFunction("Key", record -> {
record.getConsumerRecord().key()
})
// Each of the following is also executed for each element with "person". It is up to the builder to index into the
// appropriate field. This is done by indexing into the field with `record.getParallelIdx()`
.addColumnToValueFunction("Name", record -> {
contactList = ((ContactList.Contacts)record.getProtoBuf())
person = contactList.getPerson(record.getParallelIdx())
return person.getName()
})
.addColumnToValueFunction("Email", record -> {
((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getEmail()
})
.addColumnToValueFunction("Age", record -> {
// We are able to use any logic we want within the normalization of each row
age = ((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getAge()
return age <= 0 ? com.illumon.util.QueryConstants.NULL_INT : age
})
.addColumnToValueFunction("Relation", record -> {
// ENUM may be stored as a string-representation
((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getRelation().name()
})
.addColumnToValueFunction("RelationEnum", record -> {
// ENUM may be stored by numeric-value
((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getRelationValue()
})
.buildFactory()
ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Person", "contacts-topic", pbWriter)
ki.start()
Note
See also: The Javadoc contains a complete description of the Proto/Kafka Builder options.
JSON Adapter
The io.deephaven.kafka.ingest.JsonConsumerRecordToTableWriterAdapter converts a JSON message to a Deephaven row (or multiple rows). The following example demonstrates consumption of a sample message, as identified in the comment of the query.
// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import io.deephaven.kafka.ingest.JsonConsumerRecordToTableWriterAdapter
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.illumon.iris.db.util.logging.IrisLogCreator
import com.illumon.iris.db.v2.routing.DataRoutingServiceFactory
// The props object is used to configure the Kafka consumer
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
// Create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault()
// if the dis configuration specifies "private" storage, then you must provide a storage location, otherwise set this to null
storage = "/db/dataImportServers/KafkaImport1"
disName = "KafkaImport1"
dis = DataImportServer.getDataImportServer(null, disName, Configuration.getInstance(), db.getSchemaService(), routingService, storage)
dis.startInWorker()
// Will parse the following JSON message, generating 2 rows because of the `.addFieldParallel(...)` and
// `.addNestedFieldParallel(...)` options:
// { "a": "a string", "b": 3.1416, "c": 3, "short": [ null, 4 ], "nested": [ {"inner": 13}, null ] }
jsonWriter = JsonConsumerRecordToTableWriterAdapter
.builder()
.kafkaKeyColumnName("Key")
.offsetColumnName("Offset")
.addColumnToValueFunction("ProcessTime", jsonRecord -> DBDateTime.now())
.addColumnToValueField("A", "a")
.addColumnToValueField("B", "b")
.addColumnToValueField("C", "c")
.addFieldParallel("Short", "short")
.addNestedFieldParallel(JsonConsumerRecordToTableWriterAdapter.nestedBuilder().addColumnToValueField("NestedInt", "inner"), "nested")
.allowNullValues(true)
// Use up to 4 threads to parse inbound messages in parallel, while maintaining proper ordering of messages on disk
.parallelParsers(4)
.buildFactory()
jsonIngester = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Json", "test-json", jsonWriter)
jsonIngester.start()
// The table is now available from this worker
Json = db.i("Kafka", "Json")
.where("Date=currentDateNy()")
ParseOnce Option
In some cases, a JSON message may include an embedded message of a different type. It is possible to access each member of the embedded structure individually in a call to .addColumnToValueFunction("Column", record -> { /* parse complex field and get sub-element */ }), but this would cause the embedded field to be parsed multiple times. To prevent parsing complex fields more than once, the adapter provides the ability to "pre-parse" a message and access the result multiple times. A call to .setParseOnce(...) allows the builder to store an arbitrary (parsed or otherwise) Object, which may then be accessed easily from the record in an .addColumnToValueFunction(...) call.
The following pseudo-code example demonstrates single-parse/multi-access of a complex JSON field.
import com.illumon.util.type.TypeUtils
// Assuming a message created with the following, where "serializableObject" is an instance of "SomeObjectType":
// json = "{ \"serialized\": \"" + TypeUtils.encode64Serializable(serializableObject) + "\" }"
JsonConsumerRecordToTableWriterAdapter
.builder()
.setParseOnce(jsonRecord -> {
// Deserializes to an instance of type "SomeObjectType", and stores for later access
return TypeUtils.decode64Serializable(JsonRecordUtil.getString(jsonRecord, "serialized"))
})
.addColumnToValueFunction("SomeValue", jsonRecord -> {
// Instead of deserializing, simply "get" and cast. Methods/members may be access from the class
return ((SomeObjectType) jsonRecord.getParsedObject()).getSomeValue()
})
.addColumnToValueFunction("OtherValue", jsonRecord -> {
// Instead of deserializing, simply "get" and cast (again). Methods/members may be access from the class
return ((SomeObjectType) jsonRecord.getParsedObject()).getOtherValue()
})
.buildFactory()
Note
See also: The Javadoc contains a complete description of the JSON/Kafka Builder options.
Filter a Kafka stream
Deephaven supports filtering records from a Kafka stream during ingestion. This can help the performance of downstream queries in high throughput use cases, because unnecessary data is filtered before it is written to a Deephaven table.
Every Kafka adapter's Builder has a setFilter method which takes a Predicate. Only records that pass the predicate will be ingested into Deephaven.
Filtering with a Generic Record Adapter
In this example, we retrieve the userid field from the record value, and only persist records with a userid of Bob:
// GenericRecord must be imported (or fully qualified) for the cast inside the filter
import org.apache.avro.generic.GenericRecord
pvrca = new io.deephaven.kafka.ingest.GenericRecordConsumerRecordToTableWriterAdapter.Builder()
.offsetColumnName("Offset")
.kafkaPartitionColumnName("Partition")
.addColumnToValueField("userid", "userid")
.addColumnToValueField("pageid", "pageid")
.addColumnToValueField("viewtime", "viewtime")
.setFilter({record -> "Bob".equals(((GenericRecord) record.value()).get("userid"))})
.buildFactory()
Filtering with a Protocol Buffer Adapter
In this example, we only persist records with the key Key
, or, when the key is empty, the Name Bob
.
pbWriter = new ProtobufConsumerRecordToTableWriterAdapter.Builder("Person.desc")
.kafkaPartitionColumnName("Partition")
.kafkaKeyColumnName("Key")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
.autoValueMapping(true)
.setFilter({consumerRecord, protobufRecord ->
if(consumerRecord.key() == null) {
return "Bob".equals(protobufRecord.getValue("Name", String.class));
} else {
return "Key".equals(consumerRecord.key());
}
})
.buildFactory()