Can I integrate custom data sources with Deephaven?

Yes, you can integrate custom data sources with Deephaven. While Deephaven includes a proprietary columnar store for persistent historical and intraday data, you can integrate your own data stores to leverage Deephaven's efficient engine, analytics, and data visualization capabilities.

There are three main integration approaches:

  • Static in-memory tables - Similar to CSV and JDBC imports.
  • Dynamic in-memory tables - For real-time data feeds like multicast distribution systems.
  • Lazily-loaded on-disk tables - For large datasets like Apache Parquet files.

Understanding Deephaven table structure

Each Deephaven table consists of:

  • A RowSet - An ordered set of long keys representing valid row addresses.
  • Named ColumnSources - A map from column name to ColumnSource, which acts as a dictionary from row key to cell value.

In Python, you typically use higher-level APIs like new_table(), DynamicTableWriter, or pandas integration rather than working directly with RowSets and ColumnSources.
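
For most integrations you never manipulate these internals directly. As a quick illustration, you can see a table's column names and types through the Python API; a minimal sketch using empty_table:

from deephaven import empty_table

# Build a small table, then inspect its schema;
# meta_table lists each column's name and data type.
t = empty_table(5).update(["X = i", "Y = i * 2.5"])
schema = t.meta_table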

Static in-memory tables

For simple static data sources, use new_table() to create tables from Python data structures.

Here's an example of creating a static table from custom data:

from deephaven import new_table
from deephaven.column import string_col, double_col, int_col

# Your custom data as Python lists
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN"]
prices = [150.25, 2800.50, 380.75, 3400.00]
volumes = [1000000, 500000, 750000, 600000]

# Create the table
custom_table = new_table(
    [
        string_col("Symbol", symbols),
        double_col("Price", prices),
        int_col("Volume", volumes),
    ]
)

In the example above, the column helper functions (string_col, double_col, int_col) set each column's type explicitly.
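
In recent releases, new_table() also accepts a dict of column names to data and infers the column types for you. A minimal sketch of that form (note that integer values are typically inferred as 64-bit longs):

from deephaven import new_table

# Column types are inferred from the Python values
custom_table = new_table(
    {
        "Symbol": ["AAPL", "GOOGL", "MSFT", "AMZN"],
        "Price": [150.25, 2800.50, 380.75, 3400.00],
        "Volume": [1000000, 500000, 750000, 600000],
    }
)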

Alternative: Using pandas

You can also create tables from pandas DataFrames:

import pandas as pd
from deephaven import pandas as dhpd

# Create a pandas DataFrame
df = pd.DataFrame(
    {
        "Symbol": ["AAPL", "GOOGL", "MSFT", "AMZN"],
        "Price": [150.25, 2800.50, 380.75, 3400.00],
        "Volume": [1000000, 500000, 750000, 600000],
    }
)

# Convert to Deephaven table
custom_table = dhpd.to_table(df)
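
The conversion also works in the other direction: deephaven.pandas provides to_pandas() for pulling a Deephaven table back into a DataFrame.

from deephaven import pandas as dhpd

# Convert a Deephaven table back to a pandas DataFrame
df_roundtrip = dhpd.to_pandas(custom_table)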

Dynamic in-memory tables

Dynamic tables allow you to integrate real-time data feeds. These tables update on each Deephaven update cycle and notify downstream operations of changes.

The easiest way to create dynamic tables in Python is using DynamicTableWriter:

from deephaven import DynamicTableWriter
import deephaven.dtypes as dht

# Define the table schema
column_definitions = {"Symbol": dht.string, "Price": dht.double}

# Create a dynamic table writer
table_writer = DynamicTableWriter(column_definitions)
dynamic_table = table_writer.table

# Write initial data
for i in range(100):
    table_writer.write_row(f"SYM{i}", 100.0 + i)

# In your update logic, continue writing rows:
# table_writer.write_row("AAPL", 150.25)
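
When the feed is finished, close the writer so the table no longer expects new rows:

# Signal that no more rows will be written
table_writer.close()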

Using a table replayer for time-based data

For replaying historical data or simulating real-time feeds:

from deephaven import new_table
from deephaven.column import string_col, double_col, datetime_col
from deephaven.replay import TableReplayer
from deephaven.time import to_j_instant

# Create source data with timestamps
source_data = new_table(
    [
        string_col("Symbol", ["AAPL", "GOOGL", "MSFT"]),
        double_col("Price", [150.25, 2800.50, 380.75]),
        datetime_col(
            "Timestamp",
            [
                to_j_instant("2024-01-01T09:30:00 ET"),
                to_j_instant("2024-01-01T09:30:01 ET"),
                to_j_instant("2024-01-01T09:30:02 ET"),
            ],
        ),
    ]
)

# Create a replayer that replays data based on timestamps
replayer = TableReplayer(
    to_j_instant("2024-01-01T09:30:00 ET"), to_j_instant("2024-01-01T09:35:00 ET")
)
replayed_table = replayer.add_table(source_data, "Timestamp")
replayer.start()
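
Once the replay is complete, the replayer can be stopped; TableReplayer provides a shutdown() method for this:

# Stop the replay clock and release the replayer
replayer.shutdown()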

Advanced: Java interop for custom ColumnSources

For advanced use cases requiring custom data loading logic, you can use Java interop to create custom ColumnSources. This approach mirrors what you would write in Groovy or Java directly, but uses Python's Java integration (jpy).

Warning

This is an advanced technique requiring knowledge of Deephaven's Java internals. For most use cases, new_table(), DynamicTableWriter, or pandas integration are recommended.

from deephaven.jcompat import j_hashmap
import jpy

# Import Java classes
RowSetFactory = jpy.get_type("io.deephaven.engine.rowset.RowSetFactory")
ArrayBackedColumnSource = jpy.get_type(
    "io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource"
)
QueryTable = jpy.get_type("io.deephaven.engine.table.impl.QueryTable")
String = jpy.get_type("java.lang.String")

# Your custom data as Java arrays
symbols = jpy.array("java.lang.String", ["AAPL", "GOOGL", "MSFT", "AMZN"])
prices = jpy.array("double", [150.25, 2800.50, 380.75, 3400.00])
volumes = jpy.array("int", [1000000, 500000, 750000, 600000])

# Create a TrackingRowSet
row_set = RowSetFactory.flat(len(symbols)).toTracking()

# Create column sources
# Note: Primitive arrays (int, double) infer type automatically,
# but object arrays (String) require explicit type specification
column_sources = j_hashmap(
    {
        "Symbol": ArrayBackedColumnSource.getMemoryColumnSource(
            symbols, String.jclass, None
        ),
        "Price": ArrayBackedColumnSource.getMemoryColumnSource(prices),
        "Volume": ArrayBackedColumnSource.getMemoryColumnSource(volumes),
    }
)

# Create the QueryTable
custom_table = QueryTable(row_set, column_sources)
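
The QueryTable built this way is a raw Java object. To use it with Python table operations and display it in the IDE, wrap it in the Python Table class; a minimal sketch, assuming the jpy objects above:

from deephaven.table import Table

# Wrap the Java QueryTable in the Python Table class
custom_table = Table(custom_table)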

Working with external data sources

Reading from custom file formats

For custom file formats, read the data into Python structures and use new_table():

import struct
from deephaven import new_table
from deephaven.column import double_col


def read_custom_binary_file(filepath):
    """Example: Read doubles from a custom binary format"""
    values = []
    with open(filepath, "rb") as f:
        while True:
            chunk = f.read(8)  # Read 8 bytes for a double
            if not chunk:
                break
            values.append(struct.unpack("d", chunk)[0])
    return values


# Read data and create table
data = read_custom_binary_file("data.bin")
custom_table = new_table([double_col("Value", data)])
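
If the file is simply a packed array of 64-bit doubles, as in this example, NumPy can read it in a single call, which is typically much faster than a Python read loop. A sketch, assuming NumPy is installed (the column helpers accept NumPy arrays as well as lists):

import numpy as np
from deephaven import new_table
from deephaven.column import double_col

# Read the entire file as float64 values in one call
values = np.fromfile("data.bin", dtype=np.float64)
custom_table = new_table([double_col("Value", values)])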

Streaming data from APIs

For streaming data from external APIs, use DynamicTableWriter with a background thread:

import random
import threading
import time
from deephaven import DynamicTableWriter
from deephaven.time import dh_now
import deephaven.dtypes as dht

# Create dynamic table
column_definitions = {
    "Timestamp": dht.Instant,
    "Symbol": dht.string,
    "Price": dht.double,
}

table_writer = DynamicTableWriter(column_definitions)
streaming_table = table_writer.table


def fetch_and_write_data():
    """Background thread that polls an external API and writes rows"""
    while True:
        # Replace this block with a real API call, e.g.:
        # data = fetch_from_api()
        # table_writer.write_row(data["timestamp"], data["symbol"], data["price"])

        # Synthetic row so the example runs as written
        table_writer.write_row(dh_now(), "AAPL", 150.0 + random.random())

        time.sleep(1)  # Poll interval


# Start background thread
thread = threading.Thread(target=fetch_and_write_data, daemon=True)
thread.start()
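
To stop the feed cleanly, a common pattern is to guard the loop with a threading.Event and close the writer once the loop exits; a sketch of that variant:

stop_event = threading.Event()


def fetch_and_write_data():
    # Run until another thread signals shutdown
    while not stop_event.is_set():
        table_writer.write_row(dh_now(), "AAPL", 150.0 + random.random())
        time.sleep(1)


# Later, to shut the feed down:
# stop_event.set()
# table_writer.close()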

Note

These FAQ pages contain answers to questions about Deephaven Community Core that our users have asked in our Community Slack. If you have a question that is not in our documentation, join our Community and we'll be happy to help!