Connect to Deephaven Core+ Flight SQL

Flight SQL is a protocol built on Apache Arrow Flight that lets clients run SQL queries against a server and receive the results as Arrow data. The Deephaven Core+ worker provides a built-in Flight SQL server that exposes tables in the global query scope as named tables. You can query these tables through the default Flight SQL catalog using standard SQL.

Note

SELECT queries are supported; INSERT, UPDATE, and DELETE queries are not currently supported.

Flight SQL is an easy starting point, but its APIs are not Live Dataframe APIs: a query returns a snapshot of a table's data rather than a stream of updates. If you need real-time updates, use a Deephaven-native client such as the Deephaven Core+ Python client.
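Because only SELECT queries are supported, you might add a simple client-side guard that rejects other statements before they reach the server. The sketch below is a hypothetical helper (`require_select`), not part of either client library. The server remains the authority on what it accepts, and this check only inspects the first keyword, so forms such as a parenthesized SELECT are rejected too.

```python
import re

# Skips leading whitespace and "--" line comments, then captures the first word.
_FIRST_KEYWORD = re.compile(r"^(?:\s|--[^\n]*\n)*([A-Za-z]+)")


def require_select(sql: str) -> str:
    """Return ``sql`` unchanged if its first keyword is SELECT, else raise ValueError.

    Hypothetical fail-fast guard; it does not parse SQL beyond the first keyword.
    """
    match = _FIRST_KEYWORD.match(sql)
    keyword = match.group(1).upper() if match else ""
    if keyword != "SELECT":
        raise ValueError(f"Only SELECT queries are supported, got: {(keyword or sql)!r}")
    return sql
```

For example, `require_select("SELECT * FROM Foo")` passes the query through, while `require_select("DELETE FROM Foo")` raises `ValueError`.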

In Deephaven Community Core, you can connect directly to the Flight SQL server using most standard Flight SQL clients. For details, see the Flight SQL guide.

In Deephaven Core+, standard Flight SQL clients are not supported out of the box due to enhanced authentication and session management. However, you can still connect using a small amount of custom code, as described below.

Prerequisites

Before you begin, make sure you have:

  • Access to a running Deephaven Core+ instance with Flight SQL enabled (Grizzly version 1.20240517.491 or later).
  • Python 3.9 or newer installed.
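If you want a quick programmatic check of these prerequisites, the sketch below compares dotted version strings numerically. It assumes a version consists only of integer components separated by dots, as in 1.20240517.491. `meets_minimum` and `python_supported` are hypothetical helpers, and how you obtain your server's version string is outside the scope of this sketch.

```python
import sys

# Minimum Grizzly version taken from the prerequisites list.
GRIZZLY_MINIMUM = "1.20240517.491"


def meets_minimum(version: str, minimum: str = GRIZZLY_MINIMUM) -> bool:
    """True if ``version`` is at least ``minimum``.

    Assumes purely numeric, dot-separated components; suffixes such as
    "-rc1" are not handled.
    """
    def parse(v: str) -> tuple:
        return tuple(int(part) for part in v.split("."))

    return parse(version) >= parse(minimum)


def python_supported() -> bool:
    """True if the running interpreter is Python 3.9 or newer."""
    return sys.version_info >= (3, 9)
```

Comparing integer tuples rather than raw strings avoids ordering mistakes such as treating "1.9" as newer than "1.10".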

You will also need to install the following packages in your Python environment:

Deephaven Core+ Python Client

Install the Deephaven Core+ Python client by following the instructions in the Develop a Python Client Query guide or the Deephaven Core+ Python client guide.

flightsql-dbapi package

The flightsql-dbapi package provides a PEP 249 compliant DB-API 2.0 interface for Flight SQL servers. This lets you use popular Python database tools and libraries (such as pandas, SQLAlchemy, or dbt) to connect to and query Deephaven using SQL.

To install, run:

pip install flightsql-dbapi
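Before moving on, you can confirm that both packages are importable. This sketch checks the top-level module names used by the imports in this guide (`deephaven_enterprise` and `flightsql`) with `importlib.util.find_spec`, which locates a module without importing it. `missing_modules` is a hypothetical helper name.

```python
import importlib.util
from typing import Iterable, List

# Top-level module names taken from the import statements used in this guide.
REQUIRED_MODULES = ("deephaven_enterprise", "flightsql")


def missing_modules(names: Iterable[str] = REQUIRED_MODULES) -> List[str]:
    """Return the module names that cannot be found in the current environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]
```

An empty list from `missing_modules()` means both packages were found.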

Connect to Deephaven Core+

To connect to a Deephaven Core+ server using Flight SQL, you must first establish a session with the Deephaven Core+ Python client. The following example creates a session using user/password authentication:

from typing import Optional

from deephaven_enterprise.client.session_manager import SessionManager, DndSession


def start_new_worker(
    host: str, port: int, user: str, password: str, query_server: Optional[str] = None
) -> DndSession:
    connection_info = f"https://{host}:{port}/iris/connection.json"
    session_manager = SessionManager(connection_info)
    session_manager.password(user=user, password=password)
    return session_manager.connect_to_new_worker(
        name=None, heap_size_gb=1.0, server=query_server
    )
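Hard-coded credentials are easy to leak, so one option is to read the arguments for `start_new_worker` from environment variables instead. In this sketch, the variable names (`DH_HOST`, `DH_PORT`, `DH_USER`, `DH_PASSWORD`, `DH_QUERY_SERVER`) are hypothetical, the default port of 8000 is an assumption borrowed from the complete example in this guide, and `load_connection_settings` is not part of the Deephaven client.

```python
import os
from typing import Any, Dict, Mapping, Optional


def load_connection_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build keyword arguments for start_new_worker from environment variables.

    The variable names are hypothetical; rename them to suit your setup.
    """
    env = os.environ if env is None else env
    missing = [key for key in ("DH_HOST", "DH_USER", "DH_PASSWORD") if not env.get(key)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "host": env["DH_HOST"],
        # Default port is an assumption; it matches the complete example.
        "port": int(env.get("DH_PORT", "8000")),
        "user": env["DH_USER"],
        "password": env["DH_PASSWORD"],
        # An unset or empty value becomes None, the function's own default.
        "query_server": env.get("DH_QUERY_SERVER") or None,
    }
```

With the variables exported, the session can be created with `dh_session = start_new_worker(**load_connection_settings())`.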

Custom Flight SQL Client for Deephaven Core+

The flightsql-dbapi package does not natively support Deephaven Core+ because of its advanced authentication and session management requirements.
To work around this, you can create a subclass of FlightSQLClient that leverages an active Deephaven session for authentication and connectivity, as shown below.

from deephaven_enterprise.client.session_manager import SessionManager, DndSession

from flightsql import FlightSQLClient


class DHFlightSQLClient(FlightSQLClient):
    def __init__(self, dh_session: DndSession):
        """
        Initialize a DHFlightSQLClient using an existing Deephaven session.

        Args:
            dh_session: An active Deephaven session object.
        """
        # Initialize the base FlightSQLClient without connecting
        super().__init__()

        # Use the session's Flight client and authentication headers
        self.client = dh_session._flight_client
        self.headers = dh_session.grpc_metadata
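With the subclass in place, the DB-API 2.0 interface that motivates flightsql-dbapi can in principle be layered on top of it. This sketch assumes the package's `connect()` function accepts a `FlightSQLClient` instance, as shown in the package's own README, and uses the standard PEP 249 `cursor()`, `execute()`, `fetchall()`, and `close()` methods. `query_rows` is a hypothetical helper, and it has not been verified that every DB-API feature works against Deephaven Core+.

```python
from typing import Any, List, Sequence


def query_rows(fsql_client: Any, sql: str) -> List[Sequence[Any]]:
    """Run ``sql`` through flightsql-dbapi's DB-API interface and return all rows.

    ``fsql_client`` is expected to be a DHFlightSQLClient built from an active session.
    """
    # Imported lazily so this helper can be defined even before the package is installed.
    from flightsql import connect

    conn = connect(fsql_client)
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        # Close only the cursor: the connection wraps the Flight client that is
        # shared with the Deephaven session, so the session is left to close it.
        cursor.close()
```

For example: `rows = query_rows(DHFlightSQLClient(dh_session=dh_session), "SELECT * FROM Foo")`.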

A Complete Example: Fetch a table via the custom Flight SQL client

Below is a complete example that demonstrates how to:

  1. Start a new Deephaven Core+ worker session.
  2. Create and bind a ticking table.
  3. Query the table using the custom DHFlightSQLClient subclass created previously.
  4. Fetch and print the results.

Replace "your-dh-host", "your-username", and "your-password" with your actual Deephaven Core+ server details and credentials.

import logging
from typing import Optional

from deephaven_enterprise.client.session_manager import SessionManager, DndSession
from flightsql import FlightSQLClient

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


class DHFlightSQLClient(FlightSQLClient):
    def __init__(self, dh_session: DndSession):
        """
        Initialize a DHFlightSQLClient using an existing Deephaven session.

        Args:
            dh_session (DndSession): The Deephaven session to use.
        """
        # Initialize the base FlightSQLClient without connecting
        super().__init__()

        # Use the session's Flight client and authentication headers
        self.client = dh_session._flight_client
        self.headers = dh_session.grpc_metadata


def start_new_worker(
    host: str, port: int, user: str, password: str, query_server: Optional[str] = None
) -> DndSession:
    connection_info = f"https://{host}:{port}/iris/connection.json"
    session_manager = SessionManager(connection_info)
    session_manager.password(user=user, password=password)
    return session_manager.connect_to_new_worker(
        name=None, heap_size_gb=1.0, server=query_server
    )


def main(
    host: str, port: int, user: str, password: str, query_server: Optional[str] = None
):
    logger.info("Starting a new Deephaven worker session...")
    dh_session = start_new_worker(
        host=host, port=port, user=user, password=password, query_server=query_server
    )

    try:
        logger.info("Creating and binding a ticking table...")
        # Tick once per second (an integer period is interpreted as nanoseconds)
        table = dh_session.time_table(period=1_000_000_000).update(["Value=1"])
        dh_session.bind_table("Foo", table)

        logger.info("Verifying the table is in the global query scope...")
        foo_table = dh_session.open_table("Foo")
        print(foo_table.to_arrow())

        logger.info("Executing SQL query via Flight SQL...")
        fsql_client = DHFlightSQLClient(dh_session=dh_session)
        flight_info = fsql_client.execute("SELECT * FROM Foo")

        logger.info("Fetching query results...")
        # Read the results from the first endpoint returned by the server
        ticket = flight_info.endpoints[0].ticket
        reader = fsql_client.do_get(ticket)
        print(reader.read_all())

        fsql_client.close()
    finally:
        # Release the worker even if one of the steps above fails
        logger.info("Closing session...")
        dh_session.close()
    logger.info("Done!")


if __name__ == "__main__":
    host = "your-dh-host"
    port = 8000
    user = "your-username"
    password = "your-password"
    main(host=host, port=port, user=user, password=password)
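The example reads only `flight_info.endpoints[0]`. The Flight protocol allows a server to split results across several endpoints, so a more defensive variant reads each of them. `fetch_all_endpoints` is a hypothetical helper that uses only the `execute()` and `do_get()` calls from the example above; whether Deephaven Core+ ever returns more than one endpoint is not stated here.

```python
from typing import Any, List


def fetch_all_endpoints(fsql_client: Any, sql: str) -> List[Any]:
    """Execute ``sql`` and read every endpoint listed in the returned FlightInfo.

    Returns one Arrow table per endpoint, in the order the server listed them.
    """
    flight_info = fsql_client.execute(sql)
    return [
        fsql_client.do_get(endpoint.ticket).read_all()
        for endpoint in flight_info.endpoints
    ]
```

If every endpoint shares the same schema, `pyarrow.concat_tables(tables)` combines the per-endpoint results into a single table.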