Core+ Java Fight SQL Client
Flight SQL is a protocol on top of Arrow Flight that exposes SQL-like capabilities. Deephaven Core+ workers provide a built-in Flight SQL server that makes tables in the global query scope available as named tables. These tables can be accessed and queried through the default Flight SQL catalog using standard SQL queries.
Note
SELECT
queries are supported; INSERT
, UPDATE
, and DELETE
queries are not currently supported.
While Flight SQL may be an easy jumping-off point, the Flight SQL APIs are not Live Dataframe APIs. If you need to receive real-time updates, the best option is the Deephaven-native clients, such as the Deephaven Core+ Java client.
In Deephaven Community Core, you can connect directly to the Flight SQL server using most standard Flight SQL clients. For details, see the Community Flight SQL guide.
Note
In Deephaven Core+, standard Flight SQL clients do not work out of the box because of enhanced authentication and session management requirements. You can still connect by implementing a small amount of custom code, as described below.
The source code for this guide is available in a ZIP archive.
Prerequisites
This guide assumes:
-
You are using a Linux-based operating system. While the scripts and Java code used should also work on Windows, command syntax and environment setup may differ slightly between platforms.
-
You are familiar with Java and Gradle.
You must have the following to use the Java Flight SQL client:
- Access to a running Deephaven Core+ instance with Flight SQL enabled (Grizzly version 1.20240517.491 or later).
- JDK 17 or newer installed.
- Gradle 7.3 or newer installed.
Set up a Gradle project
To get started, create a new Gradle Java project:
mkdir deephaven-flightsql-client
cd deephaven-flightsql-client
gradle init \
--type java-application \
--dsl groovy \
--project-name deephaven-flightsql-client \
--package io.deephaven.example \
--java-version 17 \
--test-framework junit \
--no-split-project
This will generate a basic Java application structure using Gradle, with the package set to io.deephaven.example
and Java 17 as the language version.
Configure dependencies and authentication
To use the Core+ Java client, you need access to Deephaven's private Artifactory repository. Obtain your Artifactory username and API key from your administrator. Then, add the following lines to your gradle.properties
file (typically located at ${HOME}/.gradle/gradle.properties
):
artifactoryUser=<your username>
artifactoryAPIKey=<your Artifactory key>
Next, replace the contents of your project's build.gradle
file with the following configuration:
plugins {
id 'application'
}
def dhcVersion = "0.39.5"
def dheVersion = "1.20240517.501"
def arrowVersion = "18.0.0"
repositories {
mavenCentral()
maven {
credentials {
username = artifactoryUser ?: System.getProperty('user.name')
password = artifactoryAPIKey ?: ''
}
url "https://illumon.jfrog.io/illumon/libs-customer"
}
}
dependencies {
runtimeOnly "io.deephaven:deephaven-log-to-slf4j:${dhcVersion}"
runtimeOnly 'ch.qos.logback:logback-classic:1.4.5'
implementation "iris:client-flight:$dheVersion"
implementation "iris:client-base:$dheVersion"
implementation "iris:AuthLib:$dheVersion"
implementation "iris:ControllerClientGrpc:$dheVersion"
implementation "org.apache.arrow:flight-sql:${arrowVersion}"
}
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}
application {
mainClass = 'io.deephaven.example.FlightSqlExample'
applicationDefaultJvmArgs = [
"-DDeephavenEnterprise.rootFile=iris-common.prop",
"--add-opens=java.base/java.nio=ALL-UNNAMED"
]
}
Note
Update the dhcVersion
and dheVersion
variables to match the versions of Deephaven you are connecting to.
Persistent Query setup
This example demonstrates how to connect to a Persistent Query to retrieve data. First, log in to your Deephaven server's web UI, then use the Query Monitor to create a Persistent Query.
- On the Settings tab:
- Name the PQ
FlightSqlTestJavaClientPQ
. - Select
Live Query (Script)
for the type. - Select a DB Server of
AutoQuery
. - Use the Core+ engine.
- Give it a heap size of 1 GB.
-
On the Scheduling tab, select
Disabled
. -
On the Script tab, select a Runtime of
Python
. The script consists of one line to retrieve theProcessEventLog
table and view theLogEntry
column in the order it was written. This table contains events written out by Deephaven workers and can help diagnose issues.
pel = (
db.live_table("DbInternal", "ProcessEventLog")
.where("Date=today()")
.sort("Timestamp")
.view(["Timestamp", "LogEntry"])
)
- Save the PQ.
Flight SQL example
Below is a complete example Java program, FlightSqlExample.java
, that demonstrates connecting to a Deephaven Core+ server using the Flight SQL protocol and retrieving data from a table via SQL query.
This example covers:
- Creating a Deephaven session and authenticating with the server.
- Ensuring the specified PQ is running.
- Listing available tables in the system catalog.
- Leveraging the FlightClient from your session to create a FlightSqlClient.
- Using the FlightSqlClient to execute SQL queries against Deephaven tables, retrieve results, and print them.
To use this example:
- Update the placeholders in the code (
your-dh-host
,your-username
,your-password
) with your actual Deephaven server details and credentials. - Ensure the PQ name (
FlightSqlTestJavaClientPQ
) and table name (pel
) match those created in the previous steps. - Place the code in
src/main/java/io/deephaven/example/FlightSqlExample.java
within your Gradle project. - Build and run the project using Gradle from the project's root directory.
./gradlew run
Expand the section below to view the full source code.
FlightSqlExample.java
package io.deephaven.example;
import io.deephaven.enterprise.auth.UserContext;
import io.deephaven.enterprise.config.HttpUrlPropertyInputStreamLoader;
import io.deephaven.enterprise.controller.client.ControllerClientGrpc;
import io.deephaven.enterprise.dnd.client.DeephavenClusterConnection;
import io.deephaven.enterprise.dnd.client.DndSession;
import io.deephaven.enterprise.dnd.client.DndSessionFactoryFlight;
import io.deephaven.enterprise.dnd.client.DndSessionFlight;
import io.deephaven.proto.controller.PersistentQueryInfoMessage;
import io.deephaven.proto.controller.PersistentQueryStatusEnum;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static io.deephaven.proto.controller.PersistentQueryStatusEnum.PQS_RUNNING;
public class FlightSqlExample {
/**
* Container for a Core+ session factory. The created controllerClient can be used to perform programmatic PQ management,
* and is used here to start the PQ if needed.
* <br>
* The first step when using the Java client is to create a session. A session is a connection to the
* Deephaven server that allows authentication with the server, table creation, persistent query access, and more. The session
* factory can then be used to create a session. It requires one of the following two things to connect to the server:
*
* <ul>
* <li>A URL to a connection.json file, which is the method used in the example</li>
* <li>A JSON object with the connection details</li>
* </ul>
* <p>
* All Deephaven servers expose a connection.json file, which can be seen on {@code <server URL>/iris/connection.json}.
* You'll see the example Gradle files use a server URL, which the application uses to retrieve this. The application
* uses it to create its {@link DndSessionFactoryFlight} and {@link DeephavenClusterConnection} instances.
*/
private static class CorePlusSessionFactory {
final DndSessionFactoryFlight sessionFactory;
final ControllerClientGrpc controllerClient;
final UserContext userContext;
/**
* Create and authenticate the session factory.
*
* @param serverUrl the server's URL, like {@code https://server.domain.com:8000} or {@code https://server.domain.com:8123}
* @param userName a username
* @param password a password
* @param keyFile an authentication keyfile
* @throws IOException from the underlying calls
*/
private CorePlusSessionFactory(final String serverUrl, final String userName, final String password, final String keyFile) throws IOException {
System.out.println("Connecting to " + serverUrl);
final String connectionUrl = serverUrl + "/iris/connection.json";
sessionFactory = new DndSessionFactoryFlight(connectionUrl);
final DeephavenClusterConnection clusterConnection = new DeephavenClusterConnection(connectionUrl);
if (keyFile != null) {
sessionFactory.privateKey(keyFile);
clusterConnection.privateKey(keyFile);
} else {
sessionFactory.password(userName, password);
clusterConnection.password(userName, password);
}
userContext = clusterConnection.getUserContext();
controllerClient = clusterConnection.getControllerClient();
}
/**
* Return the initialized {@link CorePlusSessionFactory}
*
* @return the {@link CorePlusSessionFactory}
*/
public DndSessionFactoryFlight getSessionFactory() {
return sessionFactory;
}
/**
* Get the user that this session is operating as.
*
* @return the user that this session is operating as
*/
public String getEffectiveUser() {
return userContext.getEffectiveUser();
}
/**
* Ensure that a PQ is running.
*
* @param pqName the PQ name
*/
public void ensurePqRunning(final String pqName) {
final PersistentQueryInfoMessage infoMessage = sessionFactory.getPQ(pqName);
if (infoMessage == null) {
throw new IllegalArgumentException("Persistent query " + pqName + " does not exist");
}
final PersistentQueryStatusEnum pqStatus = infoMessage.getState().getStatus();
if (pqStatus == PQS_RUNNING) {
return;
}
final AtomicReference<PersistentQueryStatusEnum> newStatusReference = new AtomicReference<>(null);
final ControllerClientGrpc.ObserverImpl observer = new ControllerClientGrpc.ObserverImpl() {
@Override
public void handlePut(@NotNull final PersistentQueryInfoMessage value) {
System.out.println("Received update for " + value.getConfig().getName() + ": " + value.getState().getStatus());
synchronized (newStatusReference) {
final String newPqName = value.getConfig().getName();
if (!pqName.equals(newPqName)) {
return;
}
final PersistentQueryStatusEnum observedStatus = value.getState().getStatus();
if (isTerminalOrRunning(observedStatus)) {
newStatusReference.set(observedStatus);
newStatusReference.notify();
}
}
}
};
controllerClient.subscribeToAll();
// Get the PQ to running...
final long maxSleepMillis = 60_000;
try {
synchronized (newStatusReference) {
controllerClient.addObserver(observer);
controllerClient.restartQuery(infoMessage.getConfig());
long currentTime = System.currentTimeMillis();
final long endTime = currentTime + maxSleepMillis;
// We'll give it 60 seconds to get to running or fail
while (currentTime < endTime && !isTerminalOrRunning(newStatusReference.get())) {
final long sleepMillis = endTime - currentTime;
newStatusReference.wait(sleepMillis);
currentTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for PQ to get running", e);
} finally {
controllerClient.removeObserver(observer);
}
final PersistentQueryStatusEnum latestStatus = newStatusReference.get();
if (newStatusReference.get() != PQS_RUNNING) {
throw new IllegalStateException("PQ did not get to running in " + maxSleepMillis + " milliseconds, state is " + latestStatus);
}
}
private boolean isTerminalOrRunning(final PersistentQueryStatusEnum status) {
return status != null && (status == PQS_RUNNING || controllerClient.isTerminal(status));
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
String hostName = "your-dh-host";
int port = 8123; // or 8000
String userName = "your-username";
String password = "your-password";
String pqName = "FlightSqlTestJavaClientPQ";
String tableName = "pel";
String url = "https://" + hostName + ":" + port;
HttpUrlPropertyInputStreamLoader.setServerUrl(url);
// The session factory will make the connection to the specified Deephaven server and then authenticate.
final CorePlusSessionFactory sessionFactory;
try {
sessionFactory = new CorePlusSessionFactory(url, userName, password, null);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
sessionFactory.ensurePqRunning(pqName);
DeephavenClusterConnection dcc = sessionFactory.getSessionFactory().getConnection();
// Ensure we release resources when done
DndSessionFlight dndFlightSession = null;
try {
dndFlightSession = sessionFactory.getSessionFactory().persistentQuery(pqName);
final PersistentQueryStatusEnum statusEnum = dndFlightSession.getInfo().getState().getStatus();
if (statusEnum != PQS_RUNNING) {
throw new RuntimeException("Persistent query must be running");
}
// The catalog shows all the tables available from this server
System.out.println("================ System Catalog ===============");
try (final FlightStream catalog = dndFlightSession.streamOf(DndSession.catalogTable())) {
while (catalog.next()) {
final VectorSchemaRoot vectorSchemaRoot = catalog.getRoot();
System.out.println(vectorSchemaRoot.contentToTSVString());
}
}
System.out.println("============ End of System Catalog ============\n");
// Execute a FlightSql query to retrieve data from the specified table
try (final FlightSqlClient flightSqlClient = new FlightSqlClient(dndFlightSession.getFlightSession().getClient())) {
FlightInfo flightInfo = flightSqlClient.execute("SELECT * FROM " + tableName + " LIMIT 5");
System.out.println("============== FlightSql Result ===============");
try (FlightStream stream = flightSqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {
while (stream.next()) {
VectorSchemaRoot root = stream.getRoot();
System.out.println(root.contentToTSVString()); // Print batch as TSV
}
}
System.out.println("========== End of FlightSql Result ============");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (dndFlightSession != null) {
try {
dndFlightSession.close();
} catch (Exception ignored) {
}
}
}
}
}
Sample output
If the example runs successfully, you should see output similar to the following. The system catalog section lists available tables, and the Flight SQL result section displays the first few rows from the queried table.
================ System Catalog ===============
Namespace TableName NamespaceSet
DbInternal AuditEventLog System
DbInternal PersistentQueryConfigurationLog System
DbInternal PersistentQueryConfigurationLogV2 System
DbInternal PersistentQueryStateLog System
DbInternal ProcessEventLog System
DbInternal ProcessEventLogIndex System
DbInternal ProcessInfo System
DbInternal ProcessInfoLogCommunity System
DbInternal ProcessMetrics System
DbInternal ProcessMetricsLogCommunity System
DbInternal ProcessMetricsLogCoreV2 System
DbInternal ProcessTelemetry System
DbInternal ProcessTelemetryIndex System
DbInternal QueryOperationPerformanceLog System
DbInternal QueryOperationPerformanceLogCommunity System
DbInternal QueryOperationPerformanceLogCommunityIndex System
DbInternal QueryOperationPerformanceLogCoreV2 System
DbInternal QueryOperationPerformanceLogCoreV2Index System
DbInternal QueryOperationPerformanceLogIndex System
DbInternal QueryPerformanceLog System
DbInternal QueryPerformanceLogCommunity System
DbInternal QueryPerformanceLogCommunityIndex System
DbInternal QueryPerformanceLogCoreV2 System
DbInternal QueryPerformanceLogCoreV2Index System
DbInternal QueryPerformanceLogIndex System
DbInternal QueryUserAssignmentLog System
DbInternal ResourceUtilization System
DbInternal ServerStateLogCommunity System
DbInternal ServerStateLogCommunityIndex System
DbInternal ServerStateLogCoreV2 System
DbInternal ServerStateLogCoreV2Index System
DbInternal UpdatePerformanceAncestors System
DbInternal UpdatePerformanceAncestorsIndex System
DbInternal UpdatePerformanceLog System
DbInternal UpdatePerformanceLogCommunity System
DbInternal UpdatePerformanceLogCommunityIndex System
DbInternal UpdatePerformanceLogCoreV2 System
DbInternal UpdatePerformanceLogCoreV2Index System
DbInternal UpdatePerformanceLogIndex System
DbInternal WorkspaceData System
DbInternal WorkspaceDataSnapshot System
FeedOS EquityQuoteL1 System
FeedOS EquityTradeL1 System
FeedOS OPRAQuoteL1 System
FeedOS OPRATradeL1 System
LearnDeephaven EODTrades System
LearnDeephaven StockQuotes System
LearnDeephaven StockTrades System
Market EqQuote System
Market EqTrade System
MarketUs QuoteNbboStock System
MarketUs TradeNbboStock System
============ End of System Catalog ============
============== FlightSql Result ===============
Timestamp LogEntry
1751984054149000000 db_query_server INITIALIZING
1751984054151000000 Deephaven Version: 1.20240517.250703155710g46b84a6b9e
1751984054151000000 VCS Version: 46b84a6b9eb8d92d287b4d86661db0edfb321538*
1751984054151000000 [io.deephaven.shadow.core.io.grpc.NameResolverRegistry:getDefaultRegistry:tid=1] Service loader found io.deephaven.shadow.core.io.grpc.internal.DnsNameResolverProvider@7446d8d5
1751984054151000000 [io.deephaven.shadow.core.io.grpc.NameResolverRegistry:getDefaultRegistry:tid=1] Service loader found io.deephaven.shadow.core.io.grpc.netty.UdsNameResolverProvider@a4b2d8f
========== End of FlightSql Result ============