Develop a Java Query

This section of the crash course covers Deephaven Enterprise's client APIs. This guide discusses the Java client, which allows you to:

  • Create new workers
  • Interact with tables and objects on the server
  • Connect to existing Persistent Queries (PQs)
  • Create PQs
  • Run queries server-side
  • And more!

Prerequisites

To access the examples, you will need to connect to Deephaven's Artifactory using an API key. Please ensure you have the necessary username (provided by your administrator) and API key to download the required files.

Add the following entries to your gradle.properties file, which is located at ${HOME}/.gradle/gradle.properties on a Linux-based system:

artifactoryUser=<your username>
artifactoryAPIKey=<your Artifactory key>

Alternatively, if you already have a development environment with access to the Deephaven code, you can use that instead of the provided Gradle files.

Setup

Persistent Query setup

This example demonstrates how to connect to a Persistent Query to retrieve data. First, log in to your Deephaven server's web UI, then use the Query Monitor to create a Persistent Query.

  1. On the Settings tab:
  • Call the PQ MyTestJavaClientPQ.
  • Select Live Query (Script) for the type.
  • Select a DB Server of AutoQuery.
  • Use the Core+ engine.
  • Give it a heap size of 1 GB.
  1. On the Scheduling tab, select Disabled.

  2. On the Script tab, select a Runtime of Groovy. The script consists of one line to retrieve the ProcessEventLog table and view the LogEntry column in the order it was written. This table contains events written out by Deephaven workers and can help diagnose issues.

pel=db.liveTable("DbInternal", "ProcessEventLog").where("Date=today()").sort("Timestamp").view("LogEntry")
  1. Save the PQ.

Create your project

This crash course assumes you're on a Linux-based operating system. While the Gradle files are compatible with Windows, the commands will differ.

The Java client is available in two packages: client-barrage and client-flight.

  • The client-barrage package requires at least Java 11 and includes Deephaven's Barrage extension. This allows you to use the full power of the ticking Deephaven engine.
  • The client-flight package is a Java 8 compatible minimal client that lets you access static snapshots of Deephaven tables using the Arrow Flight protocol.

First, create a directory to hold the examples. The rest of the example assumes you're running from this directory. For example:

cd ~
mkdir javaClient
cd javaClient

Gradle setup

To use the Java client, you'll need either the Barrage or Flight Deephaven application JAR files. A convenient way to obtain them is through Gradle.

If you are already familiar with Gradle, you can follow the Gradle instructions to set it up in this directory.

Alternatively, you can download the example Gradle wrapper zip file, copy it to the directory you created earlier, and then unzip it in your example client directory.

cp ~/Downloads/download.zip .
unzip download.zip

Run the Gradle wrapper command to set up the environment.

./gradlew wrapper

Application build files

Two Gradle files are used to compile and run the examples. Based on their contents, Gradle automatically determines and downloads the required dependencies.

  • One file is for the Barrage client, and one is for the Flight client.
  • You must update the dhcVersion and dheVersion variables within the Gradle files to reflect the version of Deephaven that you are connecting to.
  • The application and run sections are used with the self-contained Java examples you'll create soon.
    • The Barrage example relies on a class called io.deephaven.enterprise.dnd.client.example.BarrageExample.
    • The Flight example relies on a class called io.deephaven.enterprise.dnd.client.example.FlightExample.

To use these examples with the suggested directories, edit settings.gradle in your project directory. Depending on whether you want to use Barrage or Flight, add one or both of these lines:

include 'FlightModule'
include 'BarrageModule'

Then follow these instructions for Barrage, Flight, or both.

Gradle build details.

Create a directory and create a new build.gradle file in it.

mkdir BarrageModule
vi BarrageModule/build.gradle

Add the following contents to the file. When adding the text make a couple of changes.

  • Update dhcVersion and dheVersion to the versions on your server. If you start your test PQ, you can see these in the Summary tab.
  • Update /path/to/keyfile with the path to your keyfile.
  • Alternatively, you can use a username and password for testing, although this is insecure.
plugins {
    id 'application'
}

