Write your own custom parser for Kafka
Kafka topics often contain data that does not fit neatly into Deephaven's built-in formats such as simple, JSON, Avro, or Protobuf. In these cases, you can write your own parser or use an object processor that converts raw bytes from Kafka into rich objects and table columns.
This guide shows how to:
- Understand when you need a custom parser.
- Consume raw bytes or structured data from Kafka into a Deephaven table.
- Apply custom parsing logic to build a domain object.
- Project that object into regular Deephaven columns.
Note
If you are new to Kafka in Deephaven, read Connect to a Kafka stream and Kafka basic terminology first.
When to use a custom parser
Built-in Kafka specs such as simpleSpec, jsonSpec, avroSpec, and protobufSpec cover the most common patterns.
A custom parser is useful when:
- The payload uses a non-standard encoding (see the sketch after this list).
- The payload structure changes frequently but maps to a stable internal model.
- You need complex validation or transformation during parsing.
- You want to parse into a domain object and then derive multiple columns from it.
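For example, a payload might arrive as a pipe-delimited string instead of JSON. The following minimal sketch (the payload format and helper function are invented for illustration, not part of any Deephaven API) shows the kind of plain Groovy decoding logic a custom parser wraps:

```groovy
// Hypothetical payload format: "34|Alice" (age, then name).
def parsePipeDelimited(String raw) {
    def parts = raw.split('\\|')
    if (parts.length != 2) {
        throw new IllegalArgumentException("Malformed payload: ${raw}")
    }
    // Return a simple map; later steps show how to build a domain object instead.
    [age: parts[0] as int, name: parts[1]]
}

assert parsePipeDelimited('34|Alice') == [age: 34, name: 'Alice']
```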
In this guide, you will:
- Define a domain class for your records.
- Use JSON tools to map Kafka values into that class.
- Expose the parsed object fields as columns in a Deephaven table.
Prerequisites
- Kafka is running with a topic you can read from.
- Deephaven Groovy server is running with access to that Kafka cluster.
- You are comfortable with basic Groovy and classes.
- You understand the basics of Kafka in Deephaven.
Step 1: Define a domain class
Start by defining a Groovy class that represents the logical payload you want to work with.
class Person {
    int age
    String name

    Person(int age, String name) {
        this.age = age
        this.name = name
    }
}
This example assumes each Kafka value contains an age and name field.
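Outside of Kafka, you can check that raw text maps into this class as expected. This is a minimal sketch using Groovy's built-in JsonSlurper with an invented sample payload:

```groovy
import groovy.json.JsonSlurper

// Hypothetical sample payload with the expected shape.
raw = '{"age": 34, "name": "Alice"}'
parsed = new JsonSlurper().parseText(raw)

person = new Person(parsed.age as int, parsed.name as String)
println "${person.name} is ${person.age}"
```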
Step 2: Describe the payload with column definitions
Next, you define column definitions that describe the columns you want in the Deephaven table, and a mapping from JSON field names to those column names.
import io.deephaven.engine.table.ColumnDefinition
ageDef = ColumnDefinition.ofInt('Age')
nameDef = ColumnDefinition.ofString('Name')
ColumnDefinition[] colDefs = [ageDef, nameDef]
mapping = ['age': 'Age', 'name': 'Name']
colDefs describes the Deephaven columns you want, and mapping explains how JSON field names map onto those columns.
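ColumnDefinition has factory methods for other common types as well. The extra fields below are hypothetical, shown only to illustrate the pattern:

```groovy
import io.deephaven.engine.table.ColumnDefinition

// Hypothetical additional columns to show other factory methods.
salaryDef = ColumnDefinition.ofDouble('Salary')
idDef = ColumnDefinition.ofLong('Id')
activeDef = ColumnDefinition.ofBoolean('Active')
```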
Step 3: Create a JSON spec and consume the topic
You can now build a JSON spec using KafkaTools.Consume.jsonSpec and pass it to consumeToTable.
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.kafka.KafkaTools
// Define column definitions for JSON deserialization
ageDef = ColumnDefinition.ofInt('Age')
nameDef = ColumnDefinition.ofString('Name')
// Create column definitions array
ColumnDefinition[] colDefs = [ageDef, nameDef]
// Create mapping from JSON field names to column names
mapping = ['age': 'Age', 'name': 'Name']
// Set up Kafka properties
kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
kafkaProps.put('schema.registry.url', 'http://redpanda:8081')
// Create JSON spec for Kafka consumption
jsonSpec = KafkaTools.Consume.jsonSpec(colDefs, mapping, null)
// Consume the Kafka topic with JSON deserialization
personTable = KafkaTools.consumeToTable(
    kafkaProps,
    'test.topic',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_SEEK_TO_END,
    KafkaTools.Consume.IGNORE,
    jsonSpec,
    KafkaTools.TableType.append()
)
The resulting personTable has the columns:
- Age as an int.
- Name as a String.
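As a quick sanity check, a minimal sketch: meta() returns a metadata table listing each column's name and data type.

```groovy
// Inspect the column names and types of the consumed table.
personTableMeta = personTable.meta()
```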
From here, you can:
- Compute aggregates like average age.
- Join with other tables on Name.
- Filter or sort based on derived columns.
A couple of these are sketched below.
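For example, using personTable from above (a minimal sketch; column names follow the definitions in this guide):

```groovy
// Average age across all rows.
avgAge = personTable.view('Age').avgBy()

// Filter and sort on the parsed columns.
adults = personTable.where('Age >= 21').sort('Name')
```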
Alternative: Use an object processor spec
For more advanced custom parsing, you can use objectProcessorSpec with a JSON provider such as JacksonProvider to describe how to interpret Kafka values.
The Kafka streaming guide's JSON section shows a pattern like this:
import io.deephaven.json.jackson.JacksonProvider
import io.deephaven.json.ObjectValue
import io.deephaven.json.StringValue
import io.deephaven.json.DoubleValue
import io.deephaven.json.IntValue
import io.deephaven.kafka.KafkaTools
kafkaProps = new Properties()
kafkaProps.put('bootstrap.servers', 'redpanda:9092')
fields = ObjectValue.builder()
    .putFields('symbol', StringValue.strict())
    .putFields('price', DoubleValue.strict())
    .putFields('qty', IntValue.strict())
    .build()
provider = JacksonProvider.of(fields)
jacksonSpec = KafkaTools.Consume.objectProcessorSpec(provider)
ordersTable = KafkaTools.consumeToTable(
    kafkaProps,
    'orders',
    KafkaTools.ALL_PARTITIONS,
    KafkaTools.ALL_PARTITIONS_DONT_SEEK,
    KafkaTools.Consume.IGNORE,
    jacksonSpec,
    KafkaTools.TableType.append()
)
By changing the fields description and the provider configuration, you can express sophisticated parsing logic while keeping your Groovy query code clean and declarative.
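For instance, assuming the output columns are named symbol, price, and qty (matching the JSON field names above; verify with ordersTable.meta()), derived values can be computed downstream rather than inside the parser:

```groovy
// Hypothetical derived column; assumes columns named price and qty exist.
ordersWithNotional = ordersTable.update('Notional = price * qty')
```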
Tips for designing your custom parser
- Validate input early.
  - Check for missing fields, invalid types, or malformed payloads.
  - Fail fast or route bad messages to a separate table when possible (see the sketch after this list).
- Keep your domain model stable.
  - Map changing payloads into a stable Person or similar class.
  - Add fields in backward-compatible ways when schemas evolve.
- Avoid heavy work inside parsing logic.
  - Do not perform blocking network calls or expensive I/O while parsing.
  - Keep parsing focused on decoding and basic validation.
- Test with sample payloads.
  - Produce test messages into Kafka using docker compose exec redpanda rpk topic produce.
  - Verify that the resulting Deephaven table has the expected rows and types.
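As one minimal sketch of the routing idea, assuming personTable from earlier in this guide (isNull is a built-in Deephaven query language function):

```groovy
// Route rows with missing or implausible values to a separate table for inspection.
badRows = personTable.where('isNull(Name) || Age < 0 || Age > 150')
goodRows = personTable.where('!isNull(Name) && Age >= 0 && Age <= 150')
```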