Kafka integration

The Deephaven Kafka integration operates similarly to an Import-driven lastBy Query. A separate Data Import Server (DIS) instance is run within the script of a query. The Deephaven Kafka ingester subscribes to one or more Kafka topics and replicates them to streaming Deephaven tables. The query additionally serves those tables to other workers through a TCP connection as described by routing configuration.

The Data Import Server stores the replicated Kafka offsets inside of the Deephaven checkpoint record, which is updated atomically with the size of the replicated Deephaven table. This allows Deephaven to resume the Kafka stream and provide exactly once replication of the Kafka stream even in the face of restarts. Note: a Kafka broker itself may provide different guarantees to its producers depending on the selected configuration (e.g., at-least once delivery or best-effort delivery). Deephaven does not change those guarantees but rather preserves the same level of guarantee.

The simplest example of configuring a Deephaven Kafka ingester is a topic that does not have a schema defined, but rather deserializes objects that directly map to Deephaven columns. This is not the typical use case, but it is simple to get up and running quickly. The Apache Kafka quick start guide is one such example. If you are able to run the kafka-console-consumer.sh tool, you can replicate the “test” topic to Deephaven.

The first step to importing Kafka data is to configure your storage location and data routing service. Each instance of a Data Import Server must have exclusive control over the directories that it is writing to. If the data routing is configured such that two Data Import Servers will have a disjoint set of partitions, they can share the storage. However, for simplicity, Deephaven recommends that each DIS has a unique directory using a path such as /db/dataImportServers]/[DIS_Name].

For example, to create a directory for KafkaImport1:

sudo mkdir -p /db/dataImportServers/KafkaImport1
sudo chown dbmerge.dbmergegrp /db/dataImportServers/KafkaImport1

The storage directory must exist when a routing YAML file that references that storage location is first used - even before the ingester that will be writing to it is first started. When the script is run as a persistent query against a merge server (preferred for production use), the dbmerge account will need read and write privileges. If the script is to be run in a regular query or console, the dbquery user on the server will need to have read and write access to this path, or, alternatively, users with sufficient rights can run a console inside a merge worker to allow test and development of the query using the same dbmerge permissions as a production ingester.

Configure storage and data routing

The system needs to be configured so that data for the Kafka tables is sent to and read from the appropriate processes and locations.

Configure a storage location

Add the location where the Kafka DIS process will store its intraday data. In this section, "KafkaImport1" is an arbitrary identifier, which will be referenced later in the configuration file. The dbRoot "/db/dataImportServers/KafkaImport1" is also arbitrary and not directly related to the identifier.

storage:
  - name: default
    dbRoot: /db
  - name: KafkaImport1
    dbRoot: /db/dataImportServers/KafkaImport1

Configure the new in-worker Data Import Server

Create a new entry in the dataImportServers section. Create filters so that only the desired table or tables are accepted. You must assign appropriate values for host, storage, definitionsStorage, and tableDataService/port. In this example, we have configured the KafkaImport1 DIS to accept system tables in the “Kafka” namespace.

dataImportServers
...
KafkaImport1:
   host: *dh-import
   tailerPort: 22224
   # Handle Kafka tables only
   filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `Kafka`"}
   webServerParameters:
      enabled: false
   storage: KafkaImport1
   definitionsStorage: default
   tableDataPort: 22223

The host property indicates on which machine this ingester will run. Usually this will be a merge server, with fast local storage to store incoming Kafka data. Another host name, IP address, or yaml identifier can be used here to have the ingester run on a different machine than the system's default import server.

The tailerPort is used to send commands to the ingester; most commonly this would be a command to delete intraday data after it has been merged and validated.

The tableDataPort is used by workers that need to subscribe to tables provided by the ingester.

Adjust existing data import server(s)

Assuming you want the data handled by the in-worker DIS to be handled exclusively by that new DIS instance, adjust the filters on any other DIS instances to exclude the table(s).

Warning

The original YAML may contain checks for online: true or namespaceSet: "System". These should be removed and replaced by the whereTableKey filter. The online and namespaceSet filters are incompatible with whereTableKey.

dataImportServers
...
db_dis:
   ...
   # don't process Kafka
   filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `Kafka`"}

Change TDCP data routing configuration(s)

