# Ingesting Kafka data in a Kubernetes installation
This guide demonstrates setting up an in-worker Data Import Server (DIS) that connects to a Kafka broker and writes incoming data to a Deephaven table, for a system deployed on Kubernetes. Note that some of the steps shown here differ for a bare-metal installation.
This guide assumes that you have configured access to your cluster and that you are familiar with Kubernetes and the `kubectl` command. The examples here use the `deephaven` namespace; adjust accordingly for your environment.
## Configure a new Data Import Server
### Open a management shell terminal
Open a terminal session on the `management-shell` pod.
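For example, assuming the management shell runs as a deployment named `management-shell` (verify the name in your installation):

```sh
# Open a bash session in the management-shell pod; the deployment name is an
# assumption: confirm it with 'kubectl -n deephaven get deployments'
kubectl -n deephaven exec -it deploy/management-shell -- /bin/bash
```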
### Create a new DIS configuration
Next, use the `dhconfig dis` tool to add a new data import server named `iwd-kafka1` with a claim for all tables in the `Kafka` namespace, using the command below. This configures Deephaven's data routing without requiring any manual configuration file changes. The `--name` parameter value is arbitrary, but make a note of it; it will be needed when we create the persistent query script in a Deephaven console later.
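A sketch of the command, run from the management shell terminal (the `dhconfig` path shown is typical for a Deephaven Enterprise install; confirm the available options with `dhconfig dis add --help`):

```sh
# Add a DIS named iwd-kafka1 that claims all tables in the Kafka namespace
/usr/illumon/latest/bin/dhconfig dis add --name iwd-kafka1 --claim Kafka
```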
### Restart the Table Data Cache Proxy (TDCP)
After the routing configuration changes, the `tdcp` process needs a restart to pick up the changes. Note that this is not done within a management-shell terminal. We will scale the `tdcp` deployment down and back up with the commands below.
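A sketch of the scale-down and scale-up, assuming the deployment is named `tdcp` (list the deployments first if you are unsure of the exact name):

```sh
# Confirm the TDCP deployment name
kubectl -n deephaven get deployments | grep tdcp

# Scale down, then back up, to force a restart
kubectl -n deephaven scale deployment tdcp --replicas=0
kubectl -n deephaven scale deployment tdcp --replicas=1
```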
## Create the in-worker DIS as a Persistent Query
### Create a schema
If you do not already have a table to receive your Kafka data, you will need to define a schema for the Deephaven table.
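As a minimal sketch, the schema below defines a hypothetical `Kafka.MyKafkaTable` with columns commonly captured from a Kafka stream; your column names and types must match the data your ingester produces:

```xml
<Table name="MyKafkaTable" namespace="Kafka" storageType="NestedPartitionedOnDisk">
  <!-- Date is the partitioning column; the remaining columns are illustrative -->
  <Partitions keyFormula="${autobalance_single}" />
  <Column name="Date" dataType="String" columnType="Partitioning" />
  <Column name="KafkaPartition" dataType="int" />
  <Column name="Offset" dataType="long" />
  <Column name="Timestamp" dataType="DateTime" />
  <Column name="Key" dataType="String" />
  <Column name="Value" dataType="String" />
</Table>
```

Deploy the schema from the management shell with a command along the lines of `/usr/illumon/latest/bin/dhconfig schema import --file /tmp/Kafka.MyKafkaTable.schema` (check `dhconfig schema --help` for the exact options in your version).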
### Create a Persistent Query
In the Deephaven web console, create a new Persistent Query.
### Enter the settings
Under the Settings tab, click Show Advanced and fill in the Persistent Volume Claim, Storage Class, Storage Size, and Mount Path fields. If the named persistent volume claim (PVC) does not exist, one is created with the specified storage class and size. In that case, the storage class you specify must have a dynamic volume provisioner associated with it so that a persistent volume is also created. If you use a pre-existing PVC, you do not need to specify the storage class or storage size.
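To check which storage classes exist and which provisioner backs each one, you can run:

```sh
# The PROVISIONER column shows whether dynamic provisioning is available
kubectl get storageclass
```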

### Write the script
Click the Script tab and enter a script that creates the data import server and connects to your Kafka broker. A simple example script is shown below, and there are further examples and more detailed information on consuming Kafka data for Core+ workers and Legacy workers.
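The ingester API differs between Core+ and Legacy workers, so refer to the linked examples for the worker-specific calls; in either case, the script builds standard Kafka client properties and hands them to the ingester along with the DIS name chosen earlier (`iwd-kafka1`). A sketch of the common consumer configuration in Groovy, with an assumed broker address:

```groovy
import java.util.Properties

// Standard Apache Kafka consumer properties; the broker address and group id
// below are assumptions: replace them with values for your environment
props = new Properties()
props.put("bootstrap.servers", "kafka-broker:9092")
props.put("group.id", "deephaven-iwd-kafka1")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

// These properties, the DIS name ("iwd-kafka1"), and the target namespace/table
// are then passed to the worker-specific Kafka ingester; see the Core+ and
// Legacy examples referenced above for those calls.
```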
## Troubleshooting
If the ingester Persistent Query fails to initialize, the main error, which can be viewed in the Summary tab of the Persistent Query details, generally has information about what caused the failure. A common failure cause is a mismatch, such as a missing or wrong-type column, between the ingester configuration and the schema of the target table.
In most cases, once the errors are corrected, the ingester can be restarted to retry with the new configuration. However, if the schema is updated after some data was already written by a previous run of the ingester under the old schema, subsequent attempts can fail because the already-written data does not match the new schema.
Two options to address this situation are:
- Attach the ingester PVC to a Code Studio pod:
  - When launching a new Code Studio, select a merge worker and click Show Advanced.
  - Enter the name of the PVC used for the Kafka ingester Persistent Query in the Persistent Volume Claim field. Also enter the storage class name, the mount point, and the size, the same as those specified for the Persistent Query.
  - Launch the Code Studio.
  - Python or Groovy statements can be used from within the Code Studio to delete a non-matching destination path, such as this for use from a Legacy Groovy worker; or, alternately, `kubectl` can connect to the Pod running the worker and Linux `rm` can delete the non-matching path(s).
- Use `kubectl` to find and delete the PVC and PV that store the data; they are recreated the next time the Persistent Query is started. In this example, delete the PVC (`kafka-iwd-pvc`) and the PV (`pvc-0255bf9a-28c8-4fa4-a0aa-de23f05834e0`), as shown in the commands after this list.
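For the second option, a sketch of the `kubectl` commands using the example names above and the `deephaven` namespace (deleting the PVC may also remove the bound PV automatically if its reclaim policy is Delete):

```sh
# Inspect the PVC and note the PV it is bound to
kubectl -n deephaven get pvc kafka-iwd-pvc

# Delete the PVC, then the PV if it was not removed automatically
kubectl -n deephaven delete pvc kafka-iwd-pvc
kubectl delete pv pvc-0255bf9a-28c8-4fa4-a0aa-de23f05834e0
```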