def dhcVersion="0.37.4"
def dheVersion="1.20240517.344"

// Barrage requires at least Java 11 but we'll use 17

sourceCompatibility="17"
targetCompatibility="17"

repositories {
    mavenCentral()

    maven {
        credentials {
            username = artifactoryUser ?: System.getProperty('user.name')
            password = artifactoryAPIKey ?: ''
        }
        url "https://illumon.jfrog.io/illumon/libs-customer"
    }
}

dependencies {
    runtimeOnly "io.deephaven:deephaven-log-to-slf4j:${dhcVersion}"
    implementation "info.picocli:picocli:4.6.1"

    implementation "iris:client-barrage:$dheVersion"
    implementation "iris:client-base:$dheVersion"
    implementation "iris:AuthLib:$dheVersion"
    implementation "iris:ControllerClientGrpc:$dheVersion"
}

application {
    mainClassName = 'io.deephaven.enterprise.dnd.client.example.BarrageExample'
    applicationDefaultJvmArgs = ["-DDeephavenEnterprise.rootFile=iris-common.prop",
                                 "--add-opens=java.base/java.nio=ALL-UNNAMED"]
}

tasks.named('run', JavaExec).configure {
    args('--url', 'https://server.domain.com:8000', '--keyfile', '/path/to/keyfile', '--pqname', 'MyTestJavaClientPQ', '--tablename', 'pel')
//    args('--url', 'https://server.domain.com:8000', '--username', 'username', '--password', 'password', '--pqname', 'MyTestJavaClientPQ', '--tablename', 'pel')
}

Note

This example uses port 8000. Your server may use a different port. Typically, the port is 8123 for servers without Envoy and 8000 for servers with Envoy.

Create a directory and create a new build.gradle file in it.

mkdir FlightModule
vi FlightModule/build.gradle

Add the following contents to the file. When adding the text make a couple of changes.

  • Update dheVersion to the version on your server. You can see this by clicking on the User Settings button in the web UI.
  • Update /path/to/keyfile with the path to your keyfile.
  • Alternatively, you can use a username and password for testing, although this is insecure.
plugins {
    id 'application'
}

def dheVersion="1.20240517.344"

sourceCompatibility="17"
targetCompatibility="17"

repositories {
    mavenCentral()

    maven {
        credentials {
            username = artifactoryUser ?: System.getProperty('user.name')
            password = artifactoryAPIKey ?: ''
        }
        url "https://illumon.jfrog.io/illumon/libs-customer"
    }
}

dependencies {
    implementation "info.picocli:picocli:4.6.1"

    implementation "iris:client-flight:$dheVersion"
    implementation "iris:client-base:$dheVersion"
    implementation "iris:AuthLib:$dheVersion"
    implementation "iris:ControllerClientGrpc:$dheVersion"
}

application {
    mainClassName = 'io.deephaven.enterprise.dnd.client.example.FlightExample'
    applicationDefaultJvmArgs = ["-DDeephavenEnterprise.rootFile=iris-common.prop"]
}

tasks.named('run', JavaExec).configure {
    args('--url', 'https://server.domain.com:8000', '--keyfile', '/path/to/keyfile', '--pqname', 'MyTestJavaClientPQ', '--tablename', 'pel')
//    args('--url', 'https://server.domain.com:8000', '--username', 'username', '--password', 'password', '--pqname', 'MyTestJavaClientPQ', '--tablename', 'pel')
}

Note

This example uses port 8000. Your server may use a different port. Typically, the port is 8123 for servers without Envoy and 8000 for servers with Envoy.

Update the Gradle file's args line (at the bottom of the file) with an appropriate URL, username, password, Persistent Query name, and table name. The PQ name should be the one created earlier (MyTestJavaClientPQ), and the table name should be the one from that PQ's script (pel).

Alternatively, you can use a keyfile that's been set up instead of the username and password. This is the recommended authentication method.

Create the Java client code

Depending on whether you're creating a Barrage or a Flight application, follow the appropriate instructions below to create the Java application. The code contains comments explaining what it's doing.

Java client code
  1. Create the Barrage Java client code.
