Import and Export Data
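Data I/O is mission-critical for any real-time data analysis platform. Deephaven supports a wide variety of data sources and formats, including CSV, Parquet, Kafka, and more. This document shows how to import and export data in each of these formats, and introduces function generated tables as a way to ingest data from other sources.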
CSV
Deephaven can read CSV files that exist locally or remotely. This example reads a local CSV file.
from deephaven import read_csv
iris = read_csv("/data/examples/Iris/csv/iris.csv")
Deephaven can also write tables to CSV. The code below writes that same table back to a new CSV file.
from deephaven import write_csv
write_csv(iris, "/data/iris_new.csv")
To confirm that the file was written, read it back into a new table:
iris_new = read_csv("/data/iris_new.csv")
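An exported CSV file can also be sanity-checked outside Deephaven. The sketch below uses only the Python standard library to report a file's header and data row count. The helper name csv_summary, the temporary demo file, and its column names are hypothetical and exist only so the example runs anywhere; in practice the path would be the exported file, such as /data/iris_new.csv.

```python
import csv
import os
import tempfile


def csv_summary(path):
    """Return (header, data_row_count) for the CSV file at `path`."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader, [])  # empty list if the file has no rows
        row_count = sum(1 for _ in reader)
    return header, row_count


# Hypothetical stand-in file; replace demo_path with the real export path.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.csv")
with open(demo_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["SepalLengthCM", "Class"])
    writer.writerows([[5.1, "Iris-setosa"], [7.0, "Iris-versicolor"]])

header, row_count = csv_summary(demo_path)
print(header, row_count)
```

Comparing the reported row count with the size of the source table is a quick way to catch a truncated export.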
Parquet
Apache Parquet is a columnar storage format that supports compression to store more data in less space. Deephaven supports reading and writing single, nested, and partitioned Parquet files. Parquet data can be stored locally or in S3. The example below reads from a local Parquet file.
from deephaven import parquet as dhpq
crypto_trades = dhpq.read(
    "/data/examples/CryptoCurrencyHistory/Parquet/CryptoTrades_20210922.parquet"
)
That same table can be written back to a Parquet file:
dhpq.write(crypto_trades, "/data/crypto_trades_new.parquet")
To confirm that the write succeeded, read the new file back into a table:
crypto_trades_new = dhpq.read("/data/crypto_trades_new.parquet")
Kafka
Apache Kafka is a distributed event streaming platform that can be used to publish and subscribe to streams of records. Deephaven can both consume from and publish to Kafka streams. The code below consumes the topic test.topic into an append-only table.
from deephaven.stream.kafka import consumer as kc
from deephaven import dtypes as dht
result_append = kc.consume(
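    # Kafka broker address
    {"bootstrap.servers": "redpanda:9092"},
    # topic to subscribe to
    "test.topic",
    # keep every consumed row in an append-only table
    table_type=kc.TableType.append(),
    # ignore message keys
    key_spec=kc.KeyValueSpec.IGNORE,
    # map each message value to a single string column named Command
    value_spec=kc.simple_spec("Command", dht.string),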
)
Similarly, this code publishes the data in a Deephaven table to a Kafka stream.
from deephaven.stream.kafka import producer as kp
from deephaven import time_table
# create ticking table to publish to Kafka stream
source = time_table("PT1s").update("X = i")
# publish to time-topic
write_topic = kp.produce(
    source,
    {"bootstrap.servers": "redpanda:9092"},
    "time-topic",
    key_spec=kp.KeyValueSpec.IGNORE,
    value_spec=kp.simple_spec("X"),
)
Function generated tables
Function generated tables are tables populated by a Python function. The function is reevaluated when source tables change or at a regular interval. The following example re-generates data in a table once per second.
from deephaven import empty_table, function_generated_table
def regenerate():
    return empty_table(10).update(
        [
            "Group = randomInt(1, 4)",
            "GroupMean = Group == 1 ? -10.0 : Group == 2 ? 0.0 : Group == 3 ? 10.0 : NULL_DOUBLE",
            "GroupStd = Group == 1 ? 2.5 : Group == 2 ? 0.5 : Group == 3 ? 1.0 : NULL_DOUBLE",
            "X = randomGaussian(GroupMean, GroupStd)",
        ]
    )
fgt = function_generated_table(table_generator=regenerate, refresh_interval_ms=1000)
Function generated tables, on their own, don't do any data I/O. However, Python functions evaluated at a regular interval to create a ticking table are a powerful tool for data ingestion from external sources like WebSockets, databases, and much more. Check out this blog post that uses WebSockets to stream data into Deephaven with function generated tables.
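As a rough illustration of the database case, the sketch below shows the polling half of such a function using only the Python standard library: it runs a query and pivots the rows into one list per column. The fetch_columns helper, the in-memory SQLite database, and the readings table are all hypothetical stand-ins for an external source. The function passed to function_generated_table has to return a Deephaven table, so a real generator would still need to convert these column lists into one; that step is not shown here.

```python
import sqlite3


def fetch_columns(conn, query):
    """Run `query` and return the result as {column_name: [values...]}."""
    cursor = conn.execute(query)
    names = [d[0] for d in cursor.description]
    rows = cursor.fetchall()
    # Pivot row tuples into one list per column.
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


# Hypothetical in-memory database standing in for an external source.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (sensor TEXT, value REAL)")
conn.executemany(
    "INSERT INTO readings VALUES (?, ?)",
    [("a", 1.5), ("b", 2.5), ("a", 3.0)],
)

columns = fetch_columns(conn, "SELECT sensor, value FROM readings ORDER BY rowid")
print(columns)
```

Returning column-oriented data keeps the polling logic independent of Deephaven, so it can be tested on its own before being wrapped in a table generator.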