How to connect to a Kafka stream

Kafka is a distributed event streaming platform that lets you read, write, store, and process events, also called records.

Kafka topics take on many forms, such as raw input, JSON, or Avro. In this guide, we show you how to import each of these formats as Deephaven tables.

Please see our concept guide, Kafka basic terminology, for a detailed discussion of Kafka topics and supported formats. See the Apache Kafka documentation for full details on how to use Kafka.

Kafka in Deephaven

The following options are available when consuming a Kafka stream in Deephaven; defaults are noted where they apply.

Standard data fields

Kafka has the standard data fields of partition, offset, and timestamp. Each of these fields becomes a column in the new table that stores the Kafka stream. The column names can be changed, but the type of column is set. The standard names and types for these values are:

  • KafkaPartition: int
  • KafkaOffset: long
  • KafkaTimestamp: date-time

When reading a Kafka topic, you can select which partitions to listen to. By default, all partitions are read. Additionally, topics can be read from the beginning, from the latest offset, or from the first unprocessed offset. By default, all partitions are read from the latest offset.

note

The Kafka infrastructure can retain old messages for a maximum given age or retain the last message for individual keys.

While these three fields are traditionally included in the new table, you can choose to ignore any of them, such as when there is only one partition.
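
As a minimal sketch, the call below renames the partition column and replays the topic from the beginning. The topic name and the ALL_PARTITIONS_SEEK_TO_BEGINNING constant are assumptions for illustration:

from deephaven import ConsumeKafka as ck

# Rename the standard partition column and read every partition from the start.
# 'test.topic' and ALL_PARTITIONS_SEEK_TO_BEGINNING are illustrative assumptions.
result = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092',
     'deephaven.partition.column.name': 'Partition'},
    'test.topic',
    partitions=ck.ALL_PARTITIONS,
    offsets=ck.ALL_PARTITIONS_SEEK_TO_BEGINNING)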

Key and value

The actual data of the Kafka stream are the KafkaKey and KafkaValue. This is the information that is logged onto the partition with an offset at a certain time. For example, a list of messages might use the user as the key and the message text as the value, each logged at a certain time.

KafkaKey and KafkaValue are similar in that they can be nearly any sequence of bytes. The primary difference is that the key is used to create a hash that will facilitate load balancing. By default, each key and value is assigned a simple format, with column names of either KafkaKey or KafkaValue, and String type.

The KafkaKey and KafkaValue attributes can be:

  • simple type
  • JSON encoded
  • Avro encoded
  • ignored (cannot ignore both key and value)
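The sketch below shows two of these options in one call: the key is read as a simple string and the value is parsed as JSON. The topic and column names are illustrative; ck.avro and ck.IGNORE (shown in later sections) follow the same pattern:

from deephaven import ConsumeKafka as ck

# Simple string key plus a JSON-encoded value with two columns.
# 'orders' and the column names are assumptions for this sketch.
result = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092'},
    'orders',
    key=ck.simple('Symbol', 'string'),
    value=ck.json([('Side', 'string'),
                   ('Qty', 'int')]))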

Choose table type

Deephaven tables can be append-only or streaming:

  • Append-only tables add one row for every message ingested - thus, table size and memory consumption can grow rapidly. Set this value with table_type = 'append'.
  • Streaming tables only keep the set of rows received during the last update cycle. This forms the basis for more advanced use cases when combined with table operations like lastBy (see the sketch after this list). For streaming tables without any other post-processing, new messages appear in the table for one LTM (Live Table Monitor) cycle and then disappear. A streaming table is the default. You can set table_type = 'stream' to be explicit, but this is not required.
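
As a minimal sketch, the snippet below consumes a streaming table and keeps only the latest row per key with lastBy. The topic name quotes is an assumption for illustration; KafkaKey is the default key column name described above:

from deephaven import ConsumeKafka as ck

# Streaming table: rows live for one update cycle unless captured by an operation.
prices = ck.consumeToTable({'bootstrap.servers': 'redpanda:29092'}, 'quotes', table_type='stream')

# lastBy retains the most recent row for each value of the default key column.
latestPrices = prices.lastBy('KafkaKey')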

Start Kafka with Deephaven

For this example, we use a Docker image with redpanda along with our traditional Deephaven image. Redpanda allows us to input data directly into a Kafka stream from the terminal. This is just one of the supported Kafka-compatible event streaming platforms. Many more are available.

For more information about downloading Deephaven in pre-built Docker images with 'redpanda', see our Quick start.

First, create a directory for the deployment to live in. Use any directory name you like; we chose deephaven-redpanda:

mkdir deephaven-redpanda

Then, make that the current working directory:

cd deephaven-redpanda

Now, use curl to get the Docker Compose file for this configuration. We use a Deephaven Python deployment with 'redpanda' and the examples manager included:

compose_file=https://raw.githubusercontent.com/deephaven/deephaven-core/main/containers/python-examples-redpanda/docker-compose.yml
curl -O "${compose_file}"

Next, pull the Docker images:

docker-compose pull

caution

When new features are added to Deephaven, you will need to redownload the docker-compose.yml file to get the latest version of Deephaven.

Bring up the deployment:

docker-compose up -d

This will start Deephaven with redpanda.
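
To confirm that the Deephaven and Redpanda services came up, you can list the running containers with a standard Docker Compose command:

docker-compose ps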

Import a Kafka stream via terminal

In this example, we consume a Kafka topic (test.topic) as a Deephaven table. The Kafka topic is populated by commands entered into the terminal.

from deephaven import ConsumeKafka as ck

result = ck.consumeToTable({'bootstrap.servers': 'redpanda:29092'}, 'test.topic')


In this example, consumeToTable creates a Deephaven table from a Kafka topic. Here, {'bootstrap.servers': 'redpanda:29092'} is a dictionary describing how the Kafka infrastructure is configured. bootstrap.servers provides the initial hosts that a Kafka client uses to connect. In this case, bootstrap.servers is set to redpanda:29092.

The result table is now subscribed to all partitions in the test.topic topic. When data is sent to the test.topic topic, it will appear in the table.

Input Kafka data

Information is entered into the Kafka topic via a terminal. To do this, run:

docker-compose exec redpanda rpk topic produce test.topic

Let's walk through this command step-by-step.

  • docker-compose exec: This runs a new command in a running container.
  • redpanda: The name of the service in which we are executing the command.
  • rpk topic produce test.topic: This produces records in the test.topic topic from data entered on stdin. The command is built into Redpanda. For more commands, see the rpk documentation.

This will wait for input from the terminal and will send any input to the test.topic topic. Enter the information and use the keyboard shortcut Ctrl + D to send.

Once sent, that information will automatically appear in your Deephaven table.
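
For example, a session might look like the following, where Hello Deephaven is an arbitrary test message typed at the prompt:

docker-compose exec redpanda rpk topic produce test.topic
Hello Deephaven

After pressing Ctrl + D, a new row appears in the result table with Hello Deephaven in the KafkaValue column.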

Import a Kafka stream with append

In this example, consumeToTable reads the Kafka topic share.price with additional settings enabled. The specific key and value formats result in a table that appends new rows.

note

This and the following examples will not run natively unless you use a 'redpanda' configuration. These are intended to show possible use cases for Kafka with Deephaven.

from deephaven import ConsumeKafka as ck

result = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092'},
    'share.price',
    partitions=ck.ALL_PARTITIONS,
    offsets=ck.ALL_PARTITIONS_DONT_SEEK,
    key=ck.simple('Symbol', 'string'),
    value=ck.simple('Price', 'double'),
    table_type='append')


Let's walk through this query, focusing on the new optional arguments we've set.

  • partitions is set to ALL_PARTITIONS, which specifies that we want to listen to all partitions.
  • offsets is set to ALL_PARTITIONS_DONT_SEEK, which only listens to new messages produced after this call is processed.
  • key is set to simple('Symbol', 'string'), which expects messages with a Kafka key field of type string, and creates a Symbol column to store the information.
  • value is set to simple('Price', 'double'), which expects messages with a Kafka value field of type double, and creates a Price column to store the information.
  • table_type is set to append, which creates an append-only table.

Import a Kafka stream ignoring keys

In this example, consumeToTable reads the Kafka topic share.price and ignores the partition and key values.

from deephaven import ConsumeKafka as ck

result = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092',
     'deephaven.partition.column.name': None},
    'share.price',
    key=ck.IGNORE,
    value=ck.simple('Price', 'double'))

deephaven.partition.column.name allows the partition column to be renamed. Because deephaven.partition.column.name is set to None, the partition field is ignored. The result table will not include a partition column.

Additionally, the key field is set to IGNORE. The result table will not include a key column.


Perform multiple operations

In this example, consumeToTable reads the Kafka topic quotes and the Kafka topic orders into streaming Deephaven tables. The key and value columns are renamed using deephaven.key.column.name and deephaven.value.column.name. Table operations, such as lastBy and AggSum, are then performed.

from deephaven import ConsumeKafka as ck

simplePriceTable = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092',
     'deephaven.key.column.name': 'Symbol',
     'deephaven.value.column.name': 'Price',
     'deephaven.value.column.type': 'double'},
    'quotes',
    table_type='stream')

lastPrice = simplePriceTable.lastBy("Symbol")

ordersStream = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092'},
    'orders',
    value=ck.json([('Symbol', 'string'),
                   ('Side', 'string'),
                   ('LimitPrice', 'double'),
                   ('Qty', 'int')]),
    table_type='stream')

ordersWithCurrentPrice = ordersStream.naturalJoin(lastPrice, "Symbol", "LastPrice=Price")

from deephaven import ComboAggregateFactory as caf

totalNotional = ordersWithCurrentPrice.by(
    caf.AggCombo(caf.AggSum("Shares=Qty"),
                 caf.AggWSum("Qty", "Notional=LastPrice")),
    "Symbol")


Read Kafka topic in JSON format

In this example, consumeToTable reads the Kafka topic orders in JSON format.

from deephaven import ConsumeKafka as ck

result = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092'},
    'orders',
    key=ck.IGNORE,
    value=ck.json([('Symbol', 'string'),
                   ('Side', 'string'),
                   ('Price', 'double'),
                   ('Qty', 'int')],
                  mapping={'jsymbol': 'Symbol',
                           'jside': 'Side',
                           'jprice': 'Price',
                           'jqty': 'Qty'}),
    table_type='append')

In this query, the value argument uses ck.json, which specifies a JSON parameterization for the KafkaValue field.

After this, we see an ordered list of Python tuples specifying column definitions.

  • The first element in each tuple is a string for the column name in the result table.
  • The second element in each tuple is a string for the column data type in the result table.

Within the value argument, the mapping keyword argument is given. This is a Python dictionary specifying a mapping from JSON field names to resulting table column names. Column names used in the mapping must appear in the list of column definitions provided in the first argument. The mapping dictionary may contain fewer entries than the total number of columns defined in the first argument.

In the example, the map entry 'jprice' : 'Price' specifies that incoming messages are expected to contain a JSON field named jprice, whose value will be mapped to the Price column in the resulting table. Columns not mentioned in the mapping are populated from JSON fields with matching names.

If the mapping keyword argument is not provided, it is assumed that JSON field names and column names will match.
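
For example, if the incoming JSON fields are already named Symbol, Side, Price, and Qty (an assumption for this sketch), the mapping argument can be dropped entirely:

from deephaven import ConsumeKafka as ck

# JSON field names match the desired column names, so no mapping is needed.
result = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092'},
    'orders',
    key=ck.IGNORE,
    value=ck.json([('Symbol', 'string'),
                   ('Side', 'string'),
                   ('Price', 'double'),
                   ('Qty', 'int')]),
    table_type='append')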


Read Kafka topic in Avro format

In this example, consumeToTable reads the Kafka topic share.price in Avro format. The example uses an external schema definition named share.price.record, registered in the Redpanda instance used for development testing. A Kafka Schema Registry allows sharing and versioning of Kafka event schema definitions.

from deephaven import ConsumeKafka as ck

result = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092',
     'schema.registry.url': 'http://redpanda:8081'},
    'share.price',
    key=ck.IGNORE,
    value=ck.avro('share.price.record', schema_version='1'),
    table_type='append')

In this query, the first argument includes an additional entry for schema.registry.url to specify the URL for a schema registry with a REST API compatible with Confluent's schema registry specification.

The value argument uses avro, which specifies an Avro format for the Kafka value field.

The first positional argument in the avro call specifies the Avro schema to use. In this case, avro gets the schema named share.price.record from the schema registry. Alternatively, the first argument can be an org.apache.avro.Schema object obtained from getAvroSchema.

Three optional keyword arguments are supported:

  • schema_version specifies the version of the schema to get, for the given name, from the schema registry. If not specified, the default of latest is assumed, which retrieves the latest available schema version.
  • mapping expects a dictionary value and, if provided, specifies a name mapping from Avro field names to table column names. Any Avro field name not mentioned is mapped to a column of the same name.
  • mapping_only expects a dictionary value and, if provided, specifies a name mapping from Avro field names to table column names. Any Avro field name not mentioned is omitted from the resulting table.

When mapping and mapping_only are both omitted, all Avro schema fields are mapped to columns using the field name as the column name.

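As a sketch of mapping_only, the call below keeps just two fields from the registered schema and renames them; the Avro field names symbol and price are assumptions about the schema's contents:

from deephaven import ConsumeKafka as ck

# Keep only the 'symbol' and 'price' Avro fields; all other fields are dropped.
# The field names are illustrative assumptions about the registered schema.
result = ck.consumeToTable(
    {'bootstrap.servers': 'redpanda:29092',
     'schema.registry.url': 'http://redpanda:8081'},
    'share.price',
    key=ck.IGNORE,
    value=ck.avro('share.price.record',
                  schema_version='1',
                  mapping_only={'symbol': 'Symbol', 'price': 'Price'}),
    table_type='append')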

Write to a Kafka stream

We can take any table and write that table to a Kafka stream in a similar fashion to how we read the data into Deephaven.

In this example, we write a simple time table to a topic called time-topic. With only one data column, we use X as the key and ignore the value.


from deephaven.TableTools import timeTable
from deephaven import ProduceKafka as pk

source = timeTable('00:00:00.1').update("X = i")

write_topic = pk.produceFromTable(source, {'bootstrap.servers': 'redpanda:29092'}, 'time-topic', pk.simple('X'), pk.IGNORE)

In this example, we write a time table to a topic called time-topic_group. The last argument is True for last_by_key_columns, which indicates we want to perform a lastBy on the keys before writing to the stream.

import random

source_group = timeTable('00:00:00.1').update("X = random.randint(1, 5)", "Y = i")

write_topic_group = pk.produceFromTable(
    source_group,
    {'bootstrap.servers': 'redpanda:29092'},
    'time-topic_group',
    pk.json(['X']),
    pk.json(['X', 'Y']),
    True)