mkdir -p BarrageModule/src/main/java/io/deephaven/enterprise/dnd/client/example
vi BarrageModule/src/main/java/io/deephaven/enterprise/dnd/client/example/BarrageExample.java
  1. Copy the following code into the file.
package io.deephaven.enterprise.dnd.client.example;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.enterprise.auth.UserContext;
import io.deephaven.enterprise.config.HttpUrlPropertyInputStreamLoader;
import io.deephaven.enterprise.controller.client.ControllerClientGrpc;
import io.deephaven.enterprise.dnd.client.DeephavenClusterConnection;
import io.deephaven.enterprise.dnd.client.DndSession;
import io.deephaven.enterprise.dnd.client.DndSessionBarrage;
import io.deephaven.enterprise.dnd.client.DndSessionFactoryBarrage;
import io.deephaven.proto.controller.PersistentQueryInfoMessage;
import io.deephaven.proto.controller.PersistentQueryStatusEnum;
import io.deephaven.qst.table.TableSpec;
import org.jetbrains.annotations.NotNull;
import picocli.CommandLine;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import static io.deephaven.proto.controller.PersistentQueryStatusEnum.PQS_RUNNING;

/**
 * Simple Barrage core+ example client class.
 */
public class BarrageExample implements Callable<Integer> {
  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-u", "--url"}, description = "The server's URL", required = true)
  private String url;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-n", "--username"}, description = "A user name for authentication")
  private String userName;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-p", "--password"}, description = "A password for authentication")
  private String password;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-k", "--keyfile"}, description = "A keyfile for authentication")
  private String keyfile;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-pq", "--pqname"}, description = "A PQ name for the connection", required = true)
  private String pqName;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-t", "--tablename"}, description = "A table name to retrieve", required = true)
  private String tableName;

  /**
   * Container for a community session factory. The created controllerClient can be used to perform programmatic PQ management,
   * and is used here to start the PQ if needed.
   * <br>
   * The first and most important step when using the Java client is to create a session. A session is a connection to the
   * Deephaven server that allows authentication with the server, table creation, persistent query access, and more. The session
   * factory creates a session. It requires one of the following two things to connect to the server:
   *
   * <ul>
   *   <li>A URL to a connection.json file, which is the method used in the example</li>
   *   <li>A JSON object with the connection details</li>
   * </ul>
   *
   * All Deephaven servers expose a connection.json file, which can be seen on {@code <server URL>/iris/connection.json}.
   * You'll see the example Gradle files use a server URL, which the application uses to retrieve this. The application
   * uses it to create its {@link DndSessionFactoryBarrage} and {@link DeephavenClusterConnection} instances.
   */
  private static class CorePlusSessionFactory {
    final DndSessionFactoryBarrage sessionFactory;
    final ControllerClientGrpc controllerClient;
    final UserContext userContext;

    /**
     * Create and authenticate the session factory.
     *
     * @param serverUrl the server's URL, like {@code https://server.domain.com:8000} or {@code https://server.domain.com:8123}
     * @param userName  a username
     * @param password  a password
     * @param keyFile   an authentication keyfile
     * @throws IOException from the underlying calls
     */
    private CorePlusSessionFactory(final String serverUrl, final String userName, final String password, final String keyFile) throws IOException {
      System.out.println("Connecting to " + serverUrl);
      final String connectionUrl = serverUrl + "/iris/connection.json";
      sessionFactory = new DndSessionFactoryBarrage(connectionUrl);

      final DeephavenClusterConnection clusterConnection = new DeephavenClusterConnection(connectionUrl);

      if (keyFile != null) {
        sessionFactory.privateKey(keyFile);
        clusterConnection.privateKey(keyFile);
      } else {
        sessionFactory.password(userName, password);
        clusterConnection.password(userName, password);
      }

      userContext = clusterConnection.getUserContext();
      controllerClient = clusterConnection.getControllerClient();
    }

    /**
     * Return the initialized {@link CorePlusSessionFactory}
     *
     * @return the {@link CorePlusSessionFactory}
     */
    public DndSessionFactoryBarrage getSessionFactory() {
      return sessionFactory;
    }

