Ingesting Kafka data in a Kubernetes installation

This guide demonstrates how to set up an in-worker Data Import Server (DIS) that connects to a Kafka broker and writes incoming data to a Deephaven table for a system deployed on Kubernetes.

Benefits of Kafka with Deephaven on Kubernetes

  • Scalability: Leverage Kubernetes orchestration to scale Kafka ingestion based on workload demands
  • Resilience: Benefit from Kubernetes self-healing capabilities for more reliable data ingestion
  • Portability: Deploy your Kafka ingestion pipelines consistently across different Kubernetes environments
  • Resource Optimization: Configure resource limits and requests for optimized performance

Core+ vs. Legacy Kafka Ingestion

Deephaven provides two frameworks for Kafka ingestion:

  • Core+ Kafka Ingestion: The newer, recommended framework with improved performance and features
  • Legacy Kafka Ingestion: The traditional Enterprise Kafka framework

This guide supports both frameworks, with specific notes about Core+ benefits where relevant. For comprehensive details on Core+ Kafka ingestion, see Core+ Kafka documentation.

Prerequisites

  • Access to a Kubernetes cluster with Deephaven deployed
  • Familiarity with Kubernetes and the kubectl command
  • Existing Kafka topic(s) accessible from your Kubernetes cluster

The examples in this guide use the deephaven Kubernetes namespace; adjust the commands for your environment.

Architecture

In this architecture:

  1. A Persistent Query runs inside a worker pod in the Kubernetes cluster
  2. The Persistent Query initializes a Data Import Server (DIS) process
  3. The DIS connects to external Kafka broker(s) and consumes topic data
  4. Data is stored in a Persistent Volume Claim (PVC) for durability
  5. The Table Data Cache Proxy (TDCP) routes table data requests from workers to the appropriate data sources, including the new DIS

Configure a new Data Import Server

Open a management shell terminal

Open a terminal session on the management-shell pod:

$ kubectl -n deephaven exec -it deploy/management-shell -- /bin/bash

Create a new DIS configuration

Next, use the dhconfig dis tool to add a new data import server named iwd-kafka1 with a claim for all tables in the Kafka namespace, as shown below. This configures Deephaven's data routing without requiring any manual configuration file changes. The --name value is arbitrary, but make a note of it; it is needed when you create the Persistent Query script in a Deephaven console later.

root@management-shell-1a2b3c4d5e-vwxyz:/$ /usr/illumon/latest/bin/dhconfig dis add --name iwd-kafka1 --claim Kafka

Restart the Table Data Cache Proxy (TDCP)

After the routing configuration changes, the tdcp process must be restarted to pick them up. This is not done from a management-shell terminal; instead, scale the tdcp deployment down and back up with the following commands.

# Scale the tdcp deployment down and wait a moment for the pod to terminate:
$ kubectl -n deephaven scale deploy/tdcp --replicas 0

# Scale the tdcp deployment back up
$ kubectl -n deephaven scale deploy/tdcp --replicas 1

Create the in-worker DIS as a Persistent Query

Create a schema

If you do not have a table to consume your Kafka data, you must create a schema for the Deephaven table. A Deephaven schema is an XML document that defines the table's namespace, name, and columns. In a Kubernetes environment, you typically define the schema in a file and then deploy it from the management-shell.

Define the schema

Here's an example schema definition for the Kafka.IWD_Test table used in this guide. Its columns match the ingestion script created later (Key, Value, Offset, and Timestamp), plus a KafkaPartition column and a Date partitioning column:

<Table name="IWD_Test" namespace="Kafka" storageType="NestedPartitionedOnDisk">
    <Partitions keyFormula="${autobalance_single}"/>
    <Column name="Date" dataType="String" columnType="Partitioning"/>
    <Column name="KafkaPartition" dataType="int"/>
    <Column name="Offset" dataType="long"/>
    <Column name="Timestamp" dataType="DateTime"/>
    <Column name="Key" dataType="String"/>
    <Column name="Value" dataType="String"/>
</Table>

Deploy the schema via kubectl and the management-shell

Deploy the schema by importing the file from the management-shell:

  1. Connect to the management-shell pod:

    $ kubectl -n deephaven exec -it deploy/management-shell -- /bin/bash
    
  2. Create a file containing your schema definition, for example /tmp/Kafka.IWD_Test.schema

  3. Import the schema using the dhconfig tool:

    $ /usr/illumon/latest/bin/dhconfig schema import --file /tmp/Kafka.IWD_Test.schema
    

For more details about schema creation options, see define a schema in the general Kafka documentation.

Create a Persistent Query

Use the Deephaven web console to create a new Persistent Query.

Enter the settings

Under the Settings tab, click Show Advanced and fill in the Persistent Volume Claim, Storage Class, Storage Size, and Mount Path fields. If the named persistent volume claim (PVC) does not exist, one is created with the specified storage class and size; in that case, the storage class must have a dynamic volume provisioner associated with it so that a persistent volume is also created. If you use a pre-existing PVC, you do not need to specify the storage class or storage size.


Write the script

Click the Script tab and enter a script to create the data import server that connects to your Kafka broker. A simple example script is shown below, and there are further examples and more detailed information on consuming Kafka data for Core+ workers and Legacy workers.

import io.deephaven.kafka.KafkaTools
import io.deephaven.enterprise.kafkawriter.KafkaTableWriter
import io.deephaven.enterprise.dataimportserver.DataImportServerTools

kafkaServer="kafka-broker-example.com"
kafkaTopicName="quickstart"         // Your Kafka topic
kafkaDisName="iwd-kafka1"           // The DIS name added earlier with 'dhconfig dis add'
targetNamespace="Kafka"             // The namespace claimed by that DIS
targetTable="IWD_Test"              // The target table for which you created a schema

// Set Kafka properties
final Properties props = new Properties()

// Connection settings
props.put("bootstrap.servers", kafkaServer + ":9092")
props.put("group.id", "dhdis-k8s")

// Performance tuning
props.put("fetch.min.bytes", "65000")          // Minimum amount of data to fetch in a single request
props.put("fetch.max.wait.ms", "200")         // Maximum time to wait before returning data
props.put("fetch.max.bytes", "52428800")      // Maximum bytes to fetch per partition (50MB)
props.put("max.partition.fetch.bytes", "1048576") // Maximum bytes per partition (1MB)
props.put("max.poll.records", "500")         // Maximum number of records returned in a single poll

// Reliability settings
props.put("enable.auto.commit", "false")      // Let Deephaven manage offsets
props.put("auto.offset.reset", "earliest")    // Start from earliest available offset if no committed offset

// Security settings (uncomment and configure as needed)
// props.put("security.protocol", "SSL")
// props.put("ssl.truststore.location", "/path/to/truststore.jks")
// props.put("ssl.truststore.password", "${KAFKA_TRUSTSTORE_PASSWORD}")

// Deephaven specific settings
props.put("deephaven.offset.column.name", "Offset")
props.put("deephaven.timestamp.column.name", "Timestamp")

// Create DIS with previously configured name and storage path.
// The path must match the 'Mount Path' value entered in the Settings tab.
dis = DataImportServerTools.getDisByNameWithStorage(kafkaDisName, "/dataImportServers/iwd-kafka1")

final KafkaTableWriter.Options opts = new io.deephaven.enterprise.kafkawriter.KafkaTableWriter.Options()
opts.dataImportServer(dis)
opts.tableName(targetTable).namespace(targetNamespace)
opts.topic(kafkaTopicName)
opts.kafkaProperties(props)
opts.keySpec(io.deephaven.kafka.KafkaTools.Consume.simpleSpec("Key", String.class))
opts.valueSpec(io.deephaven.kafka.KafkaTools.Consume.simpleSpec("Value", String.class))

// Configure fixed partitioning
opts.partitionValue(today())

KafkaTableWriter.consumeToDis(opts)
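
Once the Persistent Query is running, you can confirm that data is flowing by opening the live table from a Code Studio or another worker. A minimal sketch, assuming the Kafka.IWD_Test table and the Date partition value used above:

// Open the live, intraday view of the ingested table and filter to today's partition.
// 'db' is the Database instance available in Deephaven consoles and Persistent Queries.
ingested = db.liveTable("Kafka", "IWD_Test").where("Date = today()")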

Advanced Configuration Options

Partitioning Strategies

The example script uses fixed partitioning with opts.partitionValue(today()), which is simple but may not be optimal for all scenarios. Consider these alternative approaches:

Fixed Partitioning

Fixed partitioning assigns a single partition for the life of the ingester:

// Using a date as partition
opts.partitionValue(today())

// Using a string as partition
opts.partitionValue("static_partition")

Dynamic Partitioning

Dynamic partitioning determines partitions as a function of the data, useful for time-series data:

import io.deephaven.enterprise.kafkawriter.TimePartitionRotation
import java.time.ZoneId

// Partition by day using the KafkaTimestamp column
opts.dynamicPartitionFunction(
    "KafkaTimestamp",
    TimePartitionRotation.daily(ZoneId.of("UTC"), 7 * 24) // Keep 7 days worth of partitions
)

For more details on partitioning strategies, see the Core+ Kafka documentation.

Security Considerations

Kafka Authentication and Encryption

When connecting to a secured Kafka cluster, you'll need to configure appropriate security settings:

// For SSL/TLS
props.put("security.protocol", "SSL")
props.put("ssl.truststore.location", "/path/to/truststore.jks")
props.put("ssl.truststore.password", "${KAFKA_TRUSTSTORE_PASSWORD}")

// For SASL authentication (e.g., PLAIN, SCRAM, GSSAPI)
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "PLAIN")
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";")

Kubernetes Secrets

Instead of hardcoding credentials, use Kubernetes secrets:

  1. Create a secret containing Kafka credentials:

    kubectl create secret generic kafka-credentials \
      --from-literal=username=user \
      --from-literal=password=password \
      -n deephaven
    
  2. Mount the secret into your Persistent Query pod as environment variables and reference those variables in your script, as sketched below.
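
For example, if the secret is exposed to the worker as environment variables (the names KAFKA_USERNAME and KAFKA_PASSWORD below are illustrative assumptions), the Persistent Query script can read the credentials at startup instead of hardcoding them:

// 'props' is the Properties object built in the ingestion script above.
// Read Kafka credentials from environment variables injected from a Kubernetes secret.
kafkaUser = System.getenv("KAFKA_USERNAME")
kafkaPass = System.getenv("KAFKA_PASSWORD")

props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "PLAIN")
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"" + kafkaUser + "\" password=\"" + kafkaPass + "\";")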

Monitoring Kafka Ingestion

Monitoring Kafka ingestion in Kubernetes environments is important for ensuring reliability. Here are several approaches:

1. Deephaven Query Monitor

Use the Deephaven Query Monitor to check the status of your Persistent Query and view any error messages.

2. Kubernetes-native Monitoring

  • Pod Metrics: Monitor CPU and memory usage of the worker pod running your ingestion
  • Pod Logs: Check the ingestion worker pod's logs for ingestion-related messages
    kubectl logs -f <ingestion-worker-pod> -n deephaven
    
  • Pod Events: Watch for pod-related events
    kubectl get events -n deephaven --field-selector involvedObject.name=<ingestion-worker-pod>
    

3. Prometheus & Grafana

If your Kubernetes cluster has Prometheus and Grafana installed, create dashboards for:

  • JVM metrics from your Deephaven worker pods
  • Kafka consumer lag metrics
  • PVC storage utilization
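
You can also approximate end-to-end ingestion lag from a Deephaven console. A minimal sketch, assuming the Kafka.IWD_Test table and the Timestamp column configured earlier in this guide:

// Compare the newest ingested Kafka timestamp to the current wall-clock time.
// A steadily growing lag suggests the ingester is falling behind the topic.
latest = db.liveTable("Kafka", "IWD_Test")
        .where("Date = today()")
        .view("Timestamp")
        .maxBy()

lag = latest.update("LagSeconds = now().getEpochSecond() - Timestamp.getEpochSecond()")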

Troubleshooting

Common Issues and Solutions

Ingester Initialization Failures

If the ingester Persistent Query fails to initialize, view the error in the Summary tab of the Persistent Query details. Common causes include:

  1. Schema Mismatch: Missing or wrong-type columns between ingester configuration and table schema

    • Solution: Ensure your schema matches the data format from Kafka
  2. Kafka Connectivity Issues:

    • Solution: Verify the Kafka broker is accessible from the Kubernetes cluster and check network policies
  3. Resource Constraints: Pod out of memory or CPU limits exceeded

    • Solution: Increase the resource allocation for your Persistent Query pod
  4. Storage Issues: PVC mounting problems or insufficient storage

    • Solution: Check PVC status and storage class compatibility

Schema Evolution Problems

When schema changes cause ingestion failures because previously written data doesn't match the new schema:

Option 1: Attach PVC to Code Studio
  1. Launch a new Code Studio, select a merge worker, and click Show Advanced
  2. Enter the name of the PVC used for the Kafka ingester, along with storage class name, mount point, and size
  3. Launch the Code Studio
  4. Delete non-matching paths using Groovy:
    import com.illumon.util.files.FileHelper
    FileHelper.deleteRecursivelyOnNFS(new File("/dataImportServers/iwd-kafka1/path/to/table"))
    
Option 2: Delete and Recreate PVC
  1. Identify the PVC:

    kubectl get pvc kafka-iwd-pvc --namespace deephaven
    
  2. Delete the PVC and its associated PV:

    kubectl delete pvc kafka-iwd-pvc --namespace deephaven
    kubectl delete pv pvc-0255bf9a-28c8-4fa4-a0aa-de23f05834e0
    
  3. Restart the Persistent Query to create a new PVC

Debugging Connection Issues

To verify Kafka connectivity from within the Kubernetes cluster:

# Deploy a test pod with Kafka tools
kubectl run kafka-debug --image=confluentinc/cp-kafka:6.1.1 -it --rm --namespace deephaven -- bash

# From inside the pod, test the connection to the Kafka broker by listing topics
kafka-topics --bootstrap-server kafka-broker-example.com:9092 --list

Advanced Troubleshooting

For more complex issues, consider these approaches:

  1. Enable Verbose Logging: Increase Kafka client logging detail by raising the log level of the org.apache.kafka loggers in the worker's logging configuration

  2. Analyze Pod State: Generate a diagnostics dump for deeper inspection

    kubectl exec -it <ingestion-worker-pod> -n deephaven -- jcmd 1 GC.heap_dump /tmp/heap.hprof
    kubectl cp deephaven/<ingestion-worker-pod>:/tmp/heap.hprof ./heap.hprof
    
  3. Consult Support: For persistent issues, contact Deephaven support with:

    • PQ logs
    • Kafka topic metadata
    • Kubernetes cluster information