# Kafka Cheat Sheet

# Build the source table: a time table created from the period string
# "00:00:00.1", extended with a column X defined by the formula "X = i".
from deephaven.TableTools import timeTable

source = (
    timeTable("00:00:00.1")
    .update("X = i")
)

# Simplest publish: send `source` to the "testTopic" topic on the broker at
# redpanda:29092.
from deephaven import ProduceKafka as pk

write_topic = pk.produceFromTable(
    source,
    {"bootstrap.servers": "redpanda:29092"},
    "testTopic",
    # NOTE(review): the next two positional arguments are presumably the key
    # spec followed by the value spec -- confirm against produceFromTable.
    pk.simple("X"),
    pk.IGNORE,
)


# Build a second source table. X is defined by the formula
# "X = random.randint(1, 5)" (the random group number) and Y by "Y = i".
import random

source_group = (
    timeTable("00:00:00.1")
    .update("X = random.randint(1, 5)", "Y = i")
)

# Publish it to "time-topic_group" using JSON specs, performing lastBy on keys.
write_topic_group = pk.produceFromTable(
    source_group,
    {"bootstrap.servers": "redpanda:29092"},
    "time-topic_group",
    pk.json(["X"]),
    pk.json(["X", "Y"]),
    # NOTE(review): this positional True is presumably the flag that enables
    # the lastBy-on-keys behaviour -- confirm against produceFromTable.
    True,
)


# Simplest read: consume the "testTopic" topic into a table, passing only the
# broker configuration and the topic name.
from deephaven import ConsumeKafka as ck
from deephaven import Types as dht

result = ck.consumeToTable(
    {"bootstrap.servers": "redpanda:29092"},
    "testTopic",
)

# Read from Kafka with explicit key and value specs: the key is read as a
# string into "Symbol" and the value as a double into "Price".
result = ck.consumeToTable(
    {"bootstrap.servers": "redpanda:29092"},
    "share.price",
    partitions=ck.ALL_PARTITIONS,
    offsets=ck.ALL_PARTITIONS_DONT_SEEK,
    key=ck.simple("Symbol", dht.string),
    value=ck.simple("Price", dht.double),
    table_type="append",
)

# Read from Kafka while ignoring the partition and key values: the
# partition column name is set to None in the config and the key spec is
# ck.IGNORE, so only the "Price" value is requested.
result = ck.consumeToTable(
    {
        "bootstrap.servers": "redpanda:29092",
        "deephaven.partition.column.name": None,
    },
    "share.price",
    key=ck.IGNORE,
    value=ck.simple("Price", dht.double),
)

# Read JSON values from the "orders" topic. The value spec lists four typed
# columns plus a mapping dict.
# NOTE(review): the mapping presumably goes from JSON field name to table
# column name -- confirm against ck.json.
result = ck.consumeToTable(
    {"bootstrap.servers": "redpanda:29092"},
    "orders",
    key=ck.IGNORE,
    value=ck.json(
        [
            ("Symbol", dht.string),
            ("Side", dht.string),
            ("Price", dht.double),
            ("Qty", dht.int_),
        ],
        mapping={
            "jsymbol": "Symbol",
            "jside": "Side",
            "jprice": "Price",
            "jqty": "Qty",
        },
    ),
    table_type="append",
)

# Read Avro values from the "share.price" topic. The config additionally
# carries the schema registry URL; the value spec names the schema
# "share.price.record" at schema_version "1".
result = ck.consumeToTable(
    {
        "bootstrap.servers": "redpanda:29092",
        "schema.registry.url": "http://redpanda:8081",
    },
    "share.price",
    key=ck.IGNORE,
    value=ck.avro("share.price.record", schema_version="1"),
    table_type="append",
)