Develop a Java Query
This section of the crash course covers Deephaven Enterprise's client APIs. This guide discusses the Java client, which allows you to:
- Create new workers
- Interact with tables and objects on the server
- Connect to existing Persistent Queries (PQs)
- Create PQs
- Run queries server-side
- And more!
Prerequisites
To access the examples, you will need to connect to Deephaven's Artifactory using an API key. Please ensure you have the necessary username (provided by your administrator) and API key to download the required files.
Add the following entries to your gradle.properties
file, which is located at ${HOME}/.gradle/gradle.properties
on a Linux-based system:
artifactoryUser=<your username>
artifactoryAPIKey=<your Artifactory key>
Alternatively, if you already have a development environment with access to the Deephaven code, you can use that instead of the provided Gradle files.
Setup
Persistent Query setup
This example demonstrates how to connect to a Persistent Query to retrieve data. First, log in to your Deephaven server's web UI, then use the Query Monitor to create a Persistent Query.
- On the Settings tab:
- Call the PQ
MyTestJavaClientPQ
. - Select
Live Query (Script)
for the type. - Select a DB Server of
AutoQuery
. - Use the
Core+
engine. - Give it a heap size of
1
GB.
-
On the Scheduling tab, select
Disabled
. -
On the Script tab, select a Runtime of
Groovy
. The script consists of one line to retrieve theProcessEventLog
table and view theLogEntry
column in the order it was written. This table contains events written out by Deephaven workers and can help diagnose issues.
pel=db.liveTable("DbInternal", "ProcessEventLog").where("Date=today()").sort("Timestamp").view("LogEntry")
- Save the PQ.
Create your project
This crash course assumes you're on a Linux-based operating system. While the Gradle files are compatible with Windows, the commands will differ.
The Java client is available in two packages: client-barrage
and client-flight
.
- The
client-barrage
package requires at least Java 11 and includes Deephaven's Barrage extension. This allows you to use the full power of the ticking Deephaven engine. - The
client-flight
package is a Java 8 compatible minimal client that lets you access static snapshots of Deephaven tables using the Arrow Flight protocol.
First, create a directory to hold the examples. The rest of the example assumes you're running from this directory. For example:
cd ~
mkdir javaClient
cd javaClient
Gradle setup
To use the Java client, you'll need either the Barrage or Flight Deephaven application JAR files. A convenient way to obtain them is through Gradle.
If you are already familiar with Gradle, you can follow the Gradle instructions to set it up in this directory.
Alternatively, you can download the example Gradle wrapper zip file, copy it to the directory you created earlier, and then unzip it in your example client directory.
cp ~/Downloads/download.zip .
unzip download.zip
Run the Gradle wrapper command to set up the environment.
./gradlew wrapper
Application build files
Two Gradle files are used to compile and run the examples. Based on their contents, Gradle automatically determines and downloads the required dependencies.
- One file is for the Barrage client, and one is for the Flight client.
- You must update the
dhcVersion
anddheVersion
variables within the Gradle files to reflect the version of Deephaven that you are connecting to. - The
application
andrun
sections are used with the self-contained Java examples you'll create soon.- The Barrage example relies on a class called
io.deephaven.enterprise.dnd.client.example.BarrageExample
. - The Flight example relies on a class called
io.deephaven.enterprise.dnd.client.example.FlightExample
.
- The Barrage example relies on a class called
To use these examples with the suggested directories, edit settings.gradle
in your project directory. Depending on whether you want to use Barrage or Flight, add one or both of these lines:
include 'FlightModule'
include 'BarrageModule'
Then follow these instructions for Barrage, Flight, or both.
Gradle build details.
Create a directory and create a new build.gradle
file in it.
mkdir BarrageModule
vi BarrageModule/build.gradle
Add the following contents to the file. When adding the text make a couple of changes.
- Update
dhcVersion
anddheVersion
to the versions on your server. If you start your test PQ, you can see these in the Summary tab. - Update
/path/to/keyfile
with the path to your keyfile. - Alternatively, you can use a username and password for testing, although this is insecure.
plugins {
id 'application'
}
def dhcVersion="0.37.4"
def dheVersion="1.20240517.344"
// Barrage requires at least Java 11 but we'll use 17
sourceCompatibility="17"
targetCompatibility="17"
repositories {
mavenCentral()
maven {
credentials {
username = artifactoryUser ?: System.getProperty('user.name')
password = artifactoryAPIKey ?: ''
}
url "https://illumon.jfrog.io/illumon/libs-customer"
}
}
dependencies {
runtimeOnly "io.deephaven:deephaven-log-to-slf4j:${dhcVersion}"
implementation "info.picocli:picocli:4.6.1"
implementation "iris:client-barrage:$dheVersion"
implementation "iris:client-base:$dheVersion"
implementation "iris:AuthLib:$dheVersion"
implementation "iris:ControllerClientGrpc:$dheVersion"
}
application {
mainClassName = 'io.deephaven.enterprise.dnd.client.example.BarrageExample'
applicationDefaultJvmArgs = ["-DDeephavenEnterprise.rootFile=iris-common.prop",
"--add-opens=java.base/java.nio=ALL-UNNAMED"]
}
tasks.named('run', JavaExec).configure {
args('--url', 'https://server.domain.com:8000', '--keyfile', '/path/to/keyfile', '--pqname', 'MyTestJavaClientPQ', '--tablename', 'pel')
// args('--url', 'https://server.domain.com:8000', '--username', 'username', '--password', 'password', '--pqname', 'MyTestJavaClientPQ', '--tablename', 'pel')
}
Note
This example uses port 8000
. Your server may use a different port. Typically, the port is 8123
for servers without Envoy and 8000
for servers with Envoy.
Create a directory and create a new build.gradle
file in it.
mkdir FlightModule
vi FlightModule/build.gradle
Add the following contents to the file. When adding the text make a couple of changes.
- Update
dheVersion
to the version on your server. You can see this by clicking on the User Settings button in the web UI. - Update
/path/to/keyfile
with the path to your keyfile. - Alternatively, you can use a username and password for testing, although this is insecure.
plugins {
id 'application'
}
def dheVersion="1.20240517.344"
sourceCompatibility="17"
targetCompatibility="17"
repositories {
mavenCentral()
maven {
credentials {
username = artifactoryUser ?: System.getProperty('user.name')
password = artifactoryAPIKey ?: ''
}
url "https://illumon.jfrog.io/illumon/libs-customer"
}
}
dependencies {
implementation "info.picocli:picocli:4.6.1"
implementation "iris:client-flight:$dheVersion"
implementation "iris:client-base:$dheVersion"
implementation "iris:AuthLib:$dheVersion"
implementation "iris:ControllerClientGrpc:$dheVersion"
}
application {
mainClassName = 'io.deephaven.enterprise.dnd.client.example.FlightExample'
applicationDefaultJvmArgs = ["-DDeephavenEnterprise.rootFile=iris-common.prop"]
}
tasks.named('run', JavaExec).configure {
args('--url', 'https://server.domain.com:8000', '--keyfile', '/path/to/keyfile', '--pqname', 'MyTestJavaClientPQ', '--tablename', 'pel')
// args('--url', 'https://server.domain.com:8000', '--username', 'username', '--password', 'password', '--pqname', 'MyTestJavaClientPQ', '--tablename', 'pel')
}
Note
This example uses port 8000
. Your server may use a different port. Typically, the port is 8123
for servers without Envoy and 8000
for servers with Envoy.
Update the Gradle file's args
line (at the bottom of the file) with an appropriate URL, username, password, Persistent Query name, and table name. The PQ name should be the one created earlier (MyTestJavaClientPQ
), and the table name should be the one from that PQ's script (pel
).
Alternatively, you can use a keyfile that's been set up instead of the username and password. This is the recommended authentication method.
Create the Java client code
Depending on whether you're creating a Barrage or a Flight application, follow the appropriate instructions below to create the Java application. The code contains comments explaining what it's doing.
Java client code
- Create the Barrage Java client code.
mkdir -p BarrageModule/src/main/java/io/deephaven/enterprise/dnd/client/example
vi BarrageModule/src/main/java/io/deephaven/enterprise/dnd/client/example/BarrageExample.java
- Copy the following code into the file.
package io.deephaven.enterprise.dnd.client.example;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.enterprise.auth.UserContext;
import io.deephaven.enterprise.config.HttpUrlPropertyInputStreamLoader;
import io.deephaven.enterprise.controller.client.ControllerClientGrpc;
import io.deephaven.enterprise.dnd.client.DeephavenClusterConnection;
import io.deephaven.enterprise.dnd.client.DndSession;
import io.deephaven.enterprise.dnd.client.DndSessionBarrage;
import io.deephaven.enterprise.dnd.client.DndSessionFactoryBarrage;
import io.deephaven.proto.controller.PersistentQueryInfoMessage;
import io.deephaven.proto.controller.PersistentQueryStatusEnum;
import io.deephaven.qst.table.TableSpec;
import org.jetbrains.annotations.NotNull;
import picocli.CommandLine;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import static io.deephaven.proto.controller.PersistentQueryStatusEnum.PQS_RUNNING;
/**
* Simple Barrage core+ example client class.
*/
public class BarrageExample implements Callable<Integer> {
@SuppressWarnings("unused")
@CommandLine.Option(names={"-u", "--url"}, description = "The server's URL", required = true)
private String url;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-n", "--username"}, description = "A user name for authentication")
private String userName;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-p", "--password"}, description = "A password for authentication")
private String password;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-k", "--keyfile"}, description = "A keyfile for authentication")
private String keyfile;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-pq", "--pqname"}, description = "A PQ name for the connection", required = true)
private String pqName;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-t", "--tablename"}, description = "A table name to retrieve", required = true)
private String tableName;
/**
* Container for a community session factory. The created controllerClient can be used to perform programmatic PQ management,
* and is used here to start the PQ if needed.
* <br>
* The first and most important step when using the Java client is to create a session. A session is a connection to the
* Deephaven server that allows authentication with the server, table creation, persistent query access, and more. The session
* factory creates a session. It requires one of the following two things to connect to the server:
*
* <ul>
* <li>A URL to a connection.json file, which is the method used in the example</li>
* <li>A JSON object with the connection details</li>
* </ul>
*
* All Deephaven servers expose a connection.json file, which can be seen on {@code <server URL>/iris/connection.json}.
* You'll see the example Gradle files use a server URL, which the application uses to retrieve this. The application
* uses it to create its {@link DndSessionFactoryBarrage} and {@link DeephavenClusterConnection} instances.
*/
private static class CorePlusSessionFactory {
final DndSessionFactoryBarrage sessionFactory;
final ControllerClientGrpc controllerClient;
final UserContext userContext;
/**
* Create and authenticate the session factory.
*
* @param serverUrl the server's URL, like {@code https://server.domain.com:8000} or {@code https://server.domain.com:8123}
* @param userName a username
* @param password a password
* @param keyFile an authentication keyfile
* @throws IOException from the underlying calls
*/
private CorePlusSessionFactory(final String serverUrl, final String userName, final String password, final String keyFile) throws IOException {
System.out.println("Connecting to " + serverUrl);
final String connectionUrl = serverUrl + "/iris/connection.json";
sessionFactory = new DndSessionFactoryBarrage(connectionUrl);
final DeephavenClusterConnection clusterConnection = new DeephavenClusterConnection(connectionUrl);
if (keyFile != null) {
sessionFactory.privateKey(keyFile);
clusterConnection.privateKey(keyFile);
} else {
sessionFactory.password(userName, password);
clusterConnection.password(userName, password);
}
userContext = clusterConnection.getUserContext();
controllerClient = clusterConnection.getControllerClient();
}
/**
* Return the initialized {@link CorePlusSessionFactory}
*
* @return the {@link CorePlusSessionFactory}
*/
public DndSessionFactoryBarrage getSessionFactory() {
return sessionFactory;
}
/**
* Get the user that this session is operating as.
*
* @return the user that this session is operating as
*/
public String getEffectiveUser() {
return userContext.getEffectiveUser();
}
/**
* Ensure that a PQ is running.
*
* @param pqName the PQ name
*/
public void ensurePqRunning(final String pqName) {
final PersistentQueryInfoMessage infoMessage = sessionFactory.getPQ(pqName);
if (infoMessage == null) {
throw new IllegalArgumentException("Persistent query " + pqName + " does not exist");
}
final PersistentQueryStatusEnum pqStatus = infoMessage.getState().getStatus();
if (pqStatus == PQS_RUNNING) {
return;
}
final AtomicReference<PersistentQueryStatusEnum> newStatusReference = new AtomicReference<>(null);
final ControllerClientGrpc.ObserverImpl observer = new ControllerClientGrpc.ObserverImpl() {
@Override
public void handlePut(@NotNull final PersistentQueryInfoMessage value) {
System.out.println("Received update for " + value.getConfig().getName() + ": " + value.getState().getStatus());
synchronized (newStatusReference) {
final String newPqName = value.getConfig().getName();
if (!pqName.equals(newPqName)) {
return;
}
final PersistentQueryStatusEnum observedStatus = value.getState().getStatus();
if (isTerminalOrRunning(observedStatus)) {
newStatusReference.set(observedStatus);
newStatusReference.notify();
}
}
}
};
controllerClient.subscribeToAll();
// Get the PQ to running...
final long maxSleepMillis = 60_000;
try {
synchronized (newStatusReference) {
controllerClient.addObserver(observer);
controllerClient.restartQuery(infoMessage.getConfig());
long currentTime = System.currentTimeMillis();
final long endTime = currentTime + maxSleepMillis;
// We'll give it 60 seconds to get to running or fail
while (currentTime < endTime && !isTerminalOrRunning(newStatusReference.get())) {
final long sleepMillis = endTime - currentTime;
newStatusReference.wait(sleepMillis);
currentTime = System.currentTimeMillis();
}
}
} catch(InterruptedException e){
throw new RuntimeException("Interrupted while waiting for PQ to get running", e);
} finally {
controllerClient.removeObserver(observer);
}
final PersistentQueryStatusEnum latestStatus = newStatusReference.get();
if (newStatusReference.get() != PQS_RUNNING) {
throw new IllegalStateException("PQ did not get to running in " + maxSleepMillis + " milliseconds, state is " + latestStatus);
}
}
private boolean isTerminalOrRunning(final PersistentQueryStatusEnum status) {
return status != null && (status == PQS_RUNNING || controllerClient.isTerminal(status));
}
}
public static void main(String[] args) {
System.exit(new CommandLine(new BarrageExample()).execute(args));
}
/**
* Used by picocli to inject the inspected command line parameters collected.
*
* @return the command line exit code (i.e. shown by echo $?)
*/
@Override
public Integer call() {
// We must have either a username and password, or a keyfile
if (keyfile == null) {
if (userName == null || password == null) {
throw new IllegalArgumentException("Must have a keyfile, or a username and password");
}
} else if (userName != null || password != null) {
throw new IllegalArgumentException("Must have a keyfile, or a username and password");
}
// This must be done very early in program initialization because it tells the program how and where to get its configuration
HttpUrlPropertyInputStreamLoader.setServerUrl(url);
// The session factory will make the connection to the specified Deephaven server and then authenticate.
final CorePlusSessionFactory sessionFactory;
try {
sessionFactory = new CorePlusSessionFactory(url, userName, password, keyfile);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
sessionFactory.ensurePqRunning(pqName);
DndSessionBarrage barrageSession = null;
Table table = null;
// Ensure we release resources when done - this is better than manually closing
try {
barrageSession = sessionFactory.getSessionFactory().persistentQuery(pqName);
final TableSpec testTable1 = DndSession.scopeTable(tableName);
table = barrageSession.subscribeTo(testTable1);
System.out.println("Subscribed to " + pqName + " - " + tableName + " from " + url);
System.out.println("Size is " + table.size());
// The catalog shows all the tables available from this server
System.out.println("System catalog");
final Table catalog = barrageSession.snapshotOf(DndSession.catalogTable());
TableTools.show(catalog);
// Use the session to perform an arbitrary operation. The PersistentQueryStateLog is an internal table showing
// the history of all this user's persistent query executions.
barrageSession.executeCode(
"pqsl = db.liveTable(namespace=\"DbInternal\", table_name=\"PersistentQueryStateLog\").where(\"Date=today()\").tail(2)");
final Table fromExCode = barrageSession.snapshotOf(DndSession.scopeTable("pqsl"));
System.out.println("pqsl from executeCode");
TableTools.show(fromExCode);
// Fetch the PersistentQueryStateLog using a pre-filtered table through the session
final TableSpec liveTableSpec = DndSession.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()", "LastEffectiveUser=`" + sessionFactory.getEffectiveUser() + "`");
final Table fromTableSpec = barrageSession.subscribeTo(liveTableSpec);
System.out.println("pqsl from TableSpec");
TableTools.show(fromTableSpec);
// Do your Barrage stuff here
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (table != null) {
table.close();
}
if (barrageSession != null) {
try {
barrageSession.close();
} catch (IOException ignored) {
}
}
}
return 0;
}
}
- Create the Flight Java client code.
mkdir -p FlightModule/src/main/java/io/deephaven/enterprise/dnd/client/example
vi FlightModule/src/main/java/io/deephaven/enterprise/dnd/client/example/FlightExample.java
- Copy the following code into the file.
package io.deephaven.enterprise.dnd.client.example;
import io.deephaven.enterprise.auth.UserContext;
import io.deephaven.enterprise.config.HttpUrlPropertyInputStreamLoader;
import io.deephaven.enterprise.controller.client.ControllerClientGrpc;
import io.deephaven.enterprise.dnd.client.DeephavenClusterConnection;
import io.deephaven.enterprise.dnd.client.DndSession;
import io.deephaven.enterprise.dnd.client.DndSessionFactoryFlight;
import io.deephaven.enterprise.dnd.client.DndSessionFlight;
import io.deephaven.proto.controller.PersistentQueryInfoMessage;
import io.deephaven.proto.controller.PersistentQueryStatusEnum;
import io.deephaven.qst.table.TableSpec;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.jetbrains.annotations.NotNull;
import picocli.CommandLine;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import static io.deephaven.proto.controller.PersistentQueryStatusEnum.PQS_RUNNING;
/**
* Simple Barrage core+ example client class.
*/
public class FlightExample implements Callable<Integer> {
@SuppressWarnings("unused")
@CommandLine.Option(names={"-u", "--url"}, description = "The server's URL", required = true)
private String url;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-n", "--username"}, description = "A user name for authentication")
private String userName;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-p", "--password"}, description = "A password for authentication")
private String password;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-k", "--keyfile"}, description = "A keyfile for authentication")
private String keyfile;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-pq", "--pqname"}, description = "A PQ name for the connection", required = true)
private String pqName;
@SuppressWarnings("unused")
@CommandLine.Option(names={"-t", "--tablename"}, description = "A table name to retrieve", required = true)
private String tableName;
/**
* Container for a community session factory. The created controllerClient can be used to perform programmatic PQ management,
* and is used here to start the PQ if needed.
* <br>
* The first step when using the Java client is to create a session. A session is a connection to the
* Deephaven server that allows authentication with the server, table creation, persistent query access, and more. The session
* factory can then be used to create a session. It requires one of the following two things to connect to the server:
*
* <ul>
* <li>A URL to a connection.json file, which is the method used in the example</li>
* <li>A JSON object with the connection details</li>
* </ul>
*
* All Deephaven servers expose a connection.json file, which can be seen on {@code <server URL>/iris/connection.json}.
* You'll see the example Gradle files use a server URL, which the application uses to retrieve this. The application
* uses it to create its {@link DndSessionFactoryFlight} and {@link DeephavenClusterConnection} instances.
*/
private static class CorePlusSessionFactory {
final DndSessionFactoryFlight sessionFactory;
final ControllerClientGrpc controllerClient;
final UserContext userContext;
/**
* Create and authenticate the session factory.
*
* @param serverUrl the server's URL, like {@code https://server.domain.com:8000} or {@code https://server.domain.com:8123}
* @param userName a username
* @param password a password
* @param keyFile an authentication keyfile
* @throws IOException from the underlying calls
*/
private CorePlusSessionFactory(final String serverUrl, final String userName, final String password, final String keyFile) throws IOException {
System.out.println("Connecting to " + serverUrl);
final String connectionUrl = serverUrl + "/iris/connection.json";
sessionFactory = new DndSessionFactoryFlight(connectionUrl);
final DeephavenClusterConnection clusterConnection = new DeephavenClusterConnection(connectionUrl);
if (keyFile != null) {
sessionFactory.privateKey(keyFile);
clusterConnection.privateKey(keyFile);
} else {
sessionFactory.password(userName, password);
clusterConnection.password(userName, password);
}
userContext = clusterConnection.getUserContext();
controllerClient = clusterConnection.getControllerClient();
}
/**
* Return the initialized {@link CorePlusSessionFactory}
*
* @return the {@link CorePlusSessionFactory}
*/
public DndSessionFactoryFlight getSessionFactory() {
return sessionFactory;
}
/**
* Get the user that this session is operating as.
*
* @return the user that this session is operating as
*/
public String getEffectiveUser() {
return userContext.getEffectiveUser();
}
/**
* Ensure that a PQ is running.
*
* @param pqName the PQ name
*/
public void ensurePqRunning(final String pqName) {
final PersistentQueryInfoMessage infoMessage = sessionFactory.getPQ(pqName);
if (infoMessage == null) {
throw new IllegalArgumentException("Persistent query " + pqName + " does not exist");
}
final PersistentQueryStatusEnum pqStatus = infoMessage.getState().getStatus();
if (pqStatus == PQS_RUNNING) {
return;
}
final AtomicReference<PersistentQueryStatusEnum> newStatusReference = new AtomicReference<>(null);
final ControllerClientGrpc.ObserverImpl observer = new ControllerClientGrpc.ObserverImpl() {
@Override
public void handlePut(@NotNull final PersistentQueryInfoMessage value) {
System.out.println("Received update for " + value.getConfig().getName() + ": " + value.getState().getStatus());
synchronized (newStatusReference) {
final String newPqName = value.getConfig().getName();
if (!pqName.equals(newPqName)) {
return;
}
final PersistentQueryStatusEnum observedStatus = value.getState().getStatus();
if (isTerminalOrRunning(observedStatus)) {
newStatusReference.set(observedStatus);
newStatusReference.notify();
}
}
}
};
controllerClient.subscribeToAll();
// Get the PQ to running...
final long maxSleepMillis = 60_000;
try {
synchronized (newStatusReference) {
controllerClient.addObserver(observer);
controllerClient.restartQuery(infoMessage.getConfig());
long currentTime = System.currentTimeMillis();
final long endTime = currentTime + maxSleepMillis;
// We'll give it 60 seconds to get to running or fail
while (currentTime < endTime && !isTerminalOrRunning(newStatusReference.get())) {
final long sleepMillis = endTime - currentTime;
newStatusReference.wait(sleepMillis);
currentTime = System.currentTimeMillis();
}
}
} catch(InterruptedException e){
throw new RuntimeException("Interrupted while waiting for PQ to get running", e);
} finally {
controllerClient.removeObserver(observer);
}
final PersistentQueryStatusEnum latestStatus = newStatusReference.get();
if (newStatusReference.get() != PQS_RUNNING) {
throw new IllegalStateException("PQ did not get to running in " + maxSleepMillis + " milliseconds, state is " + latestStatus);
}
}
private boolean isTerminalOrRunning(final PersistentQueryStatusEnum status) {
return status != null && (status == PQS_RUNNING || controllerClient.isTerminal(status));
}
}
public static void main(String[] args) {
System.exit(new CommandLine(new FlightExample()).execute(args));
}
/**
* Used by picocli to inject the inspected command line parameters collected.
*
* @return the command line exit code (i.e. shown by echo $?)
*/
@Override
public Integer call() {
// We must have either a username and password, or a keyfile
if (keyfile == null) {
if (userName == null || password == null) {
throw new IllegalArgumentException("Must have a keyfile, or a username and password");
}
} else if (userName != null || password != null) {
throw new IllegalArgumentException("Must have a keyfile, or a username and password");
}
// This must be done very early in program initialization because it tells the program how and where to get its configuration
HttpUrlPropertyInputStreamLoader.setServerUrl(url);
// The session factory will make the connection to the specified Deephaven server and then authenticate.
final CorePlusSessionFactory sessionFactory;
try {
sessionFactory = new CorePlusSessionFactory(url, userName, password, keyfile);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
sessionFactory.ensurePqRunning(pqName);
// Ensure we release resources when done
DndSessionFlight flightSession = null;
FlightStream flightStream = null;
try {
flightSession = sessionFactory.getSessionFactory().persistentQuery(pqName);
final PersistentQueryStatusEnum statusEnum = flightSession.getInfo().getState().getStatus();
if (statusEnum != PQS_RUNNING) {
throw new RuntimeException("Persistent query must be running");
}
final TableSpec testTable1 = DndSession.scopeTable(tableName);
flightStream = flightSession.streamOf(testTable1);
// Get the schema for the specified table
System.out.println("Got stream for " + pqName + " - " + tableName + " from " + url);
System.out.println(flightStream.getSchema());
// The catalog shows all the tables available from this server
System.out.println("System catalog");
try (final FlightStream catalog = flightSession.streamOf(DndSession.catalogTable())) {
while (catalog.next()) {
final VectorSchemaRoot vectorSchemaRoot = catalog.getRoot();
System.out.println(vectorSchemaRoot.contentToTSVString());
}
}
// Use the session to perform an arbitrary operation. The PersistentQueryStateLog is an internal table showing
// the history of all this user's persistent query executions. Fetch it using a pre-filtered table through the session.
final TableSpec liveTableSpec = DndSession.liveTable("DbInternal", "PersistentQueryStateLog").where("Date=today()", "LastEffectiveUser=`" + sessionFactory.getEffectiveUser() + "`");
try (final FlightStream fromTableSpec = flightSession.streamOf(liveTableSpec)) {
System.out.println("pqsl from TableSpec");
while (fromTableSpec.next()) {
final VectorSchemaRoot vectorSchemaRoot = fromTableSpec.getRoot();
System.out.println(vectorSchemaRoot.contentToTSVString());
}
}
// Do your Flight stuff here
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// We'll just ignore if exceptions happen while closing resources
if (flightStream != null) {
try {
flightStream.close();
} catch (Exception ignored) {
}
}
if (flightSession != null) {
try {
flightSession.close();
} catch (Exception ignored) {
}
}
}
return 0;
}
}
Compile and run the code
-
Compile the code to make sure you've got access to the required libraries.
-
Use the Gradle file to compile the client.
./gradlew BarrageModule:compileJava
./gradlew FlightModule:compileJava
- Run the application.
Before running the application, ensure your PQ isn't running so you can see it start up and guarantee some data in the internal tables that the example retrieves.
./gradlew BarrageModule:run
./gradlew FlightModule:run