Core+ Java Flight SQL Client

Flight SQL is a protocol on top of Arrow Flight that exposes SQL-like capabilities. Deephaven Core+ workers provide a built-in Flight SQL server that makes tables in the global query scope available as named tables. These tables can be accessed and queried through the default Flight SQL catalog using standard SQL queries.

Note

SELECT queries are supported; INSERT, UPDATE, and DELETE queries are not currently supported.
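Because only SELECT is supported, a client may want to reject other statements before sending them to the server. The following is a minimal sketch of such a check. The class name `SelectOnlyGuard` and its method are hypothetical and not part of any Deephaven or Arrow API, and the check only inspects the first keyword rather than parsing SQL.

```java
import java.util.Locale;

public class SelectOnlyGuard {
    /**
     * Returns true when the first keyword of the statement is SELECT (case-insensitive).
     * This is a naive prefix check, not a SQL parser.
     */
    public static boolean looksLikeSelect(String sql) {
        if (sql == null) {
            return false;
        }
        String trimmed = sql.stripLeading();
        if (trimmed.length() < 6) {
            return false;
        }
        String head = trimmed.substring(0, 6).toUpperCase(Locale.ROOT);
        if (!head.equals("SELECT")) {
            return false;
        }
        if (trimmed.length() == 6) {
            return true;
        }
        // Require a non-identifier character after the keyword so "SELECTED" is not accepted.
        char next = trimmed.charAt(6);
        return !(Character.isLetterOrDigit(next) || next == '_');
    }

    public static void main(String[] args) {
        System.out.println(looksLikeSelect("SELECT * FROM pel LIMIT 5"));
        System.out.println(looksLikeSelect("DELETE FROM pel"));
    }
}
```

A caller could run this check before handing the statement to the Flight SQL client and fail early with a clear message.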

While Flight SQL may be an easy jumping-off point, the Flight SQL APIs are not Live Dataframe APIs. If you need to receive real-time updates, use one of the Deephaven-native clients, such as the Deephaven Core+ Java client.

In Deephaven Community Core, you can connect directly to the Flight SQL server using most standard Flight SQL clients. For details, see the Community Flight SQL guide.

Note

In Deephaven Core+, standard Flight SQL clients do not work out of the box because of enhanced authentication and session management requirements. You can still connect by implementing a small amount of custom code, as described below.

The source code for this guide is available in a ZIP archive.

Prerequisites

This guide assumes:

  1. You are using a Linux-based operating system. While the scripts and Java code used should also work on Windows, command syntax and environment setup may differ slightly between platforms.

  2. You are familiar with Java and Gradle.

You must have the following to use the Java Flight SQL client:

  • Access to a running Deephaven Core+ instance with Flight SQL enabled (Grizzly version 1.20240517.491 or later).
  • JDK 17 or newer installed.
  • Gradle 7.3 or newer installed.

Set up a Gradle project

To get started, create a new Gradle Java project:

mkdir deephaven-flightsql-client
cd deephaven-flightsql-client
gradle init \
  --type java-application \
  --dsl groovy \
  --project-name deephaven-flightsql-client \
  --package io.deephaven.example \
  --java-version 17 \
  --test-framework junit \
  --no-split-project

This will generate a basic Java application structure using Gradle, with the package set to io.deephaven.example and Java 17 as the language version.

Configure dependencies and authentication

To use the Core+ Java client, you need access to Deephaven's private Artifactory repository. Obtain your Artifactory username and API key from your administrator. Then, add the following lines to your gradle.properties file (typically located at ${HOME}/.gradle/gradle.properties):

artifactoryUser=<your username>
artifactoryAPIKey=<your Artifactory key>
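If the build later fails to resolve the private repository, a quick check that both keys are present can save time. The sketch below assumes the file uses the standard Java properties format; the class name `GradlePropertiesCheck` is hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class GradlePropertiesCheck {
    /** Returns the names of the required keys that are absent or blank. */
    public static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : List.of("artifactoryUser", "artifactoryAPIKey")) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        Path file = Path.of(System.getProperty("user.home"), ".gradle", "gradle.properties");
        if (!Files.exists(file)) {
            System.out.println("No gradle.properties found at " + file);
            return;
        }
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file)) {
            props.load(reader);
        }
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```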

Next, replace the contents of your project's build.gradle file with the following configuration:

plugins {
  id 'application'
}

def dhcVersion = "0.39.5"
def dheVersion = "1.20240517.501"
def arrowVersion = "18.0.0"

repositories {
  mavenCentral()
  maven {
    credentials {
      username = artifactoryUser ?: System.getProperty('user.name')
      password = artifactoryAPIKey ?: ''
    }
    url "https://illumon.jfrog.io/illumon/libs-customer"
  }
}

dependencies {
  runtimeOnly "io.deephaven:deephaven-log-to-slf4j:${dhcVersion}"
  runtimeOnly 'ch.qos.logback:logback-classic:1.4.5'

  implementation "iris:client-flight:$dheVersion"
  implementation "iris:client-base:$dheVersion"
  implementation "iris:AuthLib:$dheVersion"
  implementation "iris:ControllerClientGrpc:$dheVersion"

  implementation "org.apache.arrow:flight-sql:${arrowVersion}"
}

java {
  toolchain {
    languageVersion = JavaLanguageVersion.of(17)
  }
}

application {
  mainClass = 'io.deephaven.example.FlightSqlExample'
  applicationDefaultJvmArgs = [
    "-DDeephavenEnterprise.rootFile=iris-common.prop",
    "--add-opens=java.base/java.nio=ALL-UNNAMED"
  ]
}

Note

Update the dhcVersion and dheVersion variables to match the Deephaven versions running on the server you are connecting to.

Persistent Query setup

This example demonstrates how to connect to a Persistent Query to retrieve data. First, log in to your Deephaven server's web UI, then use the Query Monitor to create a Persistent Query.

  1. On the Settings tab:
     • Name the PQ FlightSqlTestJavaClientPQ.
     • Select Live Query (Script) for the type.
     • Select a DB Server of AutoQuery.
     • Use the Core+ engine.
     • Give it a heap size of 1 GB.

  2. On the Scheduling tab, select Disabled.

  3. On the Script tab, select a Runtime of Python. The script is a single statement that retrieves today's rows from the ProcessEventLog table, sorts them by Timestamp, and keeps only the Timestamp and LogEntry columns. This table contains events written by Deephaven workers and can help diagnose issues.

pel = (
    db.live_table("DbInternal", "ProcessEventLog")
    .where("Date=today()")
    .sort("Timestamp")
    .view(["Timestamp", "LogEntry"])
)

  4. Save the PQ.

Flight SQL example

Below is a complete example Java program, FlightSqlExample.java, that demonstrates connecting to a Deephaven Core+ server using the Flight SQL protocol and retrieving data from a table via SQL query.

This example covers:

  • Creating a Deephaven session and authenticating with the server.
  • Ensuring the specified PQ is running.
  • Listing available tables in the system catalog.
  • Leveraging the FlightClient from your session to create a FlightSqlClient.
  • Using the FlightSqlClient to execute SQL queries against Deephaven tables, retrieve results, and print them.
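The "ensuring the specified PQ is running" step comes down to waiting, with a deadline, for a status that arrives on a callback thread. The example program does this with wait/notify; the sketch below shows the same idea using a CompletableFuture instead. The `StatusWaiter` class and its `Status` enum are hypothetical stand-ins and are not Deephaven types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StatusWaiter {
    /** Stand-in for a controller status enum; not the Deephaven type. */
    public enum Status { INITIALIZING, RUNNING, FAILED }

    private final CompletableFuture<Status> settled = new CompletableFuture<>();

    /** Called from the observer thread for every status update. */
    public void onStatus(Status status) {
        if (status == Status.RUNNING || status == Status.FAILED) {
            // complete() has no effect if the future was already completed.
            settled.complete(status);
        }
    }

    /** Blocks until RUNNING or FAILED arrives; returns null if the timeout elapses first. */
    public Status await(long timeoutMillis) {
        try {
            return settled.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for status", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        StatusWaiter waiter = new StatusWaiter();
        // Simulate updates arriving from another thread.
        new Thread(() -> {
            waiter.onStatus(Status.INITIALIZING);
            waiter.onStatus(Status.RUNNING);
        }).start();
        System.out.println("Settled as " + waiter.await(5_000));
    }
}
```

The future removes the need for an explicit lock and wait loop, at the cost of only recording the first settled status.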

To use this example:

  1. Update the placeholders in the code (your-dh-host, your-username, your-password) with your actual Deephaven server details and credentials.
  2. Ensure the PQ name (FlightSqlTestJavaClientPQ) and table name (pel) match those created in the previous steps.
  3. Place the code in src/main/java/io/deephaven/example/FlightSqlExample.java within your Gradle project.
  4. Build and run the project using Gradle from the project's root directory.
./gradlew run
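Rather than editing the placeholders directly in the source, the host and credentials could be read from the environment. The sketch below is one way to do that; the variable names DH_HOST, DH_PORT, DH_USER, and DH_PASSWORD and the `ConnectionSettings` class are assumptions made for illustration and are not defined by Deephaven.

```java
import java.util.Map;

public class ConnectionSettings {
    public final String serverUrl;
    public final String userName;
    public final String password;

    private ConnectionSettings(String serverUrl, String userName, String password) {
        this.serverUrl = serverUrl;
        this.userName = userName;
        this.password = password;
    }

    /** Builds settings from a map of environment variables and fails fast if one is missing. */
    public static ConnectionSettings fromEnv(Map<String, String> env) {
        String host = required(env, "DH_HOST");
        // 8123 mirrors the default used in the example program; override with DH_PORT.
        String port = env.getOrDefault("DH_PORT", "8123");
        return new ConnectionSettings(
                "https://" + host + ":" + port,
                required(env, "DH_USER"),
                required(env, "DH_PASSWORD"));
    }

    private static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        // Real code would pass System.getenv(); a sample map keeps this sketch runnable anywhere.
        ConnectionSettings settings = fromEnv(Map.of(
                "DH_HOST", "dh.example.com",
                "DH_USER", "demo",
                "DH_PASSWORD", "not-a-real-password"));
        System.out.println("Would connect to " + settings.serverUrl + " as " + settings.userName);
    }
}
```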

The full source code follows.

FlightSqlExample.java

package io.deephaven.example;

import io.deephaven.enterprise.auth.UserContext;
import io.deephaven.enterprise.config.HttpUrlPropertyInputStreamLoader;
import io.deephaven.enterprise.controller.client.ControllerClientGrpc;
import io.deephaven.enterprise.dnd.client.DeephavenClusterConnection;
import io.deephaven.enterprise.dnd.client.DndSession;
import io.deephaven.enterprise.dnd.client.DndSessionFactoryFlight;
import io.deephaven.enterprise.dnd.client.DndSessionFlight;
import io.deephaven.proto.controller.PersistentQueryInfoMessage;
import io.deephaven.proto.controller.PersistentQueryStatusEnum;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static io.deephaven.proto.controller.PersistentQueryStatusEnum.PQS_RUNNING;


public class FlightSqlExample {
    /**
     * Container for a Core+ session factory. The created controllerClient can be used to perform programmatic PQ management,
     * and is used here to start the PQ if needed.
     * <br>
     * The first step when using the Java client is to create a session. A session is a connection to the
     * Deephaven server that allows authentication with the server, table creation, persistent query access, and more. The session
     * factory can then be used to create a session. It requires one of the following two things to connect to the server:
     *
     * <ul>
     *   <li>A URL to a connection.json file, which is the method used in the example</li>
     *   <li>A JSON object with the connection details</li>
     * </ul>
     * <p>
     * All Deephaven servers expose a connection.json file at {@code <server URL>/iris/connection.json}.
     * This example builds that URL from the server URL passed to the constructor and uses it to create
     * its {@link DndSessionFactoryFlight} and {@link DeephavenClusterConnection} instances.
     */
    private static class CorePlusSessionFactory {
        final DndSessionFactoryFlight sessionFactory;
        final ControllerClientGrpc controllerClient;
        final UserContext userContext;

        /**
         * Create and authenticate the session factory.
         *
         * @param serverUrl the server's URL, like {@code https://server.domain.com:8000} or {@code https://server.domain.com:8123}
         * @param userName  a username
         * @param password  a password
         * @param keyFile   an authentication keyfile
         * @throws IOException from the underlying calls
         */
        private CorePlusSessionFactory(final String serverUrl, final String userName, final String password, final String keyFile) throws IOException {
            System.out.println("Connecting to " + serverUrl);
            final String connectionUrl = serverUrl + "/iris/connection.json";
            sessionFactory = new DndSessionFactoryFlight(connectionUrl);

            final DeephavenClusterConnection clusterConnection = new DeephavenClusterConnection(connectionUrl);
            if (keyFile != null) {
                sessionFactory.privateKey(keyFile);
                clusterConnection.privateKey(keyFile);
            } else {
                sessionFactory.password(userName, password);
                clusterConnection.password(userName, password);
            }

            userContext = clusterConnection.getUserContext();
            controllerClient = clusterConnection.getControllerClient();
        }

        /**
         * Return the authenticated {@link DndSessionFactoryFlight}.
         *
         * @return the {@link DndSessionFactoryFlight}
         */
        public DndSessionFactoryFlight getSessionFactory() {
            return sessionFactory;
        }

        /**
         * Get the user that this session is operating as.
         *
         * @return the user that this session is operating as
         */
        public String getEffectiveUser() {
            return userContext.getEffectiveUser();
        }

        /**
         * Ensure that a PQ is running.
         *
         * @param pqName the PQ name
         */
        public void ensurePqRunning(final String pqName) {
            final PersistentQueryInfoMessage infoMessage = sessionFactory.getPQ(pqName);
            if (infoMessage == null) {
                throw new IllegalArgumentException("Persistent query " + pqName + " does not exist");
            }

            final PersistentQueryStatusEnum pqStatus = infoMessage.getState().getStatus();
            if (pqStatus == PQS_RUNNING) {
                return;
            }

            final AtomicReference<PersistentQueryStatusEnum> newStatusReference = new AtomicReference<>(null);

            final ControllerClientGrpc.ObserverImpl observer = new ControllerClientGrpc.ObserverImpl() {
                @Override
                public void handlePut(@NotNull final PersistentQueryInfoMessage value) {
                    System.out.println("Received update for " + value.getConfig().getName() + ": " + value.getState().getStatus());
                    synchronized (newStatusReference) {
                        final String newPqName = value.getConfig().getName();
                        if (!pqName.equals(newPqName)) {
                            return;
                        }

                        final PersistentQueryStatusEnum observedStatus = value.getState().getStatus();
                        if (isTerminalOrRunning(observedStatus)) {
                            newStatusReference.set(observedStatus);
                            newStatusReference.notify();
                        }
                    }
                }
            };

            controllerClient.subscribeToAll();

            // Get the PQ to running...
            final long maxSleepMillis = 60_000;
            try {
                synchronized (newStatusReference) {
                    controllerClient.addObserver(observer);
                    controllerClient.restartQuery(infoMessage.getConfig());
                    long currentTime = System.currentTimeMillis();
                    final long endTime = currentTime + maxSleepMillis;
                    // We'll give it 60 seconds to get to running or fail
                    while (currentTime < endTime && !isTerminalOrRunning(newStatusReference.get())) {
                        final long sleepMillis = endTime - currentTime;
                        newStatusReference.wait(sleepMillis);
                        currentTime = System.currentTimeMillis();
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for PQ to get running", e);
            } finally {
                controllerClient.removeObserver(observer);
            }

            final PersistentQueryStatusEnum latestStatus = newStatusReference.get();
            if (latestStatus != PQS_RUNNING) {
                throw new IllegalStateException("PQ did not get to running in " + maxSleepMillis + " milliseconds, state is " + latestStatus);
            }
        }

        private boolean isTerminalOrRunning(final PersistentQueryStatusEnum status) {
            return status != null && (status == PQS_RUNNING || controllerClient.isTerminal(status));
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {

        String hostName = "your-dh-host";
        int port = 8123; // or 8000
        String userName = "your-username";
        String password = "your-password";
        String pqName = "FlightSqlTestJavaClientPQ";
        String tableName = "pel";

        String url = "https://" + hostName + ":" + port;
        HttpUrlPropertyInputStreamLoader.setServerUrl(url);

        // The session factory will make the connection to the specified Deephaven server and then authenticate.
        final CorePlusSessionFactory sessionFactory;
        try {
            sessionFactory = new CorePlusSessionFactory(url, userName, password, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        sessionFactory.ensurePqRunning(pqName);
        DeephavenClusterConnection dcc =  sessionFactory.getSessionFactory().getConnection();

        // Ensure we release resources when done
        DndSessionFlight dndFlightSession = null;
        try {
            dndFlightSession = sessionFactory.getSessionFactory().persistentQuery(pqName);

            final PersistentQueryStatusEnum statusEnum = dndFlightSession.getInfo().getState().getStatus();
            if (statusEnum != PQS_RUNNING) {
                throw new RuntimeException("Persistent query must be running");
            }

            // The catalog shows all the tables available from this server
            System.out.println("================ System Catalog ===============");
            try (final FlightStream catalog = dndFlightSession.streamOf(DndSession.catalogTable())) {
                while (catalog.next()) {
                    final VectorSchemaRoot vectorSchemaRoot = catalog.getRoot();
                    System.out.println(vectorSchemaRoot.contentToTSVString());
                }
            }
            System.out.println("============ End of System Catalog ============\n");

            // Execute a FlightSql query to retrieve data from the specified table
            try (final FlightSqlClient flightSqlClient = new FlightSqlClient(dndFlightSession.getFlightSession().getClient())) {
                FlightInfo flightInfo = flightSqlClient.execute("SELECT * FROM " + tableName + " LIMIT 5");
                System.out.println("============== FlightSql Result ===============");
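                // Only the first endpoint is read here; a FlightInfo can list several endpoints for larger results.
                try (FlightStream stream = flightSqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {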
                    while (stream.next()) {
                        VectorSchemaRoot root = stream.getRoot();
                        System.out.println(root.contentToTSVString()); // Print batch as TSV
                    }
                }
                System.out.println("========== End of FlightSql Result ============");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (dndFlightSession != null) {
                try {
                    dndFlightSession.close();
                } catch (Exception ignored) {
                }
            }
        }
    }
}
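The example builds its query by concatenating tableName into the SQL text. If the table name ever comes from configuration or user input, validating it first avoids malformed or unintended queries. The following is a minimal sketch; the `QueryBuilder` class is hypothetical, and the identifier pattern is a deliberately conservative assumption rather than the full set of names the server accepts.

```java
import java.util.regex.Pattern;

public class QueryBuilder {
    // Conservative assumption: letters, digits, and underscores, not starting with a digit.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    /** Builds "SELECT * FROM <table> LIMIT <n>" after validating both arguments. */
    public static String selectAll(String tableName, int limit) {
        if (tableName == null || !IDENTIFIER.matcher(tableName).matches()) {
            throw new IllegalArgumentException("Not a simple table name: " + tableName);
        }
        if (limit < 0) {
            throw new IllegalArgumentException("Limit must be non-negative: " + limit);
        }
        return "SELECT * FROM " + tableName + " LIMIT " + limit;
    }

    public static void main(String[] args) {
        System.out.println(selectAll("pel", 5));
    }
}
```

The resulting string could be passed to the Flight SQL client in place of the inline concatenation.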

Sample output

If the example runs successfully, you should see output similar to the following. The system catalog section lists available tables, and the Flight SQL result section displays the first few rows from the queried table.

================ System Catalog ===============
Namespace	TableName	NamespaceSet
DbInternal	AuditEventLog	System
DbInternal	PersistentQueryConfigurationLog	System
DbInternal	PersistentQueryConfigurationLogV2	System
DbInternal	PersistentQueryStateLog	System
DbInternal	ProcessEventLog	System
DbInternal	ProcessEventLogIndex	System
DbInternal	ProcessInfo	System
DbInternal	ProcessInfoLogCommunity	System
DbInternal	ProcessMetrics	System
DbInternal	ProcessMetricsLogCommunity	System
DbInternal	ProcessMetricsLogCoreV2	System
DbInternal	ProcessTelemetry	System
DbInternal	ProcessTelemetryIndex	System
DbInternal	QueryOperationPerformanceLog	System
DbInternal	QueryOperationPerformanceLogCommunity	System
DbInternal	QueryOperationPerformanceLogCommunityIndex	System
DbInternal	QueryOperationPerformanceLogCoreV2	System
DbInternal	QueryOperationPerformanceLogCoreV2Index	System
DbInternal	QueryOperationPerformanceLogIndex	System
DbInternal	QueryPerformanceLog	System
DbInternal	QueryPerformanceLogCommunity	System
DbInternal	QueryPerformanceLogCommunityIndex	System
DbInternal	QueryPerformanceLogCoreV2	System
DbInternal	QueryPerformanceLogCoreV2Index	System
DbInternal	QueryPerformanceLogIndex	System
DbInternal	QueryUserAssignmentLog	System
DbInternal	ResourceUtilization	System
DbInternal	ServerStateLogCommunity	System
DbInternal	ServerStateLogCommunityIndex	System
DbInternal	ServerStateLogCoreV2	System
DbInternal	ServerStateLogCoreV2Index	System
DbInternal	UpdatePerformanceAncestors	System
DbInternal	UpdatePerformanceAncestorsIndex	System
DbInternal	UpdatePerformanceLog	System
DbInternal	UpdatePerformanceLogCommunity	System
DbInternal	UpdatePerformanceLogCommunityIndex	System
DbInternal	UpdatePerformanceLogCoreV2	System
DbInternal	UpdatePerformanceLogCoreV2Index	System
DbInternal	UpdatePerformanceLogIndex	System
DbInternal	WorkspaceData	System
DbInternal	WorkspaceDataSnapshot	System
FeedOS	EquityQuoteL1	System
FeedOS	EquityTradeL1	System
FeedOS	OPRAQuoteL1	System
FeedOS	OPRATradeL1	System
LearnDeephaven	EODTrades	System
LearnDeephaven	StockQuotes	System
LearnDeephaven	StockTrades	System
Market	EqQuote	System
Market	EqTrade	System
MarketUs	QuoteNbboStock	System
MarketUs	TradeNbboStock	System
============ End of System Catalog ============


============== FlightSql Result ===============
Timestamp	LogEntry
1751984054149000000	db_query_server INITIALIZING

1751984054151000000	Deephaven Version: 1.20240517.250703155710g46b84a6b9e

1751984054151000000	VCS Version: 46b84a6b9eb8d92d287b4d86661db0edfb321538*

1751984054151000000	[io.deephaven.shadow.core.io.grpc.NameResolverRegistry:getDefaultRegistry:tid=1] Service loader found io.deephaven.shadow.core.io.grpc.internal.DnsNameResolverProvider@7446d8d5

1751984054151000000	[io.deephaven.shadow.core.io.grpc.NameResolverRegistry:getDefaultRegistry:tid=1] Service loader found io.deephaven.shadow.core.io.grpc.netty.UdsNameResolverProvider@a4b2d8f


========== End of FlightSql Result ============
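The Timestamp column in the output above is printed as a raw long. Assuming these values are nanoseconds since the Unix epoch, which their magnitude suggests, they can be converted to a readable instant as sketched below. The `EpochNanos` class is a hypothetical helper.

```java
import java.time.Instant;

public class EpochNanos {
    /** Converts nanoseconds since the Unix epoch to an Instant. */
    public static Instant toInstant(long epochNanos) {
        long seconds = Math.floorDiv(epochNanos, 1_000_000_000L);
        long nanoAdjustment = Math.floorMod(epochNanos, 1_000_000_000L);
        return Instant.ofEpochSecond(seconds, nanoAdjustment);
    }

    public static void main(String[] args) {
        // First Timestamp value from the sample output.
        System.out.println(toInstant(1751984054149000000L)); // prints 2025-07-08T14:14:14.149Z
    }
}
```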