The tableDataServices section defines how data is routed. One or or more entries in this section will need to be adjusted so that query and merge workers will source data appropriately. In this example, it is the table data cache proxy service that serves live data from the data import services.

This example makes the table data cache proxy serve the live data, and points the "Kafka" namespace to the "KafkaImport1" DIS while excluding it from the default DIS.

db_tdcp:
   host: localhost
   port: *default-tableDataCacheProxyPort
   sources:
   - name: db_dis
     # exclude namespace Kafka
     filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `Kafka`"}
   # all user data
   - name: db_rta
     filters: {namespaceSet: User}
   # include only namespace Kafka
   - name: KafkaImport1
     filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `Kafka`"}

Configure Local Storage Routing

Local storage (reading directly from the disk where the process is running) is the default method for accessing historical tables' data files. It is also the default method when reading data to be merged from intraday to historical. The data routing table normally contains an entry called "local" that is used for these purposes. Since the in-worker DIS process used to consume Kafka topics has its own storage path, it needs its own local definition in the routing file. This can be combined with the original "local" or defined independently.

(local includes original local and KafkaImport1):

tableDataServices:
   ...
   kafka:
     storage: KafkaImport1
   local1:
     storage: default
   local:
     sources:
     - name: local1
     - name: kafka

(local is original local, and KafkaImport1 is referred to by kafka):

tableDataServices:
   ...
   kafka:
     storage: KafkaImport1
   local:
     storage: default

Any defined entries here will be available in the TableDataService selection dialogs when creating merge jobs:

img

The above example corresponds to the routing entries which include original local (local1) and kafka under local. If desired, tags can be used to restrict which TableDataService entries are shown in the UI.

Create a Schema

Each table you import must have a schema defined. You can create a Schema using the Schema Editor from the Deephaven Java client. The schema below is suitable for use with the Kafka test topic from the quick start guide. However, you will need to create a schema with suitable data types for the records in your topic.

<Table name="Test" namespace="Kafka" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
  <Partitions keyFormula="${autobalance_by_first_grouping_column}" />

  <Column name="Date" dataType="String" columnType="Partitioning" />
  <Column name="KafkaPartition" dataType="Int" />
  <Column name="Offset" dataType="Long" />
  <Column name="Timestamp" dataType="DateTime" />
  <Column name="Key" dataType="String" columnType="Grouping" />
  <Column name="Value" dataType="String" />
</Table>

Discovering a Deephaven Schema from an Avro Schema

Often Kafka topics use Apache Avro serialization. Avro schemas are defined using JSON, and Deephaven can generate a Deephaven XML schema from an Avro schema. The Avro schema is discovered using a SchemaDiscovery builder from a Deephaven Groovy console.

Important

When adding a table using the addSchema() method, the namespace for the new table must already exist. To create an empty namespace, use schemaService.createNamespace("<new_namespace_name>").

The simplest case is creating a schema with only the columnPartition() option, which will take the namespace and table name from the namespace and name in the Avro schema:

import com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.kafka.ingest.SchemaDiscovery

// Note that .columnPartition(...) is required so that an in-worker DIS can write to the table
ad = SchemaDiscovery.avroFactory(new File("pageviews.avsc"))
      .columnPartition("Date")

schema = ad.generateDeephavenSchema()
// Note that the namespace for the new schema must already exist (see above)
SchemaServiceFactory.getDefault().addSchema(schema)

Caution

Note: You must be a member of the group iris-schemamanagers to manipulate System schemas.

The SchemaDiscovery object provides methods for setting the table name, namespace, behavior for unmappable types, and column names for the internal partition, offset, and timestamp ConsumerRecord fields. For example, to set the tablename and namespace:

Groovy

import io.deephaven.kafka.ingest.SchemaDiscovery

// Note that .columnPartition(...) is required so that an in-worker DIS can write to the table
ad = SchemaDiscovery.avroFactory(new File("pageviews.avsc"))
      .columnPartition("Date")
      .namespace("Kafka")
      .tableName("PageViews")

schema = ad.generateDeephavenSchema()


// Nested Avro schemas may be used for SchemaDiscovery by including prerequisite Avro schemas in the parser
parser = new org.apache.avro.Schema.Parser()
positionSchema = parser.parse(new File("position.avsc"))
playerSchema = parser.parse(new File("player.avsc"))

nad = SchemaDiscovery.avroFactory(playerSchema)
    .columnPartition("Date")
    .unmappableBehavior(SchemaDiscovery.UnmappableBehavior.Ignore)

