Ingesting Kafka data in a Kubernetes installation
This guide demonstrates how to set up an in-worker Data Import Server (DIS) that connects to a Kafka broker and writes incoming data to a Deephaven table for a system deployed on Kubernetes.
Benefits of Kafka with Deephaven on Kubernetes
- Scalability: Leverage Kubernetes orchestration to scale Kafka ingestion based on workload demands
- Resilience: Benefit from Kubernetes self-healing capabilities for more reliable data ingestion
- Portability: Deploy your Kafka ingestion pipelines consistently across different Kubernetes environments
- Resource Optimization: Configure resource limits and requests for optimized performance
Core+ vs. Legacy Kafka Ingestion
Deephaven provides two frameworks for Kafka ingestion:
- Core+ Kafka Ingestion: The newer, recommended framework with improved performance and features
- Legacy Kafka Ingestion: The traditional Enterprise Kafka framework
This guide supports both frameworks, with specific notes about Core+ benefits where relevant. For comprehensive details on Core+ Kafka ingestion, see Core+ Kafka documentation.
Prerequisites
- Access to a Kubernetes cluster with Deephaven deployed
- Familiarity with Kubernetes and the kubectl command
- Existing Kafka topic(s) accessible from your Kubernetes cluster
The examples in this guide use a namespace of deephaven; adjust accordingly for your environment.
Architecture
In this architecture:
- A Persistent Query runs inside a worker pod in the Kubernetes cluster
- The Persistent Query initializes a Data Import Server (DIS) process
- The DIS connects to external Kafka broker(s) and consumes topic data
- Data is stored in a Persistent Volume Claim (PVC) for durability
- The Table Data Cache Proxy (TDCP) routes queries to the appropriate data sources
Configure a new Data Import Server
Open a management shell terminal
Open a terminal session on the management-shell pod.
$ kubectl -n deephaven exec -it deploy/management-shell -- /bin/bash
Create a new DIS configuration
Next, use the dhconfig dis tool to add a new data import server named iwd-kafka1 that claims all tables in the Kafka namespace, using the command below. This configures Deephaven's data routing without requiring any manual config file changes. The --name parameter value is arbitrary, but make a note of it: it is needed later when you write the Persistent Query script in a Deephaven console.
root@management-shell-1a2b3c4d5e-vwxyz:/$ /usr/illumon/latest/bin/dhconfig dis add --name iwd-kafka1 --claim Kafka
Restart the Table Data Cache Proxy (TDCP)
After the routing configuration changes, the tdcp process must be restarted to pick up the changes. Note that this is not done from a management-shell terminal; instead, scale the tdcp deployment down and back up with the following commands.
# Scale the tdcp deployment down and wait a moment for the pod to terminate:
$ kubectl -n deephaven scale deploy/tdcp --replicas 0
# Scale the tdcp deployment back up
$ kubectl -n deephaven scale deploy/tdcp --replicas 1
Create the in-worker DIS as a Persistent Query
Create a schema
If you do not have a table to consume your Kafka data, you will need to create a schema for the Deephaven table. In a Kubernetes environment, you'll typically create this schema through one of the following methods:
Option 1: Create schema using Code Studio
- Launch a Deephaven Code Studio session from the Kubernetes deployment
- Create a script that defines your schema in the appropriate namespace
- Run the script to register your schema
Here's an example schema creation script for a Core+ Groovy console. Treat the schema-service calls as a sketch and see the schema documentation for the full API:
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.enterprise.compatibility.TableDefinitionCompatibility
// Enterprise schema classes (assumed here to be available to Core+ workers via the shadowed enterprise package)
import io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.NamespaceSet
import io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.SchemaServiceFactory
import io.deephaven.shadow.enterprise.com.illumon.iris.db.schema.xml.SchemaXmlFactory
// Define the columns for the Kafka table. "Date" is the partitioning column; the other
// columns match the key, value, timestamp, and offset columns written by the ingester below.
definition = TableDefinition.of(
        ColumnDefinition.ofString("Date").withPartitioning(),
        ColumnDefinition.ofTime("Timestamp"),
        ColumnDefinition.ofLong("Offset"),
        ColumnDefinition.ofString("Key"),
        ColumnDefinition.ofString("Value"))
// Convert the definition to an Enterprise schema and register it in the Kafka namespace.
enterpriseDefinition = TableDefinitionCompatibility.convertToEnterprise(definition)
enterpriseDefinition.setNamespace("Kafka")
enterpriseDefinition.setName("IWD_Test")
schemaService = SchemaServiceFactory.getDefault()
schemaService.authenticate()
schemaService.createNamespaceIfNotExists(NamespaceSet.SYSTEM, "Kafka")
schemaService.addSchema(SchemaXmlFactory.getMutableSchema(enterpriseDefinition, NamespaceSet.SYSTEM))
Option 2: Create schema via kubectl and management-shell
You can also create a schema by running a script in the management-shell:
1. Connect to the management-shell pod:
$ kubectl -n deephaven exec -it deploy/management-shell -- /bin/bash
2. Create a file containing your table's schema definition.
3. Import the schema with the dhconfig tool:
$ /usr/illumon/latest/bin/dhconfig schema import --file /path/to/your/schema.xml
For more details about schema creation options, see define a schema in the general Kafka documentation.
Create a Persistent Query
Use the Deephaven web console and create a Persistent Query.
Enter the settings
Under the Settings tab, click Show Advanced and fill in the Persistent Volume Claim, Storage Class, Storage Size, and Mount Path fields. If the named persistent volume claim (PVC) does not exist, one is created with the specified storage class and size. In that case, the storage class must have a dynamic volume provisioner associated with it so that a persistent volume is also created. If you use a pre-existing PVC, you do not need to specify the storage class or storage size.
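For example, to match the values used later in this guide, you might enter a Persistent Volume Claim of kafka-iwd-pvc and a Mount Path of /dataImportServers/iwd-kafka1. The Storage Class (for example, standard) and Storage Size (for example, 10Gi) given here are placeholders; choose values appropriate for your cluster.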
Write the script
Click the Script tab and enter a script to create the data import server that connects to your Kafka broker. A simple example script is shown below, and there are further examples and more detailed information on consuming Kafka data for Core+ workers and Legacy workers.
import io.deephaven.kafka.KafkaTools
import io.deephaven.enterprise.kafkawriter.KafkaTableWriter
import io.deephaven.enterprise.dataimportserver.DataImportServerTools
kafkaServer="kafka-broker-example.com"
kafkaTopicName="quickstart" // Your Kafka topic
kafkaDisName="iwd-kafka1" // The DIS name created earlier with 'dhconfig dis add'
targetNamespace="Kafka" // The namespace claimed by that DIS
targetTable="IWD_Test" // The target table for which you created a schema
// Set Kafka properties
final Properties props = new Properties()
// Connection settings
props.put("bootstrap.servers", kafkaServer + ":9092")
props.put("group.id", "dhdis-k8s")
// Performance tuning
props.put("fetch.min.bytes", "65000") // Minimum amount of data to fetch in a single request
props.put("fetch.max.wait.ms", "200") // Maximum time to wait before returning data
props.put("fetch.max.bytes", "52428800") // Maximum bytes to fetch per partition (50MB)
props.put("max.partition.fetch.bytes", "1048576") // Maximum bytes per partition (1MB)
props.put("max.poll.records", "500") // Maximum number of records returned in a single poll
// Reliability settings
props.put("enable.auto.commit", "false") // Let Deephaven manage offsets
props.put("auto.offset.reset", "earliest") // Start from earliest available offset if no committed offset
// Security settings (uncomment and configure as needed)
// props.put("security.protocol", "SSL")
// props.put("ssl.truststore.location", "/path/to/truststore.jks")
// props.put("ssl.truststore.password", "${KAFKA_TRUSTSTORE_PASSWORD}")
// Deephaven specific settings
props.put("deephaven.offset.column.name", "Offset")
props.put("deephaven.timestamp.column.name", "Timestamp")
// Create DIS with previously configured name and storage path.
// The path must match the 'Mount Path' value entered in the Settings tab.
dis = DataImportServerTools.getDisByNameWithStorage(kafkaDisName, "/dataImportServers/iwd-kafka1")
final KafkaTableWriter.Options opts = new KafkaTableWriter.Options()
opts.dataImportServer(dis)
opts.tableName(targetTable).namespace(targetNamespace)
opts.topic(kafkaTopicName)
opts.kafkaProperties(props)
opts.keySpec(KafkaTools.Consume.simpleSpec("Key", String.class))
opts.valueSpec(KafkaTools.Consume.simpleSpec("Value", String.class))
// Configure fixed partitioning
opts.partitionValue(today())
KafkaTableWriter.consumeToDis(opts)
Advanced Configuration Options
Partitioning Strategies
The example script uses fixed partitioning with opts.partitionValue(today()), which is simple but may not be optimal for all scenarios. Consider these approaches:
Fixed Partitioning
Fixed partitioning assigns a single partition for the life of the ingester:
// Using a date as partition
opts.partitionValue(today())
// Using a string as partition
opts.partitionValue("static_partition")
Dynamic Partitioning
Dynamic partitioning determines partitions as a function of the data, useful for time-series data:
import io.deephaven.enterprise.kafkawriter.TimePartitionRotation
import java.time.ZoneId
// Partition by day using the ingested timestamp column ("Timestamp" in this guide's schema)
opts.dynamicPartitionFunction(
"Timestamp",
TimePartitionRotation.daily(ZoneId.of("UTC"), 7 * 24) // Keep 7 days worth of partitions
)
For more details on partitioning strategies, see the Core+ Kafka documentation.
Security Considerations
Kafka Authentication and Encryption
When connecting to a secured Kafka cluster, you'll need to configure appropriate security settings:
// For SSL/TLS
props.put("security.protocol", "SSL")
props.put("ssl.truststore.location", "/path/to/truststore.jks")
props.put("ssl.truststore.password", "${KAFKA_TRUSTSTORE_PASSWORD}")
// For SASL authentication (e.g., PLAIN, SCRAM, GSSAPI)
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "PLAIN")
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";")
Kubernetes Secrets
Instead of hardcoding credentials, use Kubernetes secrets:
1. Create a secret containing the Kafka credentials:
kubectl create secret generic kafka-credentials \
  --from-literal=username=user \
  --from-literal=password=password \
  -n deephaven
2. Mount the secret to your Persistent Query pod (or expose it as environment variables) and reference those environment variables in your script, as shown in the sketch below.
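For example, assuming the secret is exposed to the worker as environment variables named KAFKA_USERNAME and KAFKA_PASSWORD (hypothetical names; use whatever your pod configuration defines), the ingestion script can build the SASL configuration without embedding credentials:
// Read credentials injected from the Kubernetes secret (hypothetical variable names).
String kafkaUser = System.getenv("KAFKA_USERNAME")
String kafkaPass = System.getenv("KAFKA_PASSWORD")
// Apply them to the same 'props' object used by the ingestion script.
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "PLAIN")
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"" + kafkaUser + "\" password=\"" + kafkaPass + "\";")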
Monitoring Kafka Ingestion
Monitoring Kafka ingestion in Kubernetes environments is important for ensuring reliability. Here are several approaches:
1. Deephaven Query Monitor
Use the Deephaven Query Monitor to check the status of your Persistent Query and view any error messages.
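You can also confirm that data is flowing by opening the ingested table from a Code Studio console. A minimal Core+ Groovy check, assuming the namespace, table, and partitioning used earlier in this guide:
// Open the live (intraday) table written by the ingester and filter to today's partition.
// New rows should appear as messages arrive on the Kafka topic.
kafkaLive = db.liveTable("Kafka", "IWD_Test").where("Date=today()")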
2. Kubernetes-native Monitoring
- Pod Metrics: Monitor CPU and memory usage of the worker pod running your ingestion
- Pod Logs: Check the worker pod's logs for ingestion-related messages
kubectl logs -f <ingestion-worker-pod> -n deephaven
- Pod Events: Watch for pod-related events
kubectl get events -n deephaven --field-selector involvedObject.name=<ingestion-worker-pod>
3. Prometheus & Grafana
If your Kubernetes cluster has Prometheus and Grafana installed, create dashboards for:
- JVM metrics from your Deephaven worker pods
- Kafka consumer lag metrics
- PVC storage utilization
Troubleshooting
Common Issues and Solutions
Ingester Initialization Failures
If the ingester Persistent Query fails to initialize, view the error in the Summary tab of the Persistent Query details. Common causes include:
- Schema Mismatch: Missing or wrong-type columns between the ingester configuration and the table schema
  - Solution: Ensure your schema matches the data format from Kafka (see the sketch after this list)
- Kafka Connectivity Issues: The broker cannot be reached from the worker pod
  - Solution: Verify the Kafka broker is accessible from the Kubernetes cluster and check network policies
- Resource Constraints: Pod out of memory or CPU limits exceeded
  - Solution: Increase the resource allocation for your Persistent Query pod
- Storage Issues: PVC mounting problems or insufficient storage
  - Solution: Check PVC status and storage class compatibility
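For the schema-mismatch case, it helps to compare the deployed table's columns against the columns the ingester writes. A minimal Core+ Groovy sketch, assuming the Kafka/IWD_Test table from this guide:
// Inspect the deployed table's column names and types; they must match the
// ingester's key/value specs and the deephaven.* column name properties.
kafkaMeta = db.liveTable("Kafka", "IWD_Test").meta()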
Schema Evolution Problems
When schema changes cause ingestion failures because previously written data doesn't match the new schema:
Option 1: Attach PVC to Code Studio
- Launch a new Code Studio, select a merge worker, and click Show Advanced
- Enter the name of the PVC used for the Kafka ingester, along with the storage class name, mount point, and size
- Launch the Code Studio
- Delete the non-matching paths using Groovy:
import com.illumon.util.files.FileHelper
FileHelper.deleteRecursivelyOnNFS(new File("/dataImportServers/iwd-kafka1/path/to/table"))
Option 2: Delete and Recreate PVC
1. Identify the PVC:
kubectl get pvc kafka-iwd-pvc --namespace deephaven
2. Delete the PVC and its associated PV:
kubectl delete pvc kafka-iwd-pvc --namespace deephaven
kubectl delete pv pvc-0255bf9a-28c8-4fa4-a0aa-de23f05834e0
3. Restart the Persistent Query to create a new PVC.
Debugging Connection Issues
To verify Kafka connectivity from within the Kubernetes cluster:
# Deploy a test pod with Kafka tools
kubectl run kafka-debug --image=confluentinc/cp-kafka:6.1.1 -it --rm --namespace deephaven -- bash
# Test connectivity by describing the topic
kafka-topics --bootstrap-server kafka-broker-example.com:9092 --describe --topic quickstart
Advanced Troubleshooting
For more complex issues, consider these approaches:
1. Enable Verbose Logging: Add logging properties to your Kafka consumer configuration:
props.put("deephaven.log.level", "DEBUG")
2. Analyze Pod State: Generate a heap dump of the ingestion worker for deeper inspection:
kubectl exec -it <ingestion-worker-pod> -n deephaven -- jcmd 1 GC.heap_dump /tmp/heap.hprof
kubectl cp deephaven/<ingestion-worker-pod>:/tmp/heap.hprof ./heap.hprof
3. Consult Support: For persistent issues, contact Deephaven support with:
- PQ logs
- Kafka topic metadata
- Kubernetes cluster information