Connect Kafka
The Deephaven Kafka integration operates similarly to an Import-driven lastBy Query. A separate Data Import Server (DIS) instance is run within the script of a query. The Deephaven Kafka ingester subscribes to one or more Kafka topics and replicates them to streaming Deephaven tables. The query additionally serves those tables to other workers through a TCP connection as described by routing configuration.
The Data Import Server stores the replicated Kafka offsets inside of the Deephaven checkpoint record, which is updated atomically with the size of the replicated Deephaven table. This allows Deephaven to resume the Kafka stream and provide exactly once replication of the Kafka stream even in the face of restarts. Note: a Kafka broker itself may provide different guarantees to its producers depending on the selected configuration (e.g., at-least once delivery or best-effort delivery). Deephaven does not change those guarantees but rather preserves the same level of guarantee.
The simplest example of configuring a Deephaven Kafka ingester is a topic that does not have a schema defined, but rather deserializes objects that directly map to Deephaven columns. This is not the typical use case, but it is simple to get up and running quickly. The Apache Kafka quick start guide (https://kafka.apache.org/quickstart) is one such example. If you are able to run the kafka-console-consumer.sh
tool, you can replicate the “test” topic to Deephaven.
The first step to importing Kafka data is to configure your storage location and data routing service. Each instance of a Data Import Server must have exclusive control over the directories that it is writing to. If the data routing is configured such that two Data Import Servers will have a disjoint set of partitions, they can share the storage. However, for simplicity, Deephaven recommends that each DIS has a unique directory using a path such as /db/dataImportServers/[DIS_Name]
.
For example, to create a directory for KafkaImport1
:
sudo mkdir -p /db/dataImportServers/KafkaImport1
sudo chown dbquery.dbmergegrp /db/dataImportServers/KafkaImport1
The storage directory must exist before a Kafka ingestion DIS is started. When the script is running in a regular query or console, the dbquery user on the server will need to have read and write access to this path. When the script is run as a persistent query against a merge server (preferred for production use), the dbmerge account will need read and write privileges.
Configure storage and data routing
The system needs to be configured so that data for the Kafka tables is sent to and read from the appropriate processes and locations. This is done through the routing configuration accessed via dhconfig routing
and modifying property files.
Tip
After you make changes to routing, stop/start all services in monit to pick up the changes made to routing.
Configure an anchor:
anchors:
---
- &dh-kafka <your-dns-name>
Configure a storage location:
Add the location where the Kafka DIS process will store its intraday data. In this section, "KafkaImport1" is an arbitrary identifier, which will be referenced later in the configuration file. The dbRoot
"/db/dataImportServers/KafkaImport1" is also arbitrary and not directly related to the identifier.
storage:
- name: default
dbRoot: /db
- name: KafkaImport1
dbRoot: /db/dataImportServers/KafkaImport1
Configure the new in-worker Data Import Server:
Create a new entry in the dataImportServers
section. Create filters so that only the desired table or tables are accepted. You must assign appropriate values for host
, storage
, definitionsStorage
, and tableDataService/port
. In this example, we have configured the KafkaImport1
DIS to accept system tables in the “Kafka” namespace.
dataImportServers
...
KafkaImport1:
host: *dh-kafka
tailerPort: -1
# Handle tables from the namespace Kafka only
filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `Kafka`"}
webServerParameters:
enabled: false
storage: KafkaImport1
definitionsStorage: default
tableDataPort: 22223
Adjust existing data import server(s):
Assuming you want the data handled by the in-worker DIS to be handled exclusively by that new DIS instance, adjust the filters on any other DIS instances to exclude the table(s).
dataImportServers
...
db_dis:
...
# don't process Kafka
filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `Kafka`"}
Change TDCP data routing configuration(s):
The tableDataServices
section defines how data is routed. One or more entries in this section will need to be adjusted so that query and merge workers will source data appropriately. In this example, it is the table data cache proxy service that serves live data from the data import services.
This example makes the table data cache proxy serve the live data, and points the "Kafka" namespace to the "KafkaImport1" DIS while excluding it from the default DIS.
db_tdcp:
host: *localhost
port: *default-tableDataCacheProxyPort
sources:
- name: db_dis
# exclude namespace Kafka
filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `Kafka`"}
# all user data
- name: db_rta
filters: {namespaceSet: User}
# include only namespace Kafka
- name: KafkaImport1
filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `Kafka`"}
Configure Local Storage Routing
Local storage (reading directly from the disk where the process is running) is the default method for accessing historical tables' data files. It is also the default method when reading data to be merged from intraday to historical. The data routing table normally contains an entry called "local" that is used for these purposes. Since the in-worker DIS process used to consume Kafka topics has its own storage path, it needs its own local definition in the routing file. This can be combined with the original "local" or defined independently.
(local
includes original local
and KafkaImport1
):
tableDataServices:
...
kafka:
storage: KafkaImport1
local1:
storage: default
local:
sources:
- name: local1
- name: kafka
(local
is original local
, and KafkaImport1
is referred to by kafka
):
tableDataServices:
...
kafka:
storage: KafkaImport1
local:
storage: default
Any defined entries here will be available in the TableDataService
selection dialogs when creating merge jobs.
For example, you can view the list of TableDataServices by creating a PQ with type Data Merge
and looking in the last tab for Merge Settings
. The dropdown options for the Table Data Service Configuration field will contain the new entries after services have been restarted.
The above example corresponds to the routing entries which include original local (local1
) and kafka
under local
. If desired, tags can be used to restrict which TableDataService
entries are shown in the UI.
Note
Create a Schema
Each table you import must have a schema defined. You can create a Schema using the Schema Editor from the Deephaven Java client. The schema below is suitable for use with the Kafka test topic from the quick start guide. However, you will need to create a schema with suitable data types for the records in your topic.
<Table name="Test" namespace="Kafka" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
<Partitions keyFormula="${autobalance_by_first_grouping_column}" />
<Column name="Date" dataType="String" columnType="Partitioning" />
<Column name="KafkaPartition" dataType="Int" />
<Column name="Offset" dataType="Long" />
<Column name="Timestamp" dataType="DateTime" />
<Column name="Key" dataType="String" columnType="Grouping" />
<Column name="Value" dataType="String" />
</Table>
Discovering a Deephaven Schema from an Avro Schema
Often Kafka topics use Apache Avro serialization. Avro schemas are defined using JSON, and Deephaven can generate a Deephaven XML schema from an Avro schema. The Avro schema is discovered using a SchemaDiscovery builder from a Deephaven Groovy console. The simplest case is simply creating a schema without any options:
import com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.kafka.ingest.SchemaDiscovery
// Note that `.columnPartition(...)` is required so that an in-worker DIS can write to the table
ad = SchemaDiscovery.avroFactory(new File("pageviews.avc"))
.columnPartition("Date")
schema = ad.generateDeephavenSchema()
SchemaServiceFactory.getDefault().addSchema(schema)
Caution
Note:
You must be a member of the group iris-schemamanagers
to manipulate System schemas.
The SchemaDiscovery object provides methods for setting the table name, namespace, behavior for unmappable types, and column names for the internal partition, offset, and timestamp ConsumerRecord fields. For example, to set the tablename and namespace:
import io.deephaven.kafka.ingest.SchemaDiscovery
ad = SchemaDiscovery.avroFactory(new File("pageviews.avc"))
.columnPartition("Date")
.namespace("Kafka")
.tableName("PageViews")
schema = ad.generateDeephavenSchema()
// Nested Avro schemas may be used for SchemaDiscovery by including prerequisite Avro schemas in the parser
parser = new org.apache.avro.Schema.Parser()
positionSchema = parser.parse(new File("position.avsc"))
playerSchema = parser.parse(new File("player.avsc"))
nad = SchemaDiscovery.avroFactory(playerSchema)
.columnPartition("Date")
.unmappableBehavior(SchemaDiscovery.UnmappableBehavior.Ignore)
nestedSchema = nad.generateDeephavenSchema()
Note
See also: The Javadoc contains a complete description of the SchemaDiscovery options.
Discovering a Deephaven Schema from a Protocol Buffer Descriptor
Similar to Avro, a Deephaven schema may be generated from a Google Protocol Buffer descriptor set. This schema detection does not currently handle inner protobufs, "repeated" fields, or map fields.
import io.deephaven.kafka.ingest.SchemaDiscovery
// Note that `.columnPartition(...)` is required so that an in-worker DIS can write to the table
pbd = SchemaDiscovery.protobufFactory("Person.desc")
.columnPartition("Date")
.namespace("Kafka")
.tableName("Person")
schema = pbd.generateDeephavenSchema()
Create an Import Script
Create a new persistent query for your Kafka ingestion DIS. This example uses the Kafka string deserializer and a SimpleConsumerRecordToTableWriterAdapter
.
// Setup the kafka topic to match the topic that is being published.
// e.g. kafkaTopicName="quickstart-events"
// kafkaTopicName="test"
kafkaTopicName="quickstart-events"
// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import io.deephaven.kafka.ingest.SimpleConsumerRecordToTableWriterAdapter
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory
// the props object is used to configure the Kafka consumer
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault()
disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
if (disConfig == null) {
throw new IllegalArgumentException("Could not find disConfig!")
}
dis = DataImportServer.getDataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance(), db.getSchemaService())
dis.startInWorker()
ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Test", kafkaTopicName, SimpleConsumerRecordToTableWriterAdapter.makeFactory(null, "Offset", "Timestamp", "Key", "Value"))
ki.start()
// The table is now available from this and other workers
kt = db.i("Kafka", "Test").where("Date=currentDateNy()")
Note
See also: For more information on usage of the KafkaIngester and SimpleConsumerRecordToTableWriterAdapter classes, see the Deephaven Javadoc.
Generic Record Adapter
Rather than a simple one-to-one mapping between Kafka keys and values and a Deephaven column typically each Kafka key or value will contain multiple fields that are mapped to individual Deephaven columns. One popular serialization format is Avro. When using Avro, it is possible to deserialize without code generation using GenericRecords or with code generation using SpecificRecords. SpecificRecords are a special case for using the POJO adapter described in the next section.
To deserialize Avro records, you will need to add the Confluent deserializer to the class path of your Deephaven worker (RemoteQueryProcessor) by placing them in /etc/sysconfig/illumon.d/java_lib
. These packages can be obtained from https://packages.confluent.io/maven/. The avro-serializer package is available at:
- https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/5.4.1/kafka-avro-serializer-5.4.1.jar
- https://packages.confluent.io/maven/io/confluent/common-config/5.4.1/common-config-5.4.1.jar
- https://packages.confluent.io/maven/io/confluent/common-utils/5.4.1/common-utils-5.4.1.jar
If your organization also uses the Confluent schema registry, you should additionally add the schema registry package from:
In this example, we will use the pageview and users streams from the Confluent Quick Start using Community Components Guide to produce Avro data.
You can download the schema generated by the Confluent datagen plugin by identifying the schema ID for the latest versions:
curl --silent -X GET http://localhost:8081/subjects/users-value/versions/latest | jq -r '.schema' > /tmp/users.avsc
curl --silent -X GET http://localhost:8081/subjects/pageviews-value/versions/latest | jq -r '.schema' > /tmp/pageviews.avsc
Using the Avro discovery tool, we can generate the equivalent Deephaven schemas:
Groovy
import com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.kafka.ingest.SchemaDiscovery
ad = SchemaDiscovery.avroFactory(new File("/tmp/pageviews.avsc"))
.columnPartition("Date")
.kafkaPartitionColumn("Partition")
.kafkaOffsetColumn("Offset")
.namespace("Kafka")
.tableName("Pageview")
SchemaServiceFactory.getDefault().addSchema(ad.generateDeephavenSchema())
ad = SchemaDiscovery.avroFactory(new File("/tmp/users.avsc"))
.columnPartition("Date")
.kafkaTimestampColumn("Timestamp")
.kafkaPartitionColumn("Partition")
.kafkaOffsetColumn("Offset")
.namespace("Kafka")
.tableName("Users")
SchemaServiceFactory.getDefault().addSchema(ad.generateDeephavenSchema())
We can verify that the schemas have been successfully deployed by using the getMeta()
operation on the new tables.
pvm = db.getTable("Kafka", "Pageview").getMeta()
um = db.getTable("Kafka", "Users").getMeta()
The following script will import the topics into the Kafka.Pageviews
and Kafka.Users
tables.
// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory
// Confluent imports
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false)
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
props.put("schema.registry.url", "http://localhost:8081/")
routingService = DataRoutingServiceFactory.getDefault()
disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
dis = DataImportServer.getDataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance())
dis.startInWorker()
pvrca = new io.deephaven.kafka.ingest.GenericRecordConsumerRecordToTableWriterAdapter.Builder()
.offsetColumnName("Offset")
.kafkaPartitionColumnName("Partition")
.addColumnToValueField("userid", "userid")
.addColumnToValueField("pageid", "pageid")
.addColumnToValueField("viewtime", "viewtime")
.buildFactory()
kipv = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Pageview", "pageviews", pvrca)
kipv.start()
urca = new io.deephaven.kafka.ingest.GenericRecordConsumerRecordToTableWriterAdapter.Builder()
.offsetColumnName("Partition")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
.autoValueMapping(true)
.buildFactory()
kiu = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Users", "users", urca)
kiu.start()
// the tables can be viewed from either the DIS worker or other workers
kp = db.i("Kafka", "Pageview").where("Date=currentDateNy()")
ku = db.i("Kafka", "Users").where("Date=currentDateNy()")
POJO Adapter
The io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter
converts a Java object to a Deephaven row using reflection and code generation. You must add your objects to the classpath of the Deephaven worker (RemoteQueryProcessor), by adding your jar to the /etc/sysconfig/illumon.d/java_lib
directory.
If you are using Avro Specific records, you must compile your Avro schema into Java class files, as described in the Avro documentation. Using the same Confluent Quick Start example, we can compile the Avro schema into Java classes using the downloaded Avro tools jar:
java -jar avro-tools-1.9.2.jar compile schema /tmp/users.avsc /tmp/pageviews.avsc .
This command creates a Java source file for users and pageviews in the “ksql” directory, which then must be compiled into a jar.
javac -cp /usr/illumon/latest/java_lib/avro-1.9.2.jar:/usr/illumon/latest/java_lib/jackson-core-2.10.2.jar ksql/*.java
jar cf ksql.jar ksql/*.class
sudo cp ksql.jar /etc/sysconfig/illumon.d/java_lib/
After the jar has been copied to the /etc/sysconfig/illumon.d/java_lib
directory, you can then use it from a Groovy script in a worker. In this example, we use the POJO adapter with the Avro generated ksql.pageviews
and ksql.users
classes.
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true)
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
props.put("schema.registry.url", "http://localhost:8081/")
import io.deephaven.kafka.ingest.KafkaIngester
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory
routingService = DataRoutingServiceFactory.getDefault()
disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
dis = DataImportServer.getDataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance())
dis.startInWorker()
pojopv = new io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter.Builder()
.offsetColumnName("Offset")
.kafkaPartitionColumnName("Partition")
.valueClass(Class.forName("ksql.pageviews"))
.caseInsensitiveSearch(true)
.buildFactory()
pojou = new io.deephaven.kafka.ingest.PojoConsumerRecordToTableWriterAdapter.Builder()
.kafkaPartitionColumnName("Partition")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
.valueClass(Class.forName("ksql.users"))
.caseInsensitiveSearch(true)
.buildFactory()
kipv = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Pageview", "pageviews", pojopv)
kipv.start()
kiu = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Users", "users", pojou)
kiu.start()
Protocol Buffer Adapter
The io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter
converts a Protocol Buffer message to a Deephaven row (or multiple rows). There are two options for parsing messages: via descriptor set or via function. The descriptor set parsing is limited in its abilities since only simple/flat messages will be properly handled. If you have a jar with classes from the output of ./bin/protoc --java_out=. ${Proto}.proto
, then it may be added to the classpath for message-parsing by adding your jar to the /etc/sysconfig/illumon.d/java_lib
directory.
// Deephaven imports
import io.deephaven.kafka.ingest.KafkaIngester
import io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.fishlib.configuration.Configuration
import com.illumon.iris.db.util.logging.IrisLogCreator
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory
// the props object is used to configure the Kafka consumer
props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("enable.auto.commit", "false")
props.put("group.id", "dhdis")
// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault()
disConfig = routingService.getDataImportServiceConfig("KafkaImport1")
if (disConfig == null) {
throw new IllegalArgumentException("Could not find disConfig!")
}
dis = DataImportServer.getDataImportServer(IrisLogCreator.getSingleton(), disConfig, Configuration.getInstance(), db.getSchemaService())
dis.startInWorker()
// 'Person.desc' is generated by `protoc --descriptor_set_out=Person.desc ${proto}`
pbWriter = new ProtobufConsumerRecordToTableWriterAdapter.Builder("Person.desc")
.kafkaPartitionColumnName("Partition")
.kafkaKeyColumnName("Key")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
.autoValueMapping(true)
// Note: column-names are generally capitalized, but convention for protobuf is that field-names are camelCase. in
// order for the column called "Email" to be populated be a field called "email", we would need to be case-insensitive
.caseInsensitiveSearch(true)
.buildFactory()
ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Person", "person-topic", pbWriter)
ki.start()
// The table is now available from this and other workers
Person = db.i("Kafka", "Person")
.where("Date=currentDateNy()")
If the protoc
-generated class(es) for the message is known, the following could instead be used for the io.deephaven.kafka.ingest.ProtobufConsumerRecordToTableWriterAdapter
builder. The following is based on a message-type of "ContactList", which also includes a repeating-field of "Person";
// These classes come from a jar in /etc/sysconfig/illumon.d/java_lib, and are generated by `protoc --java_out=. ${proto}`.
// Class-compilation of the resultant java is done via `javac`, and the jar is created with `jar cf` against those classes
import io.deephaven.kafka.ingest.proto.Identity
import io.deephaven.kafka.ingest.proto.ContactList
pbWriter = new ProtobufConsumerRecordToTableWriterAdapter.Builder(pBufBytes -> {
// Given the bytes from the kafka stream, we are able to construct a "Contacts" message. This allows us to cast
// to the known type within each of our column-to-value functions
return ContactList.Contacts.parseFrom(pBufBytes)
})
.kafkaPartitionColumnName("Partition")
.offsetColumnName("Offset")
.timestampColumnName("Timestamp")
// We expect "person" to be a repeating-field. For each element within "person", we will add a row. The
// `record.getParallelIdx()` within the column-to-value-functions is incremented for each element
.withParallelValueField("person")
// The value of "source" will be mapped to the "Source" column for each of the iterations
.addColumnToValueField("Source", "source")
// The key from the record will be mapped to the "Key" column for each of the iterations
.addColumnToValueFunction("Key", record -> {
record.getConsumerRecord().key()
})
// Each of the following is also executed for each element with "person". It is up to the builder to index into the
// appropriate field. This is done by indexing into the field with `record.getParallelIdx()`
.addColumnToValueFunction("Name", record -> {
contactList = ((ContactList.Contacts)record.getProtoBuf())
person = contactList.getPerson(record.getParallelIdx())
return person.getName()
})
.addColumnToValueFunction("Email", record -> {
((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getEmail()
})
.addColumnToValueFunction("Age", record -> {
// We are able to use any logic we want within the normalization of each row
age = ((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getAge()
return age <= 0 ? com.illumon.util.QueryConstants.NULL_INT : age
})
.addColumnToValueFunction("Relation", record -> {
// ENUM may be stored as a string-representation
((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getRelation().name()
})
.addColumnToValueFunction("RelationEnum", record -> {
// ENUM may be stored by numeric-value
((ContactList.Contacts)record.getProtoBuf()).getPerson(record.getParallelIdx()).getRelationValue()
})
.buildFactory()
ki = new io.deephaven.kafka.ingest.KafkaIngester(log, dis, props, "Kafka", "Person", "contacts-topic", pbWriter)
ki.start()