    /**
     * Get the user that this session is operating as.
     *
     * @return the user that this session is operating as
     */
    public String getEffectiveUser() {
      return userContext.getEffectiveUser();
    }

    /**
     * Ensure that a PQ is running.
     *
     * @param pqName the PQ name
     */
    public void ensurePqRunning(final String pqName) {
      final PersistentQueryInfoMessage infoMessage = sessionFactory.getPQ(pqName);
      if (infoMessage == null) {
        throw new IllegalArgumentException("Persistent query " + pqName + " does not exist");
      }

      final PersistentQueryStatusEnum pqStatus = infoMessage.getState().getStatus();
      if (pqStatus == PQS_RUNNING) {
        return;
      }

      final AtomicReference<PersistentQueryStatusEnum> newStatusReference = new AtomicReference<>(null);

      final ControllerClientGrpc.ObserverImpl observer = new ControllerClientGrpc.ObserverImpl() {
        @Override
        public void handlePut(@NotNull final PersistentQueryInfoMessage value) {
          System.out.println("Received update for " + value.getConfig().getName() + ": " + value.getState().getStatus());
          synchronized (newStatusReference) {
            final String newPqName = value.getConfig().getName();
            if (!pqName.equals(newPqName)) {
              return;
            }

            final PersistentQueryStatusEnum observedStatus = value.getState().getStatus();
            if (isTerminalOrRunning(observedStatus)) {
              newStatusReference.set(observedStatus);
              newStatusReference.notify();
            }
          }
        }
      };

      controllerClient.subscribeToAll();

      // Get the PQ to running...
      final long maxSleepMillis = 60_000;
      try {
        synchronized (newStatusReference) {
          controllerClient.addObserver(observer);
          controllerClient.restartQuery(infoMessage.getConfig());
          long currentTime = System.currentTimeMillis();
          final long endTime = currentTime + maxSleepMillis;
          // We'll give it 60 seconds to get to running or fail
          while (currentTime < endTime && !isTerminalOrRunning(newStatusReference.get())) {
            final long sleepMillis = endTime - currentTime;
            newStatusReference.wait(sleepMillis);
            currentTime = System.currentTimeMillis();
          }
        }
      } catch(InterruptedException e){
        throw new RuntimeException("Interrupted while waiting for PQ to get running", e);
      } finally {
        controllerClient.removeObserver(observer);
      }

      final PersistentQueryStatusEnum latestStatus = newStatusReference.get();
      if (newStatusReference.get() != PQS_RUNNING) {
        throw new IllegalStateException("PQ did not get to running in " + maxSleepMillis + " milliseconds, state is " + latestStatus);
      }
    }

    private boolean isTerminalOrRunning(final PersistentQueryStatusEnum status) {
      return status != null && (status == PQS_RUNNING || controllerClient.isTerminal(status));
    }
  }

  public static void main(String[] args) {
    System.exit(new CommandLine(new BarrageExample()).execute(args));
  }