nestedSchema = nad.generateDeephavenSchema()

Note

See also: The Javadoc contains a complete description of the AvroDiscovery options.

Discovering a Deephaven Schema from a Protocol Buffer Descriptor

Similar to Avro, a Deephaven schema may be generated from a Google Protocol Buffer descriptor set. This schema detection does not currently handle inner protobufs, "repeated" fields, or map fields.

import io.deephaven.kafka.ingest.SchemaDiscovery

// Note that .columnPartition(...) is required so that an in-worker DIS can write to the table
pbd = SchemaDiscovery.protobufFactory("Person.desc")
      .columnPartition("Date")
      .namespace("Kafka")
      .tableName("Person")

schema = pbd.generateDeephavenSchema()

Import Script components and ingester functionality

The import script can be tested/developed in a Code Studio or Console using a merge worker. When run for production use, it will be run as a "Live Query - Merge Server" persistent query. In either case, it needs to run in a merge worker so that it will have rights to write data to the import server's filesystem.

The import script will handle:

  • configuring and starting the in-worker DIS that will use the storage and properties defined in the routing YAML
  • setting properties for the Kafka consumer and starting the consumer
  • creating and configuring a table writer adapter using a builder class

The Kafka consumer used by the import script is an org.apache.kafka.clients.consumer.KafkaConsumer. Some of its properties are shown in the examples here, but there are many more which relate to such cases as two-way TLS, authentication with a schema service, and others. Please refer to the Apache documentation for this consumer for details and examples of other properties that can be configured.

Deephaven provides several table writer adapters. Table writer adapters provide the functionality of receiving a message from the Kafka topic and extracting field values from it which will then be written to Deephaven table columns. Depending on how the data is organized in the source topic, a table writer adapter may be able to directly and automatically map the values into Deephaven columns, or it may be necessary to add transformation functions to convert or interpret values between Kafka and Deephaven.

props.put("enable.auto.commit", "false") is required when configuring the Kafka consumer for use by a Deephaven ingester. This setting allows the in-worker DIS, which is actually writing rows to disk, to also manage commits back to the Kafka broker. The first time a Deephaven ingester connects to a topic, it will request to start receiving records from offset 0 - i.e., the oldest records the broker has for this topic. The broker will associate the offset for the ingester based on the supplied consumer group (props.put("group.id", "dhdis")). Any not-already-used name can be used for the Deephaven consumer group, but it may be necessary to update Kafka permissions on the broker side to allow the Deephaven consumer to manage its own consumer groups. As the in-worker DIS receives and processes Kafka records, it immediately delivers the table row data to Deephaven clients that have subscribed for updates, and it also, in parallel, flushes and checkpoints row data to disk. When a checkpoint is complete, the ingester sends a checkpoint notification to the Kafka broker to update the latest offset that has been delivered. In this way, when an ingester is restarted, whether intentionally or unintentionally, it can pick up exactly where it left off in receiving records from the topic.

The method, io.deephaven.kafka.ingest.ResumeImportFrom#resumeAtOffset, can be used to have a new ingester start from a specified offset rather than 0, and can also be used to reset an ingester back to 0, if needed.

Individual columns which need special handling are added to the table writer adapter builder arguments. These are structured as Groovy closures, which get a column name and a Kafka record, and can then do conditional processing to return a value to be written into the Deephaven column.

// Add a field handler which will return null if a value doesn't
// exist in the current record, rather than throwing an exception.
.addColumnToValueFunction("maybe_null", record -> {
    return record.hasField("maybe_null") ?
        (String)(record.get("maybe_null")) : null;
})
// Add a field handler which will convert a long
// epoch offset value to a DBDateTime with a best
// guess as to what unit are used (milliseconds,
// microseconds, or nanoseconds).
.addColumnToValueFunction("date_time", record -> {
    return record.hasField("date_time") ?
        com.illumon.iris.db.tables.utils.DBTimeUtils.autoEpochToTime
          (Long.parseLong(record.get("date_time"))) : null;
})

Create an Import Script

Create a new persistent query for your Kafka ingestion DIS. This example uses the Kafka string deserializer and a SimpleConsumerRecordToTableWriterAdapter.

