Skip to main content
Version: Python

Kafka Cheat Sheet

# Create a table
from deephaven import time_table

# Ticking table built with the "PT00:00:00.1" period string; the update adds
# column X from the formula "X = i".
source = time_table("PT00:00:00.1").update(
    formulas=["X = i"],
)

# Send to Kafka, simple usage
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# Publish `source` to the "testTopic" topic. Column X is supplied as the key
# spec; the value spec is KeyValueSpec.IGNORE.
write_topic = pk.produce(
    source,
    {"bootstrap.servers": "redpanda:29092"},
    "testTopic",
    key_spec=pk.simple_spec("X"),
    value_spec=KeyValueSpec.IGNORE,
)
# Create a table with random group number
import random

from deephaven import time_table

# X is filled by calling Python's random.randint(1, 5) from the formula and
# Y comes from the formula "Y = i".
# NOTE(review): the result of a Python call inside a formula may need an
# explicit cast to get a primitive column type -- confirm for your version.
source_group = time_table("PT00:00:00.1").update(
    formulas=["X = random.randint(1, 5)", "Y = i"],
)

# Send to Kafka, perform last_by on keys
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec

# Key is a JSON spec over column X; value is a JSON spec over X and Y.
# The final flag (True) enables the last_by-on-keys behaviour named above.
write_topic_group = pk.produce(
    source_group,
    {"bootstrap.servers": "redpanda:29092"},
    "time-topic_group",
    key_spec=pk.json_spec(["X"]),
    value_spec=pk.json_spec(["X", "Y"]),
    last_by_key_columns=True,
)
# Read from Kafka, simple usage
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

# Consume "testTopic" with no explicit key/value specs; the key and value
# column types are set to "String" through the consumer properties.
result = ck.consume(
    {
        "bootstrap.servers": "redpanda:29092",
        "deephaven.key.column.type": "String",
        "deephaven.value.column.type": "String",
    },
    "testTopic",
)
# Read from Kafka, define key and value
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

# Consume "share.price" from all partitions without seeking, mapping the key
# to a string column "Symbol" and the value to a double column "Price".
# The table type uses the TableType.append() factory, the same form used by
# the JSON example in this cheat sheet.
result = ck.consume(
    {"bootstrap.servers": "redpanda:29092"},
    "share.price",
    partitions=ck.ALL_PARTITIONS,
    offsets=ck.ALL_PARTITIONS_DONT_SEEK,
    key_spec=ck.simple_spec("Symbol", dht.string),
    value_spec=ck.simple_spec("Price", dht.double),
    table_type=TableType.append(),
)
# Read from Kafka, ignores the partition and key values
from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

# The partition column name is set to None and the key spec is
# KeyValueSpec.IGNORE; only the value is mapped, to a double column "Price".
result = ck.consume(
    {
        "bootstrap.servers": "redpanda:29092",
        "deephaven.partition.column.name": None,
    },
    "share.price",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=ck.simple_spec("Price", dht.double),
)

# Read from Kafka, JSON with mapping
# Column definitions are (name, dtype) pairs; `mapping` pairs each JSON field
# name (jsymbol, jside, ...) with the column name it feeds.
result = ck.consume(
    {"bootstrap.servers": "redpanda:29092"},
    "orders",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=ck.json_spec(
        [
            ("Symbol", dht.string),
            ("Side", dht.string),
            ("Price", dht.double),
            ("Qty", dht.int_),
        ],
        mapping={
            "jsymbol": "Symbol",
            "jside": "Side",
            "jprice": "Price",
            "jqty": "Qty",
        },
    ),
    table_type=TableType.append(),
)

# Read from Kafka, AVRO
# The schema registry URL is passed in the consumer properties; the value is
# decoded with the "share.price.record" Avro schema at schema_version "1".
# The table type uses the TableType.append() factory, the same form used by
# the JSON example in this cheat sheet.
result = ck.consume(
    {
        "bootstrap.servers": "redpanda:29092",
        "schema.registry.url": "http://redpanda:8081",
    },
    "share.price",
    key_spec=KeyValueSpec.IGNORE,
    value_spec=ck.avro_spec("share.price.record", schema_version="1"),
    table_type=TableType.append(),
)