  /**
   * Used by picocli to inject the inspected command line parameters collected.
   *
   * @return the command line exit code (i.e. shown by echo $?)
   */
  @Override
  public Integer call() {
    // We must have either a username and password, or a keyfile
    if (keyfile == null) {
      if (userName == null || password == null) {
        throw new IllegalArgumentException("Must have a keyfile, or a username and password");
      }
    } else if (userName != null || password != null) {
      throw new IllegalArgumentException("Must have a keyfile, or a username and password");
    }

    // This must be done very early in program initialization because it tells the program how and where to get its configuration
    HttpUrlPropertyInputStreamLoader.setServerUrl(url);

    // The session factory will make the connection to the specified Deephaven server and then authenticate.
    final CorePlusSessionFactory sessionFactory;
    try {
      sessionFactory = new CorePlusSessionFactory(url, userName, password, keyfile);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }

    sessionFactory.ensurePqRunning(pqName);

    DndSessionBarrage barrageSession = null;
    Table table = null;
    // Ensure we release resources when done - this is better than manually closing
    try  {
      barrageSession = sessionFactory.getSessionFactory().persistentQuery(pqName);
      final TableSpec testTable1 = DndSession.scopeTable(tableName);
      table = barrageSession.subscribeTo(testTable1);

      System.out.println("Subscribed to " + pqName + " - " + tableName + " from " + url);
      System.out.println("Size is " + table.size());

      // The catalog shows all the tables available from this server
      System.out.println("System catalog");
      final Table catalog = barrageSession.snapshotOf(DndSession.catalogTable());
      TableTools.show(catalog);

      // Use the session to perform an arbitrary operation. The PersistentQueryStateLog is an internal table showing
      // the history of all this user's persistent query executions.
      barrageSession.executeCode(
              "pqsl = db.liveTable(namespace=\"DbInternal\", table_name=\"PersistentQueryStateLog\").where(\"Date=today()\").tail(2)");
      final Table fromExCode = barrageSession.snapshotOf(DndSession.scopeTable("pqsl"));
      System.out.println("pqsl from executeCode");
      TableTools.show(fromExCode);

      // Fetch the PersistentQueryStateLog using a pre-filtered table through the session
      final TableSpec liveTableSpec = DndSession.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()", "LastEffectiveUser=`" + sessionFactory.getEffectiveUser() + "`");
      final Table fromTableSpec = barrageSession.subscribeTo(liveTableSpec);
      System.out.println("pqsl from TableSpec");
      TableTools.show(fromTableSpec);

      // Do your Barrage stuff here
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      if (table != null) {
        table.close();
      }

      if (barrageSession != null) {
        try {
          barrageSession.close();
        } catch (IOException ignored) {
        }
      }
    }

    return 0;
  }
}
  1. Create the Flight Java client code.
mkdir -p FlightModule/src/main/java/io/deephaven/enterprise/dnd/client/example
vi FlightModule/src/main/java/io/deephaven/enterprise/dnd/client/example/FlightExample.java
  1. Copy the following code into the file.
package io.deephaven.enterprise.dnd.client.example;

import io.deephaven.enterprise.auth.UserContext;
import io.deephaven.enterprise.config.HttpUrlPropertyInputStreamLoader;
import io.deephaven.enterprise.controller.client.ControllerClientGrpc;
import io.deephaven.enterprise.dnd.client.DeephavenClusterConnection;
import io.deephaven.enterprise.dnd.client.DndSession;
import io.deephaven.enterprise.dnd.client.DndSessionFactoryFlight;
import io.deephaven.enterprise.dnd.client.DndSessionFlight;
import io.deephaven.proto.controller.PersistentQueryInfoMessage;
import io.deephaven.proto.controller.PersistentQueryStatusEnum;
import io.deephaven.qst.table.TableSpec;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.jetbrains.annotations.NotNull;
import picocli.CommandLine;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import static io.deephaven.proto.controller.PersistentQueryStatusEnum.PQS_RUNNING;

/**
 * Simple Barrage core+ example client class.
 */
public class FlightExample implements Callable<Integer>  {
  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-u", "--url"}, description = "The server's URL", required = true)
  private String url;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-n", "--username"}, description = "A user name for authentication")
  private String userName;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-p", "--password"}, description = "A password for authentication")
  private String password;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-k", "--keyfile"}, description = "A keyfile for authentication")
  private String keyfile;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-pq", "--pqname"}, description = "A PQ name for the connection", required = true)
  private String pqName;

  @SuppressWarnings("unused")
  @CommandLine.Option(names={"-t", "--tablename"}, description = "A table name to retrieve", required = true)
  private String tableName;

  /**
   * Container for a community session factory. The created controllerClient can be used to perform programmatic PQ management,
   * and is used here to start the PQ if needed.
   * <br>
   * The first step when using the Java client is to create a session. A session is a connection to the
   * Deephaven server that allows authentication with the server, table creation, persistent query access, and more. The session
   * factory can then be used to create a session. It requires one of the following two things to connect to the server:
   *
   * <ul>
   *   <li>A URL to a connection.json file, which is the method used in the example</li>
   *   <li>A JSON object with the connection details</li>
   * </ul>
   *
   * All Deephaven servers expose a connection.json file, which can be seen on {@code <server URL>/iris/connection.json}.
   * You'll see the example Gradle files use a server URL, which the application uses to retrieve this. The application
   * uses it to create its {@link DndSessionFactoryFlight} and {@link DeephavenClusterConnection} instances.
   */
  private static class CorePlusSessionFactory {
    final DndSessionFactoryFlight sessionFactory;
    final ControllerClientGrpc controllerClient;
    final UserContext userContext;

