
Import and Export Data

Data I/O is mission-critical for any real-time data analysis platform. Deephaven supports a wide variety of data sources and formats, including CSV, Parquet, Kafka, and more. This guide covers importing and exporting data in each of those formats.

CSV

Deephaven can read CSV files that exist locally or remotely. This example reads a local CSV file.

from deephaven import read_csv

iris = read_csv("/data/examples/Iris/csv/iris.csv")
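read_csv also accepts a URL, so remote files can be read the same way. Below is a minimal sketch, assuming the CSV is reachable over HTTPS; the URL points at Deephaven's public examples repository and is illustrative:

from deephaven import read_csv

# Read a CSV file directly from a URL (illustrative location)
iris_remote = read_csv(
    "https://media.githubusercontent.com/media/deephaven/examples/main/Iris/csv/iris.csv"
)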

It can also write data to CSV. The code below writes that same table back to a CSV file.

from deephaven import write_csv

write_csv(iris, "/data/iris_new.csv")

Read the new file back in to confirm the write:

iris_new = read_csv("/data/iris_new.csv")

Parquet

Apache Parquet is a columnar storage format that supports compression to store more data in less space. Deephaven supports reading and writing single, nested, and partitioned Parquet files. Parquet data can be stored locally or in S3. The example below reads from a local Parquet file.

from deephaven import parquet as dhpq

crypto_trades = dhpq.read(
    "/data/examples/CryptoCurrencyHistory/Parquet/CryptoTrades_20210922.parquet"
)

That same table can be written back to a Parquet file:

dhpq.write(crypto_trades, "/data/crypto_trades_new.parquet")

Read it back in to confirm the write:

crypto_trades_new = dhpq.read("/data/crypto_trades_new.parquet")
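Partitioned Parquet datasets are read the same way by pointing dhpq.read at the root directory of the dataset rather than a single file. A minimal sketch, assuming a hypothetical partitioned directory at /data/partitioned_crypto:

from deephaven import parquet as dhpq

# Read a directory of partitioned Parquet files as a single table
# (the directory path below is hypothetical)
crypto_partitioned = dhpq.read("/data/partitioned_crypto")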

Kafka

Apache Kafka is a distributed event streaming platform that can be used to publish and subscribe to streams of records. Deephaven can consume and publish to Kafka streams. The code below consumes a stream.

from deephaven.stream.kafka import consumer as kc
from deephaven import dtypes as dht

result_append = kc.consume(
    {"bootstrap.servers": "redpanda:9092"},
    "test.topic",
    table_type=kc.TableType.append(),
    key_spec=kc.KeyValueSpec.IGNORE,
    value_spec=kc.simple_spec("Command", dht.string),
)

Similarly, this code publishes the data in a Deephaven table to a Kafka stream.

from deephaven.stream.kafka import producer as kp
from deephaven import time_table

# create ticking table to publish to Kafka stream
source = time_table("PT1s").update("X = i")

# publish to time-topic
write_topic = kp.produce(
    source,
    {"bootstrap.servers": "redpanda:9092"},
    "time-topic",
    key_spec=kp.KeyValueSpec.IGNORE,
    value_spec=kp.simple_spec("X"),
)

Function generated tables

Function generated tables are tables populated by a Python function. The function is reevaluated when source tables change or at a regular interval. The following example re-generates data in a table once per second.

from deephaven import empty_table, function_generated_table


def regenerate():
    return empty_table(10).update(
        [
            "Group = randomInt(1, 4)",
            "GroupMean = Group == 1 ? -10.0 : Group == 2 ? 0.0 : Group == 3 ? 10.0 : NULL_DOUBLE",
            "GroupStd = Group == 1 ? 2.5 : Group == 2 ? 0.5 : Group == 3 ? 1.0 : NULL_DOUBLE",
            "X = randomGaussian(GroupMean, GroupStd)",
        ]
    )


fgt = function_generated_table(table_generator=regenerate, refresh_interval_ms=1000)

Function generated tables, on their own, don't do any data I/O. However, Python functions evaluated at a regular interval to create a ticking table are a powerful tool for data ingestion from external sources like WebSockets, databases, and much more. Check out this blog post that uses WebSockets to stream data into Deephaven with function generated tables.
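As a minimal sketch of that ingestion pattern, the function below stands in for a call to an external source; the random values, column name, and refresh interval are placeholders you would replace with a real WebSocket client, database cursor, or REST call:

from deephaven import new_table, function_generated_table
from deephaven.column import double_col
import random


def poll_external_source():
    # Replace this body with a real fetch from a WebSocket, database, or REST API;
    # the random values are a stand-in for externally sourced data
    values = [random.gauss(0.0, 1.0) for _ in range(5)]
    return new_table([double_col("Value", values)])


# Re-run the polling function every 5 seconds to produce a ticking table
polled = function_generated_table(
    table_generator=poll_external_source, refresh_interval_ms=5000
)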