---
title: Streaming Kafka
---

This section of the Crash Course guides you through ingesting a single [Kafka](https://kafka.apache.org/) topic into the Deephaven system as [intraday data](../../legacy/importing-data/introduction.md#intraday-and-historical-data). This process leverages an in-worker [Data Import Server (DIS)](../../data-guide/dis.md) and makes the data accessible to other users via `db.live_table`. It also provides a foundation for ingesting multiple Kafka topics into Deephaven.

## Prerequisites

This guide assumes you have access to a Kafka cluster or a testing environment where Kafka can be deployed. Kafka administration is outside the scope of this document.

## Administrative Configuration

Before starting, a system administrator must:

1. Add the DIS to the system configuration. Here, we show a simplified DIS configuration using claims; more advanced routing configuration is outside the scope of this guide. This example illustrates adding a DIS named `KafkaDis1` and claiming the table `ExampleNamespace.ExampleTableName`:

   ```bash
   sudo -u irisadmin /usr/illumon/latest/bin/dhconfig dis add --name KafkaDis1 --claim ExampleNamespace.ExampleTableName
   ```

2. Create a data storage location (`dbRoot`) for the DIS on the relevant Merge Server. By convention, this is typically created under `/db/dataImportServers/<disName>`, but other locations can be used. While multiple DISes can technically share a `dbRoot` directory, it is recommended that each DIS be assigned its own dedicated directory. For merge jobs to function correctly, ensure the `dbRoot` directory is owned by the `dbmerge` user and `dbmergegrp` group.

   ```bash
   sudo mkdir /db/dataImportServers/KafkaDis1
   sudo chown dbmerge:dbmergegrp /db/dataImportServers/KafkaDis1
   ```

## Persistent Query Configuration

Only users belonging to the `iris-datamanagers`, `iris-dataimporters`, or `iris-superusers` groups (see [special groups](../../sys-admin/permissions/acls.md#special-groups) for details) can create the in-worker DIS as a Persistent Query (PQ) within Deephaven.

Create a new PQ in the web UI with the following settings:

- **Name**: By convention, use the name of your DIS configuration, although any name is technically acceptable.
- **Type**: "Live Query - Merge Server"
- **DB Server**: Specify the Merge Server where the `dbRoot` directory was created.

![kafka-dis-pq-settings.png](../../assets/crash-course/data-in/kafka-dis-pq-settings.png)

Next, create the script. The following code snippet outlines the basic configuration to launch an in-worker Kafka DIS.

```groovy
import io.deephaven.enterprise.kafkawriter.KafkaTableWriter
import io.deephaven.enterprise.kafkawriter.SchemaHelper
import io.deephaven.enterprise.kafkawriter.TimePartitionRotation
import io.deephaven.kafka.KafkaTools.Consume
import io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec

final String disName = 'KafkaDis1'
final String namespace = 'ExampleNamespace'
final String tableName = 'ExampleTableName'
final String dbRoot = "/db/dataImportServers/${disName}"

final String bootstrapServers = 'kafka-1:9092,kafka-2:9092,kafka-3:9092'
final String topic = 'ExampleTopic'
final KeyOrValueSpec keySpec = Consume.IGNORE
final KeyOrValueSpec valueSpec = Consume.simpleSpec('Value', String.class)

final Properties kafkaProperties = new Properties()
kafkaProperties.put('bootstrap.servers', bootstrapServers)
kafkaProperties.put('group.id', disName)

final KafkaTableWriter.Options opts = new KafkaTableWriter.Options()
opts.dynamicPartitionFunction('KafkaTimestamp', TimePartitionRotation.daily())
opts.disNameWithStorage(disName, dbRoot)
opts.namespace(namespace)
opts.tableName(tableName)
opts.topic(topic)
opts.keySpec(keySpec)
opts.valueSpec(valueSpec)
opts.kafkaProperties(kafkaProperties)

new SchemaHelper(opts).addOrValidateSchema()

KafkaTableWriter.consumeToDis(opts)
```

Crucially, the `keySpec` and `valueSpec` parameters must be defined. These parameters describe the format of the Kafka topic's key and value data. In this example, we've ignored the topic's key and will parse the value data directly as a simple string, assigning it to a column named "Value". Deephaven has built-in support for [JSON](https://www.json.org/), [Avro](https://avro.apache.org/), and [Protobuf](https://protobuf.dev/) data from Kafka, as well as support for custom formats.
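For example, a topic whose values are JSON objects could instead use a JSON value spec. The sketch below is illustrative only: the field names and types are hypothetical, and it assumes the community `Consume.jsonSpec` helper, which maps top-level JSON fields to columns of the declared types.

```groovy
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.kafka.KafkaTools.Consume
import io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec

// Hypothetical payloads such as {"Symbol": "AAPL", "Price": 123.45}.
// Each declared column is populated from the matching top-level JSON field.
final KeyOrValueSpec valueSpec = Consume.jsonSpec([
        ColumnDefinition.ofString('Symbol'),
        ColumnDefinition.ofDouble('Price'),
] as ColumnDefinition[])
```

The rest of the script is unchanged; only the `valueSpec` assignment differs.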

The PQ script also includes an optional `SchemaHelper`, which serves two purposes:

1. Schema addition: If no schema exists for `ExampleNamespace.ExampleTableName`, the derived schema is added to Deephaven.
2. Schema validation: If a schema already exists for `ExampleNamespace.ExampleTableName`, the derived schema is checked for compatibility against it.

If the `SchemaHelper` is not used, the [schema](../../data-guide/tables-and-schemas.md) must already have been created.

Next, configure the scheduling.
By default, PQs are scheduled to run with the Schedule Type "Daily", which is often inappropriate for Kafka DISes.
To enable continuous data ingestion from the Kafka topic, change the **Schedule Type** to "Continuous" and set an initialization timeout.

![kafka-dis-pq-scheduling.png](../../assets/crash-course/data-in/kafka-dis-pq-scheduling.png)

For more advanced PQ configuration, refer to the Deephaven documentation on [creating Persistent Queries](../../query-management/ui-queries.md).
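Once the PQ is running, the ingested data is available to other workers as a live intraday table. A minimal sketch from a Core+ Groovy worker, using the example namespace and table name from above (`db.liveTable` is the Groovy counterpart of Python's `db.live_table`):

```groovy
// Fetch the live intraday table that the in-worker DIS is writing.
// Filtering to today's partition keeps the query to the current day's data.
liveData = db.liveTable('ExampleNamespace', 'ExampleTableName')
        .where("Date = today()")
```

The available columns depend on the `keySpec` and `valueSpec` used by the DIS; with the simple string spec above, the value data appears in a column named `Value`.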

## Further considerations

The optimal setup for in-worker Kafka DISes varies significantly based on data volume and operational requirements.

### Dedicated DIS

A dedicated in-worker Kafka DIS is a configuration in which a single DIS instance consumes data from exactly one Kafka topic.
This approach offers a couple of advantages:

- **Ideal for High-Volume Topics**: The DIS devotes all of its worker resources to a single topic, so heavy ingestion does not compete with other data streams.
- **Enhanced Resource Isolation**: Provides better resource isolation. Restarting a dedicated DIS will only affect the associated topic and its corresponding live table, minimizing disruption to other data streams.

You can enhance performance for extremely high-volume Kafka topics by configuring a single dedicated DIS to consume data in parallel from multiple partitions of the same topic. This is achieved by utilizing the `partitionFilter` option within `KafkaTableWriter.Options` to specify the desired partition assignments for each `consumeToDis` call.
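A hedged sketch of this pattern is shown below. The exact `partitionFilter` signature is an assumption (a predicate over Kafka partition numbers, as the option name suggests); consult the `KafkaTableWriter.Options` reference for the supported form, and note that the shared configuration elided in the comments must match the main script above.

```groovy
import io.deephaven.enterprise.kafkawriter.KafkaTableWriter

// Split one topic's partitions across two consumers in the same DIS.
final KafkaTableWriter.Options evenOpts = new KafkaTableWriter.Options()
// ... same DIS name, storage, namespace, table, topic, and specs as above ...
evenOpts.partitionFilter({ int p -> p % 2 == 0 })  // even-numbered partitions

final KafkaTableWriter.Options oddOpts = new KafkaTableWriter.Options()
// ... same configuration ...
oddOpts.partitionFilter({ int p -> p % 2 == 1 })   // odd-numbered partitions

KafkaTableWriter.consumeToDis(evenOpts)
KafkaTableWriter.consumeToDis(oddOpts)
```

Each `consumeToDis` call then runs its own consumer, so the two partition subsets are processed in parallel within the same worker.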

### Shared DIS

A shared in-worker Kafka DIS is a configuration in which a single DIS instance consumes data from multiple Kafka topics. This approach can be considered for scenarios involving:

- A large number of low-volume Kafka topics.
- A moderate number of medium-volume Kafka topics.
- A small number of high-volume Kafka topics.

This is achieved by including configurations for multiple Kafka topics within the PQ script and calling `consumeToDis` for each topic within that script.
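The loop below sketches this pattern under stated assumptions: the topic and table names are hypothetical, each table must be claimed by the DIS in the routing configuration (`dhconfig dis add` accepts multiple `--claim` arguments), and all topics here happen to share the same simple value spec.

```groovy
import io.deephaven.enterprise.kafkawriter.KafkaTableWriter
import io.deephaven.enterprise.kafkawriter.TimePartitionRotation
import io.deephaven.kafka.KafkaTools.Consume

final Properties kafkaProperties = new Properties()
kafkaProperties.put('bootstrap.servers', 'kafka-1:9092,kafka-2:9092,kafka-3:9092')
kafkaProperties.put('group.id', 'KafkaDis1')

// Hypothetical topic-to-table mapping; every table lives in the same
// namespace and is claimed by the same DIS.
final Map<String, String> topicToTable = [
        'ExampleTopicA': 'ExampleTableA',
        'ExampleTopicB': 'ExampleTableB',
]

topicToTable.each { topic, tableName ->
    final KafkaTableWriter.Options opts = new KafkaTableWriter.Options()
    opts.disNameWithStorage('KafkaDis1', '/db/dataImportServers/KafkaDis1')
    opts.namespace('ExampleNamespace')
    opts.tableName(tableName)
    opts.topic(topic)
    opts.keySpec(Consume.IGNORE)
    opts.valueSpec(Consume.simpleSpec('Value', String.class))
    opts.kafkaProperties(kafkaProperties)
    opts.dynamicPartitionFunction('KafkaTimestamp', TimePartitionRotation.daily())
    KafkaTableWriter.consumeToDis(opts)
}
```

Each `consumeToDis` call starts an independent consumer within the single shared worker, so restarting the PQ interrupts ingestion for all of its topics at once.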

## Related documentation

- [Ingesting Kafka data](../../data-guide/streaming/coreplus-kafka.md)
- [Persistent Queries](../../query-management/ui-queries.md)
- [Data Import Server (DIS)](../../data-guide/dis.md)