    /**
     * Create and authenticate the session factory.
     *
     * @param serverUrl the server's URL, like {@code https://server.domain.com:8000} or {@code https://server.domain.com:8123}
     * @param userName  a username
     * @param password  a password
     * @param keyFile   an authentication keyfile
     * @throws IOException from the underlying calls
     */
    private CorePlusSessionFactory(final String serverUrl, final String userName, final String password, final String keyFile) throws IOException {
      System.out.println("Connecting to " + serverUrl);
      final String connectionUrl = serverUrl + "/iris/connection.json";
      sessionFactory = new DndSessionFactoryFlight(connectionUrl);

      final DeephavenClusterConnection clusterConnection = new DeephavenClusterConnection(connectionUrl);
      if (keyFile != null) {
        sessionFactory.privateKey(keyFile);
        clusterConnection.privateKey(keyFile);
      } else {
        sessionFactory.password(userName, password);
        clusterConnection.password(userName, password);
      }

      userContext = clusterConnection.getUserContext();
      controllerClient = clusterConnection.getControllerClient();
    }

    /**
     * Return the initialized {@link CorePlusSessionFactory}
     *
     * @return the {@link CorePlusSessionFactory}
     */
    public DndSessionFactoryFlight getSessionFactory() {
      return sessionFactory;
    }

    /**
     * Get the user that this session is operating as.
     *
     * @return the user that this session is operating as
     */
    public String getEffectiveUser() {
      return userContext.getEffectiveUser();
    }

    /**
     * Ensure that a PQ is running.
     *
     * @param pqName the PQ name
     */
    public void ensurePqRunning(final String pqName) {
      final PersistentQueryInfoMessage infoMessage = sessionFactory.getPQ(pqName);
      if (infoMessage == null) {
        throw new IllegalArgumentException("Persistent query " + pqName + " does not exist");
      }

      final PersistentQueryStatusEnum pqStatus = infoMessage.getState().getStatus();
      if (pqStatus == PQS_RUNNING) {
        return;
      }

      final AtomicReference<PersistentQueryStatusEnum> newStatusReference = new AtomicReference<>(null);

      final ControllerClientGrpc.ObserverImpl observer = new ControllerClientGrpc.ObserverImpl() {
        @Override
        public void handlePut(@NotNull final PersistentQueryInfoMessage value) {
          System.out.println("Received update for " + value.getConfig().getName() + ": " + value.getState().getStatus());
          synchronized (newStatusReference) {
            final String newPqName = value.getConfig().getName();
            if (!pqName.equals(newPqName)) {
              return;
            }

            final PersistentQueryStatusEnum observedStatus = value.getState().getStatus();
            if (isTerminalOrRunning(observedStatus)) {
              newStatusReference.set(observedStatus);
              newStatusReference.notify();
            }
          }
        }
      };

      controllerClient.subscribeToAll();

      // Get the PQ to running...
      final long maxSleepMillis = 60_000;
      try {
        synchronized (newStatusReference) {
          controllerClient.addObserver(observer);
          controllerClient.restartQuery(infoMessage.getConfig());
          long currentTime = System.currentTimeMillis();
          final long endTime = currentTime + maxSleepMillis;
          // We'll give it 60 seconds to get to running or fail
          while (currentTime < endTime && !isTerminalOrRunning(newStatusReference.get())) {
            final long sleepMillis = endTime - currentTime;
            newStatusReference.wait(sleepMillis);
            currentTime = System.currentTimeMillis();
          }
        }
      } catch(InterruptedException e){
        throw new RuntimeException("Interrupted while waiting for PQ to get running", e);
      } finally {
        controllerClient.removeObserver(observer);
      }

      final PersistentQueryStatusEnum latestStatus = newStatusReference.get();
      if (newStatusReference.get() != PQS_RUNNING) {
        throw new IllegalStateException("PQ did not get to running in " + maxSleepMillis + " milliseconds, state is " + latestStatus);
      }
    }

