Connect to Deephaven Core+ Flight SQL
Flight SQL is a protocol on top of Arrow Flight that exposes SQL-like capabilities. The Deephaven Core+ worker provides a built-in Flight SQL server, making tables from the global query scope available as named tables. These tables can be accessed and queried through the default Flight SQL catalog using standard SQL queries.
Note
SELECT queries are supported; INSERT, UPDATE, and DELETE queries are not currently supported.
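Because only SELECT is accepted, it can be convenient to reject other statements on the client before they are sent. The following is a minimal sketch, not part of Deephaven or flightsql-dbapi: the helper names `is_select_statement` and `require_select` are hypothetical, and the check only inspects the first keyword after any leading whitespace and comments. It is not a SQL parser, so statements that begin with another keyword (for example a `WITH` clause) are rejected even if the server might accept them.

```python
import re

# Hypothetical helper (an assumption for illustration, not a Deephaven API):
# strips leading whitespace, "-- line" comments, and "/* block */" comments.
_LEADING_NOISE = re.compile(r"^(?:\s+|--[^\n]*(?:\n|$)|/\*.*?\*/)+", re.DOTALL)


def is_select_statement(sql: str) -> bool:
    """Return True if the first keyword of `sql` is SELECT (case-insensitive)."""
    stripped = _LEADING_NOISE.sub("", sql)
    first_word = re.match(r"[A-Za-z]+", stripped)
    return first_word is not None and first_word.group(0).upper() == "SELECT"


def require_select(sql: str) -> str:
    """Return `sql` unchanged, or raise ValueError if it is not a SELECT."""
    if not is_select_statement(sql):
        raise ValueError(f"Only SELECT statements are supported: {sql!r}")
    return sql
```

A caller could wrap each statement, for example `client.execute(require_select(sql))`, so that unsupported statements fail locally with a clear message.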
While Flight SQL may be an easy jumping-off point, the Flight SQL APIs are not Live Dataframe APIs: a query returns a snapshot of the table at the time it runs. If you need to receive real-time updates, the best option is a Deephaven-native client, such as the Deephaven Core+ Python client.
In Deephaven Community Core, you can connect directly to the Flight SQL server using most standard Flight SQL clients. For details, see the Flight SQL guide.
In Deephaven Core+, standard Flight SQL clients are not supported out of the box due to enhanced authentication and session management. However, you can still connect using a small amount of custom code, as described below.
Prerequisites
Before you begin, make sure you have:
- Access to a running Deephaven Core+ instance with Flight SQL enabled (Grizzly version 1.20240517.491 or later).
- Python 3.9 or newer installed.
You will also need to install the following packages in your Python environment:
Deephaven Core+ Python Client
Install the Deephaven Core+ Python client by following the instructions in either the Develop a Python Client Query guide or the Deephaven Core+ Python client guide.
flightsql-dbapi package
The flightsql-dbapi package offers a PEP 249-compliant DB-API 2.0 interface for Flight SQL servers. This enables you to use popular Python database tools and libraries (such as pandas, sqlalchemy, or dbt) to connect and query Deephaven using SQL.
To install, run:
pip install flightsql-dbapi
Connect to Deephaven Core+
To connect to a Deephaven Core+ server using Flight SQL, you must first establish a session with the Deephaven Core+ Python client. The following example creates a session on a new worker using user/password authentication:
from typing import Optional

from deephaven_enterprise.client.session_manager import SessionManager, DndSession


def start_new_worker(
    host: str, port: int, user: str, password: str, query_server: Optional[str] = None
) -> DndSession:
    # The session manager reads its connection details from the server's connection.json
    connection_info = f"https://{host}:{port}/iris/connection.json"
    session_manager = SessionManager(connection_info)
    # Authenticate with a username and password
    session_manager.password(user=user, password=password)
    # Start a new worker with a 1 GB heap and return a session connected to it
    return session_manager.connect_to_new_worker(
        name=None, heap_size_gb=1.0, server=query_server
    )
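The example above takes the host, port, and credentials as arguments. One way to avoid hardcoding them is to read them from the environment. The sketch below is an assumption for illustration only: the variable names `DH_HOST`, `DH_PORT`, `DH_USER`, and `DH_PASSWORD`, and the helpers `ConnectionSettings`, `connection_json_url`, and `settings_from_env`, are hypothetical and not defined by Deephaven. The URL format and the default port of 8000 are taken from the examples on this page.

```python
import os
from typing import Mapping, NamedTuple, Optional


class ConnectionSettings(NamedTuple):
    """Hypothetical container for the values start_new_worker() expects."""

    host: str
    port: int
    user: str
    password: str


def connection_json_url(host: str, port: int) -> str:
    """Build the connection.json URL in the format used on this page."""
    return f"https://{host}:{port}/iris/connection.json"


def settings_from_env(env: Optional[Mapping[str, str]] = None) -> ConnectionSettings:
    """Read settings from environment variables (names are assumptions)."""
    env = os.environ if env is None else env
    required = ("DH_HOST", "DH_USER", "DH_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return ConnectionSettings(
        host=env["DH_HOST"],
        port=int(env.get("DH_PORT", "8000")),  # 8000 matches the example port
        user=env["DH_USER"],
        password=env["DH_PASSWORD"],
    )
```

The resulting tuple can be unpacked into the function shown earlier, for example `start_new_worker(**settings_from_env()._asdict())`.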
Custom Flight SQL Client for Deephaven Core+
The flightsql-dbapi package does not natively support Deephaven Core+ because of its advanced authentication and session management requirements.
To work around this, you can create a subclass of FlightSQLClient that leverages an active Deephaven session for authentication and connectivity, as shown below.
from deephaven_enterprise.client.session_manager import DndSession
from flightsql import FlightSQLClient


class DHFlightSQLClient(FlightSQLClient):
    def __init__(self, dh_session: DndSession):
        """
        Initialize a DHFlightSQLClient using an existing Deephaven session.

        Args:
            dh_session: An active Deephaven session object.
        """
        # Initialize the base FlightSQLClient without connecting
        super().__init__()
        # Use the session's Flight client and authentication headers
        self.client = dh_session._flight_client
        self.headers = dh_session.grpc_metadata
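Running a query with such a client is a two-step exchange, as used in the complete example on this page: `execute` returns flight info, and `do_get` fetches the data for an endpoint's ticket. The sketch below wraps those two calls in a hypothetical helper, `fetch_all_endpoints`, that reads every endpoint instead of only the first. It is written against duck-typed objects and assumes only the `execute`, `do_get`, `endpoints`, `ticket`, and `read_all` names that appear in that example.

```python
from typing import Any, List


def fetch_all_endpoints(client: Any, sql: str) -> List[Any]:
    """Run `sql` and return one result per endpoint in the returned flight info.

    Hypothetical helper: `client` is expected to behave like the
    DHFlightSQLClient defined above (execute() and do_get()).
    """
    flight_info = client.execute(sql)
    results = []
    for endpoint in flight_info.endpoints:
        # Each endpoint carries a ticket that identifies one result stream
        reader = client.do_get(endpoint.ticket)
        results.append(reader.read_all())
    return results
```

With a real client, each element of the returned list should be a `pyarrow.Table`; if there is more than one, `pyarrow.concat_tables` can combine them into a single table.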
A Complete Example: Fetch a table via the custom Flight SQL client
Below is a complete example that demonstrates how to:
- Start a new Deephaven Core+ worker session.
- Create and bind a ticking table.
- Query the table using the custom DHFlightSQLClient subclass created previously.
- Fetch and print the results.
Replace "your-dh-host", "your-username", and "your-password" with your actual Deephaven Core+ server details and credentials.
import logging
from typing import Optional

from deephaven_enterprise.client.session_manager import SessionManager, DndSession
from flightsql import FlightSQLClient

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


class DHFlightSQLClient(FlightSQLClient):
    def __init__(self, dh_session: DndSession):
        """
        Initialize a DHFlightSQLClient using an existing Deephaven session.

        Args:
            dh_session (DndSession): The Deephaven session to use.
        """
        # Initialize the base FlightSQLClient without connecting
        super().__init__()
        # Use the session's Flight client and authentication headers
        self.client = dh_session._flight_client
        self.headers = dh_session.grpc_metadata


def start_new_worker(
    host: str, port: int, user: str, password: str, query_server: Optional[str] = None
) -> DndSession:
    connection_info = f"https://{host}:{port}/iris/connection.json"
    session_manager = SessionManager(connection_info)
    session_manager.password(user=user, password=password)
    return session_manager.connect_to_new_worker(
        name=None, heap_size_gb=1.0, server=query_server
    )


def main(
    host: str, port: int, user: str, password: str, query_server: Optional[str] = None
):
    logger.info("Starting a new Deephaven worker session...")
    dh_session = start_new_worker(
        host=host, port=port, user=user, password=password, query_server=query_server
    )

    logger.info("Creating and binding a ticking table...")
    # period is in nanoseconds, so this table adds one row per second
    table = dh_session.time_table(period=1_000_000_000).update(["Value=1"])
    dh_session.bind_table("Foo", table)

    logger.info("Verifying the table is in the global query scope...")
    bound_table = dh_session.open_table("Foo")
    print(bound_table.to_arrow())

    logger.info("Executing SQL query via Flight SQL...")
    fsql_client = DHFlightSQLClient(dh_session=dh_session)
    flight_info = fsql_client.execute("SELECT * FROM Foo")

    logger.info("Fetching query results...")
    # The result is a snapshot of the ticking table at query time
    ticket = flight_info.endpoints[0].ticket
    reader = fsql_client.do_get(ticket)
    print(reader.read_all())

    logger.info("Closing session...")
    fsql_client.close()
    dh_session.close()
    logger.info("Done!")


if __name__ == "__main__":
    host = "your-dh-host"
    port = 8000
    user = "your-username"
    password = "your-password"
    main(host=host, port=port, user=user, password=password)
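In the example above, the Flight SQL client and the session are closed at the end of main, so an exception raised earlier would skip the cleanup. One possible way to make the cleanup unconditional is a small context manager built on the standard library's `contextlib.ExitStack`. This is a sketch under stated assumptions: `closing_in_reverse` is a hypothetical helper, and it assumes only that each resource has a `close()` method, as both objects in the example do.

```python
from contextlib import ExitStack, contextmanager
from typing import Any, Iterator, Tuple


@contextmanager
def closing_in_reverse(*resources: Any) -> Iterator[Tuple[Any, ...]]:
    """Yield `resources`, then call close() on each one, last one first.

    Hypothetical helper: cleanup runs even if the body raises, because
    ExitStack invokes its registered callbacks on exit in LIFO order.
    """
    with ExitStack() as stack:
        for resource in resources:
            stack.callback(resource.close)
        yield resources
```

Used as `with closing_in_reverse(dh_session, fsql_client):`, this closes the Flight SQL client first and the session second, matching the order in the example.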