// Setup the kafka topic to match the topic that is being published.
// e.g. kafkaTopicName="quickstart-events"
//      kafkaTopicName="test"
kafkaTopicName="quickstart-events"

// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import io.deephaven.kafka.ingest.SimpleConsumerRecordToTableWriterAdapter
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory

// the props object is used to configure the Kafka consumer
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")

// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault()

disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
if (disConfig == null) {
    throw new IllegalArgumentException("Could not find disConfig!")
}
dis = DataImportServer.getDataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance(), db.getSchemaService())
dis.startInWorker()

ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Test", kafkaTopicName, SimpleConsumerRecordToTableWriterAdapter.makeFactory(null, "Offset", "Timestamp", "Key", "Value"))
ki.start()

// The table is now available from this and other workers
kt = db.i("Kafka", "Test").where("Date=currentDateNy()")

Note

See also: For more information on usage of the KafkaIngester and SimpleConsumerRecordToTableWriterAdapter classes, see the Deephaven Javadoc.

Generic Record Adapter

Rather than a simple one-to-one mapping between Kafka keys and values and a Deephaven column typically each Kafka key or value will contain multiple fields that are mapped to individual Deephaven columns. One popular serialization format is Avro. When using Avro, it is possible to deserialize without code generation using GenericRecords or with code generation using SpecificRecords. SpecificRecords are a special case for using the POJO adapter described in the next section.

To deserialize Avro records, you will need to add the Confluent deserializer to the class path of your Deephaven worker (RemoteQueryProcessor) by placing them in /etc/sysconfig/illumon.d/java_lib. These packages can be obtained from https://packages.confluent.io/maven/. The avro-serializer package is available at:

Note: All three jars are needed.

If your organization also uses the Confluent schema registry, you should additionally add the schema registry package from:

The schema registry client also has dependencies on:

so these will also need to be placed in /etc/sysconfig/illumon.d/java_lib if the schema registry client is being used.

In this example, we will use the pageview and users streams from the Confluent Quick Start using Community Components Guide to produce Avro data.

You can download the schema generated by the Confluent datagen plugin by identifying the schema ID for the latest versions:

curl --silent -X GET http://localhost:8081/subjects/users-value/versions/latest | jq -r '.schema' > /tmp/users.avsc
curl --silent -X GET http://localhost:8081/subjects/pageviews-value/versions/latest | jq -r '.schema' > /tmp/pageviews.avsc

Using the Avro discovery tool, we can generate the equivalent Deephaven schemas:

import com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.kafka.ingest.SchemaDiscovery

ad = SchemaDiscovery.avroFactory(new File("/tmp/pageviews.avsc"))
          .columnPartition("Date")
          .kafkaPartitionColumn("Partition")
          .kafkaOffsetColumn("Offset")
          .namespace("Kafka")
          .tableName("Pageview")
SchemaServiceFactory.getDefault().addSchema(ad.generateDeephavenSchema())

ad = SchemaDiscovery.avroFactory(new File("/tmp/users.avsc"))
          .columnPartition("Date")
          .kafkaTimestampColumn("Timestamp")
          .kafkaPartitionColumn("Partition")
          .kafkaOffsetColumn("Offset")
          .namespace("Kafka")
          .tableName("Users")
SchemaServiceFactory.getDefault().addSchema(ad.generateDeephavenSchema())

We can verify that the schemas have been successfully deployed by using the getMeta() operation on the new tables.

pvm = db.getTable("Kafka", "Pageview").getMeta()
um = db.getTable("Kafka", "Users").getMeta()

The following script will import the topics into the Kafka.Pageviews and Kafka.Users tables.

// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester

import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory

// Confluent imports
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig


props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false)
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
props.put("schema.registry.url", "http://localhost:8081/")


routingService = DataRoutingServiceFactory.getDefault()

disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
dis = DataImportServer.getDataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance(), db.getSchemaService())
dis.startInWorker()

pvrca = new io.deephaven.kafka.ingest.GenericRecordConsumerRecordToTableWriterAdapter.Builder()
      .offsetColumnName("Offset")
      .kafkaPartitionColumnName("Partition")
      .addColumnToValueField("userid", "userid")
      .addColumnToValueField("pageid", "pageid")
      .addColumnToValueField("viewtime", "viewtime")
      .buildFactory()

kipv = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Pageview", "pageviews", pvrca)
kipv.start()