    private boolean isTerminalOrRunning(final PersistentQueryStatusEnum status) {
      return status != null && (status == PQS_RUNNING || controllerClient.isTerminal(status));
    }
  }

  public static void main(String[] args) {
    System.exit(new CommandLine(new FlightExample()).execute(args));
  }

  /**
   * Used by picocli to inject the inspected command line parameters collected.
   *
   * @return the command line exit code (i.e. shown by echo $?)
   */
  @Override
  public Integer call() {
    // We must have either a username and password, or a keyfile
    if (keyfile == null) {
      if (userName == null || password == null) {
        throw new IllegalArgumentException("Must have a keyfile, or a username and password");
      }
    } else if (userName != null || password != null) {
      throw new IllegalArgumentException("Must have a keyfile, or a username and password");
    }

    // This must be done very early in program initialization because it tells the program how and where to get its configuration
    HttpUrlPropertyInputStreamLoader.setServerUrl(url);

    // The session factory will make the connection to the specified Deephaven server and then authenticate.
    final CorePlusSessionFactory sessionFactory;
    try {
      sessionFactory = new CorePlusSessionFactory(url, userName, password, keyfile);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    sessionFactory.ensurePqRunning(pqName);

    // Ensure we release resources when done
    DndSessionFlight flightSession = null;
    FlightStream flightStream = null;
    try {
      flightSession = sessionFactory.getSessionFactory().persistentQuery(pqName);

      final PersistentQueryStatusEnum statusEnum = flightSession.getInfo().getState().getStatus();
      if (statusEnum != PQS_RUNNING) {
        throw new RuntimeException("Persistent query must be running");
      }

      final TableSpec testTable1 = DndSession.scopeTable(tableName);
      flightStream = flightSession.streamOf(testTable1);

      // Get the schema for the specified table
      System.out.println("Got stream for " + pqName + " - " + tableName + " from " + url);
      System.out.println(flightStream.getSchema());

      // The catalog shows all the tables available from this server
      System.out.println("System catalog");
      try (final FlightStream catalog = flightSession.streamOf(DndSession.catalogTable())) {
        while (catalog.next()) {
          final VectorSchemaRoot vectorSchemaRoot = catalog.getRoot();
          System.out.println(vectorSchemaRoot.contentToTSVString());
        }
      }

      // Use the session to perform an arbitrary operation. The PersistentQueryStateLog is an internal table showing
      // the history of all this user's persistent query executions. Fetch it using a pre-filtered table through the session.
      final TableSpec liveTableSpec = DndSession.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()", "LastEffectiveUser=`" + sessionFactory.getEffectiveUser() + "`");
      try (final FlightStream fromTableSpec = flightSession.streamOf(liveTableSpec)) {
        System.out.println("pqsl from TableSpec");
        while (fromTableSpec.next()) {
          final VectorSchemaRoot vectorSchemaRoot = fromTableSpec.getRoot();
          System.out.println(vectorSchemaRoot.contentToTSVString());
        }
      }

      // Do your Flight stuff here

    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      // We'll just ignore if exceptions happen while closing resources
      if (flightStream != null) {
        try {
          flightStream.close();
        } catch (Exception ignored) {
        }
      }

      if (flightSession != null) {
        try {
          flightSession.close();
        } catch (Exception ignored) {
        }
      }
    }

    return 0;
  }
}

Compile and run the code

  1. Compile the code to make sure you've got access to the required libraries.

  2. Use the Gradle file to compile the client.

./gradlew BarrageModule:compileJava
./gradlew FlightModule:compileJava
  1. Run the application.

Before running the application, ensure your PQ isn't running so you can see it start up and guarantee some data in the internal tables that the example retrieves.

./gradlew BarrageModule:run
./gradlew FlightModule:run