urca = new io.deephaven.kafka.ingest.GenericRecordConsumerRecordToTableWriterAdapter.Builder()
      .offsetColumnName("Partition")
      .offsetColumnName("Offset")
      .timestampColumnName("Timestamp")
      .autoValueMapping(true)
      .buildFactory()
kiu = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Users", "users", urca)
kiu.start()

// the tables can be viewed from either the DIS worker or other workers
kp = db.i("Kafka", "Pageview").where("Date=currentDateNy()")
ku = db.i("Kafka", "Users").where("Date=currentDateNy()")

POJO Adapter

The io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter converts a Java object to a Deephaven row using reflection and code generation. You must add your objects to the classpath of the Deephaven worker (RemoteQueryProcessor), by adding your jar to the /etc/sysconfig/illumon.d/java_lib directory.

If you are using Avro Specific records, you must compile your Avro schema into Java class files, as described in the Avro documentation. Using the same Confluent Quick Start example, we can compile the Avro schema into Java classes using the downloaded Avro tools jar:

java -jar avro-tools-1.9.2.jar  compile schema /tmp/users.avsc /tmp/pageviews.avsc .

This command creates a Java source file for users and pageviews in the “ksql” directory, which then must be compiled into a jar.

javac -cp /usr/illumon/latest/java_lib/avro-1.9.2.jar:/usr/illumon/latest/java_lib/jackson-core-2.10.2.jar ksql/*.java
jar cf ksql.jar ksql/*.class
sudo cp ksql.jar /etc/sysconfig/illumon.d/java_lib/

After the jar has been copied to the /etc/sysconfig/illumon.d/java_lib directory, you can then use it from a Groovy script in a worker. In this example, we use the POJO adapter with the Avro generated ksql.pageviews and ksql.users classes.

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig

props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true)
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
props.put("schema.registry.url", "http://localhost:8081/")

import io.deephaven.kafka.ingest.KafkaIngester

import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory

routingService = DataRoutingServiceFactory.getDefault()

disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
dis = DataImportServer.getDataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance())
dis.startInWorker()

pojopv = new io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter.Builder()
      .offsetColumnName("Offset")
      .kafkaPartitionColumnName("Partition")
      .valueClass(Class.forName("ksql.pageviews"))
      .caseInsensitiveSearch(true)
      .buildFactory()

pojou = new io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter.Builder()
      .kafkaPartitionColumnName("Partition")
      .offsetColumnName("Offset")
      .timestampColumnName("Timestamp")
      .valueClass(Class.forName("ksql.users"))
      .caseInsensitiveSearch(true)
      .buildFactory()


kipv = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Pageview", "pageviews", pojopv)
kipv.start()
kiu = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Users", "users", pojou)
kiu.start()

Protocol Buffer Adapter

The io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter converts a Protocol Buffer message to a Deephaven row (or multiple rows). There are two options for parsing messages: via descriptor set or via function. The descriptor set parsing is limited in its abilities since only simple/flat messages will be properly handled. If you have a jar with classes from the output of ./bin/protoc --java_out=. ${Proto}.proto, then it may be added to the classpath for message-parsing by adding your jar to the /etc/sysconfig/illumon.d/java_lib directory.

// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.illumon.iris.db.util.logging.IrisLogCreator
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory

// the props object is used to configure the Kafka consumer
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")

// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault()

disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
if (disConfig == null) {
    throw new IllegalArgumentException("Could not find disConfig!")
}
dis = DataImportServer.getDataImportServer(IrisLogCreator.getSingleton(), disConfig, Configuration.getInstance(), db.getSchemaService())
dis.startInWorker()

// 'Person.desc' is generated by `protoc --descriptor_set_out=Person.desc ${proto}`
pbWriter = new ProtobufConsumerRecordToTableWriterAdapter.Builder("Person.desc")
    .kafkaPartitionColumnName("Partition")
    .kafkaKeyColumnName("Key")
    .offsetColumnName("Offset")
    .timestampColumnName("Timestamp")
    .autoValueMapping(true)
    // Note: column-names are generally capitalized, but convention for protobuf is that field-names are camelCase. in
    // order for the column called "Email" to be populated be a field called "email", we would need to be case-insensitive
    .caseInsensitiveSearch(true)
    .buildFactory()

ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Person", "person-topic", pbWriter)
ki.start()

// The table is now available from this and other workers
Person = db.i("Kafka", "Person")
    .where("Date=currentDateNy()")

If the protoc-generated class(es) for the message is known, the following could instead be used for the io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter builder. The following is based on a message-type of "ContactList", which also includes a repeating-field of "Person".

// These classes come from a jar in /etc/sysconfig/illumon.d/java_lib, and are generated by `protoc --java_out=. ${proto}`.
// Class-compilation of the resultant java is done via `javac`, and the jar is created with `jar cf` against those classes
import io.deephaven.kafka.ingest.proto.Identity
import io.deephaven.kafka.ingest.proto.ContactList

pbWriter = new ProtobufConsumerRecordToTableWriterAdapter.Builder(pBufBytes -> {
        // Given the bytes from the kafka stream, we are able to construct a "Contacts" message. This allows us to cast
        // to the known type within each of our column-to-value functions
        return ContactList.Contacts.parseFrom(pBufBytes)
    })
    .kafkaPartitionColumnName("Partition")
    .offsetColumnName("Offset")
    .timestampColumnName("Timestamp")
    // We expect "person" to be a repeating-field. For each element within "person", we will add a row. The
    // `record.getParallelIdx()` within the column-to-value-functions is incremented for each element
    .withParallelValueField("person")
    // The value of "source" will be mapped to the "Source" column for each of the iterations
    .addColumnToValueField("Source", "source")
    // The key from the record will be mapped to the "Key" column for each of the iterations
    .addColumnToValueFunction("Key", record -> {
        record.getConsumerRecord().key()
    })

    // Each of the following is also executed for each element with "person". It is up to the builder to index into the
    // appropriate field. This is done by indexing into the field with `record.getParallelIdx()`
    .addColumnToValueFunction("Name", record -> {
        contactList = ((ContactList.Contacts)record.getProtoBuf())
        person = contactList.getPerson(record.getParallelIdx())
        return person.getName()
    })
    .addColumnToValueFunction("Email", record -> {
        ((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getEmail()
    })
    .addColumnToValueFunction("Age", record -> {
        // We are able to use any logic we want within the normalization of each row
        age = ((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getAge()
        return age <= 0 ? com.illumon.util.QueryConstants.NULL_INT : age
    })
    .addColumnToValueFunction("Relation", record -> {
        // ENUM may be stored as a string-representation
        ((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getRelation().name()
    })
    .addColumnToValueFunction("RelationEnum", record -> {
        // ENUM may be stored by numeric-value
        ((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getRelationValue()
    })
    .buildFactory()

ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Person", "contacts-topic", pbWriter)
ki.start()

Note

See also: The Javadoc contains a complete description of the Proto/Kafka Builder options.

JSON Adapter

The io.deephaven.kafka.ingest.JsonConsumerRecordToTableWriterAdapter converts a JSON message to a Deephaven row. The following example demonstrates consumption of a sample message, as identified in the comment of the query.

// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import io.deephaven.kafka.ingest.JsonConsumerRecordToTableWriterAdapter
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.illumon.iris.db.util.logging.IrisLogCreator
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory

// The props object is used to configure the Kafka consumer
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")

// Create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault()

disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
if (disConfig == null) {
    throw new IllegalArgumentException("Could not find disConfig!")
}
dis = DataImportServer.getDataImportServer(IrisLogCreator.getSingleton(), disConfig, Configuration.getInstance(), db.getSchemaService())
dis.startInWorker()

// Will parse the following JSON message: { "a": "a string", "b": 3.1416, "c": 3, "nested": [ {"inner": 13} ] }
jsonWriter = new JsonConsumerRecordToTableWriterAdapter.Builder()
    .offsetColumnName("Offset")
    .timestampColumnName("Timestamp")
    .addColumnToValueField("A", "a")
    .addColumnToValueField("B", "b")
    .addColumnToValueField("C", "c")
    .addColumnToValueFunction("NestedInt", { jsonRecord -> jsonRecord.getRecord().get("nested").get(0).get("inner").asInt() } )
    .buildFactory()

jsonIngester = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Json", "test-json", jsonWriter)
jsonIngester.start()

// The table is now available from this worker
Json = db.i("Kafka", "Json")
    .where("Date=currentDateNy()")

Note

See also: The Javadoc contains a complete description of the JSON/Kafka Builder options.