Core+ R Client
The Core+ R client allows you to create and connect to Persistent Queries using the Community Core engine in the Deephaven Enterprise system. This page provides a brief overview of using the R client.
Obtain the R client sources
The R client is distributed as a source code bundle. This bundle includes a dockerized build script that can be used on any Linux machine with Docker and Docker's buildx installed (buildx is the Docker CLI plugin for extended build capabilities with BuildKit).
The script can produce a binary package for supported platforms (at this time, x86_64 for Ubuntu 22.04, Fedora 38, and RHEL 8, with more to come). The bundle also includes several code examples in the R/rdnd/etc directory.
You can request the latest version of the source bundle from your Deephaven Enterprise representative.
Basic Usage
Start an R console and load the Core+ R client
Load the Core+ R client in an R console with library("rdnd").
$ R --no-save
R version 4.3.1 (2023-06-16) -- "Beagle Scouts"
Copyright (C) 2023 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)
[...]
> library("rdnd")
Loading required package: Rcpp
Loading required package: arrow
Attaching package: ‘arrow’
The following object is masked from ‘package:utils’:
timestamp
Loading required package: R6
>
The package has several dependencies that are also loaded, including arrow, the Apache Arrow R client, and rdeephaven, the Deephaven Community R client. The classes for rdeephaven become available under the rdeephaven:: package prefix.
Note
If you also load the community package directly using library("rdeephaven"), you can avoid having to refer to its classes using the rdeephaven:: prefix.
Create a SessionManager
A SessionManager R6 object allows a client to maintain connections to the Deephaven Enterprise Controller and Authentication services.
First, we need to create a SessionManager, which creates these connections and keeps them alive for as long as the SessionManager object is valid.
base_dhe_url = "https://host.mydomain.com:8000/iris"
# sm is an R6 SessionManager object.
sm <- SessionManager$new(
descriptive_name=paste0("CorePlus session manager pid=", Sys.getpid()),
source=paste0(base_dhe_url, "/connection.json"))
The SessionManager constructor takes two arguments:
- descriptive_name: a descriptive name that will be used on the server and client sides to log information about operations performed by this client.
- source: a URL pointing to a file in JSON format containing information about connectivity parameters for the Deephaven installation. Such a file is provided by a Deephaven installation under iris/connection.json.
In principle, the descriptive name can be any string of our choosing. However, it is important to select a value that allows us to distinguish our client in logs. It makes sense to include the ID of the process where the client was created; in the example above, we use the R function Sys.getpid() for that purpose.
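The helper below is a sketch of this advice. It builds a descriptive name from a prefix, the process ID, and the start time. The make_descriptive_name function and the started= suffix are illustrative assumptions and are not part of rdnd. You can pass the result as descriptive_name to SessionManager$new.

```r
# Hypothetical helper (not part of rdnd): build a descriptive name that is
# easy to find in logs. The "started=" suffix is an illustrative choice.
make_descriptive_name <- function(prefix = "CorePlus session manager") {
  paste0(prefix,
         " pid=", Sys.getpid(),
         " started=", format(Sys.time(), "%Y-%m-%dT%H:%M:%S"))
}

descriptive_name <- make_descriptive_name()
print(descriptive_name)
```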
Note
The client implementation appends information about the host where this client is running, so there is no need to include that in the provided descriptive_name.
During the creation of the SessionManager object, three network connections are made, in the order indicated below.
- The connection.json file is downloaded from the provided URL. This is a temporary connection that only lives until we get the file.
- A connection is made to the Deephaven Enterprise Authentication service.
- A connection is made to the Deephaven Enterprise Controller service.
The connections to Authentication and Controller are made based on the information downloaded from connection.json. These two connections are maintained for as long as the SessionManager object is valid.
All three operations must succeed for the SessionManager to be properly initialized; otherwise, an error results. When experiencing connectivity issues, or if the information in the downloaded connection.json file is wrong, you may see an error similar to:
Error: std::string deephaven_enterprise::utility::GetUrl(const string&)@/opt/deephaven/src/iris/DhcInDhe/cpp-client/utility/src/net.cc:121: Timeout was reached
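A failed SessionManager construction surfaces as an R error, so a script can catch it and report a shorter message. The sketch below uses only base R. try_connect is a hypothetical helper, not an rdnd function. The commented usage assumes that rdnd is loaded and that base_dhe_url is defined as in the earlier example.

```r
# Hypothetical helper (not part of rdnd): run a connection step and turn an
# error into a readable message plus a NULL result instead of aborting.
try_connect <- function(connect_fn, what = "SessionManager") {
  tryCatch(
    connect_fn(),
    error = function(e) {
      message("Failed to create ", what, ": ", conditionMessage(e))
      NULL
    }
  )
}

# Usage sketch (assumes rdnd is loaded and base_dhe_url is defined):
# sm <- try_connect(function() SessionManager$new(
#   descriptive_name = paste0("CorePlus session manager pid=", Sys.getpid()),
#   source = paste0(base_dhe_url, "/connection.json")))
# if (is.null(sm)) stop("cannot continue without a SessionManager")
```

Keep in mind that a connectivity failure may take the full gRPC timeout window to be reported.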
Note
See the appendix at the end of this document for more information about Deephaven client connectivity and some tools to debug network issues.
You can get more information about SessionManager by consulting the R documentation: in an R session, after loading the rdnd package, run ?SessionManager.
You can get more information about Core+ clients in general and their connectivity in the Core+ Clients section of our Communications Protocols documentation.
Authenticate
Once a SessionManager is created, we need to authenticate it. We have two options: password authentication, which takes a user name and password, or private key authentication, which takes a private key file.
# typeof(auth_result) == "logical"
# user, password, operate_as, private_key_filename, and
# use_password_authentication are assumed to be defined by your script.
if (use_password_authentication) {
  # Use password authentication.
  auth_result <- sm$password_authentication(user, password, operate_as)
} else {
  # Use private key authentication.
  auth_result <- sm$private_key_authentication(private_key_filename)
}
Note
The operate_as argument of the password_authentication method is only relevant to system administrators. Under normal circumstances, pass the same value that you provide for user.
An authenticated SessionManager can be used to connect to an existing Persistent Query (PQ) or to create new PQs. In either case, the result of a connection is an R6 DndClient object.
The DndClient class
Objects of the R6 class DndClient are created and returned by a SessionManager when connecting to a PQ. The PQ can be one that already exists or one created for you at that point. You can use the returned DndClient object to send queries to the PQ and get results from it.
The DndClient R6 class provides a few methods for interacting with a PQ that are unique to Core+ PQs. You can consult the documentation for DndClient in an R console by running ?DndClient. Besides the Core+-exclusive methods, the DndClient R6 class inherits from the Deephaven Community R client Client class and implements all of its methods as well. You can consult the documentation for Client in an R console by running ?rdeephaven::Client.
Connect to an existing PQ
To connect to an existing PQ, we need to know either its name (a string) or its numeric serial. The serial of a PQ is a 64-bit integer. R's integer type is only 32 bits wide and cannot hold it, so we pass a string containing the decimal representation of the 64-bit number instead.
pq_name <- "my_favorite_pq"
# client1 is an R6 DndClient object
client1 <- sm$connect_to_pq_by_name(pq_name)
# The serial is passed as a string, not as an R integer
pq_serial <- "9876543210"
client2 <- sm$connect_to_pq_by_serial(pq_serial)
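The base R snippet below shows why the serial is passed as a string. It uses only standard R. The second pair of numbers is an arbitrary illustration chosen just above 2^53; they are not real PQ serials.

```r
serial <- "9876543210"

# R integers are 32-bit: the largest representable value is 2147483647.
print(.Machine$integer.max)

# Converting the serial to integer fails: the result is NA (with a warning).
serial_as_integer <- suppressWarnings(as.integer(serial))
print(is.na(serial_as_integer))

# Doubles hold integers exactly only up to 2^53; beyond that, two distinct
# 64-bit values can collapse to the same double.
big_a <- as.numeric("9007199254740993")
big_b <- as.numeric("9007199254740992")
print(big_a == big_b)
```

A string keeps every digit, so the full 64-bit value reaches the client exactly as written.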
Create a new temporary PQ and connect to it
To create a temporary PQ, we first need to create a PQ configuration. We can use a PqConfigBuilder object for that; it comes with predefined defaults for most parameters.
Detailed documentation for PqConfigBuilder is not yet available from R (we are working on that). The class is a C++ object mapped directly into R via Rcpp and R6, so you can list its available methods by typing the class name in the R console:
> PqConfigBuilder
C++ class 'PqConfigBuilder' <0x560dbed6cf60>
Constructors:
PqConfigBuilder(io::deephaven::proto::controller::PersistentQueryConfigMessage)
Fields: No public fields exposed by this class
Methods:
void add_admin_group(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void add_env_var(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void add_jvm_arg(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void add_viewer_group(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
io::deephaven::proto::controller::PersistentQueryConfigMessage build()
void enable_auto_delete(int)
void set_buffer_pool_to_heap_ratio(double)
void set_configuration_type(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void set_init_timeout_seconds(int)
void set_jvm_profile(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void set_max_heap_size_gb(double)
void set_name(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void set_owner(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void set_script_code(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void set_script_language(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
void set_server(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >)
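All methods listed above are mapped to an R (R6) method of the same name. They all take a string as an argument, except for the following:
- set_max_heap_size_gb and set_buffer_pool_to_heap_ratio, which take a double.
- set_init_timeout_seconds, which takes an integer number of seconds.
- enable_auto_delete, which takes an integer.
- build, which takes no arguments and returns the configuration.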
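The sketch below makes those argument types explicit. It wraps a few builder calls and coerces each argument with as.character, as.numeric, or as.integer. configure_pq is a hypothetical helper, not part of rdnd. The 120-second timeout in the usage comment is only an illustrative value.

```r
# Hypothetical helper (not part of rdnd): apply common settings to a
# PqConfigBuilder, coercing each argument to the type its method expects.
configure_pq <- function(builder, script_language, heap_gb, init_timeout_s) {
  builder$set_script_language(as.character(script_language))   # string
  builder$set_max_heap_size_gb(as.numeric(heap_gb))            # double
  builder$set_init_timeout_seconds(as.integer(init_timeout_s)) # integer
  invisible(builder)
}

# Usage sketch (assumes sm is an authenticated SessionManager):
# pq_config_builder <- sm$make_temp_pq_config_builder("my_new_pq")
# configure_pq(pq_config_builder, "python", heap_gb = 4, init_timeout_s = 120)
# client3 <- sm$add_query_and_connect(pq_config_builder$build())
```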
Coming back to our example: once we have a PqConfigBuilder, we can accept the defaults for most settings. At a minimum, however, we must set the desired heap size for our worker in gigabytes, because it has no default.
new_pq_name <- "my_new_pq"
# pq_config_builder is an R6 PqConfigBuilder object
pq_config_builder <- sm$make_temp_pq_config_builder(new_pq_name)
# The heap size has no default, so it must be set (in gigabytes).
pq_config_builder$set_max_heap_size_gb(4)
Once we have a PqConfigBuilder ready with the options we want, we can call its build method to obtain the configuration and pass it to the add_query_and_connect method of SessionManager.
# client3 is an R6 DndClient object connected to the new PQ
client3 <- sm$add_query_and_connect(pq_config_builder$build())
Send a query to the server and get the result
We can send script code to create a table on the server (PQ) using the run_script method of DndClient.
Note
The code given to run_script should match the script language that was configured for the PQ when it was created. When creating a new PQ, you can use the set_script_language method of PqConfigBuilder to set its script language. This method accepts either "groovy" or "python" as a string argument.
# "PT1S" is an ISO-8601 duration; it specifies a duration of 1 second.
# This creates a table on the server that 'ticks', adding a new row
# every second. The table has a single column with a timestamp.
# The script is Python, so the PQ must be configured for Python.
client3$run_script("from deephaven import time_table; my_table = time_table(\"PT1S\")")
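The script must match the PQ's script language, so a script can choose the code by language. time_table_script is a hypothetical helper. Its Groovy variant assumes that the Community Core timeTable function is available in Groovy workers. Treat it as a sketch to verify against your installation.

```r
# Hypothetical helper (not part of rdnd): return script code that creates a
# table named my_table ticking once per second, in the given language.
time_table_script <- function(script_language) {
  switch(script_language,
         python = 'from deephaven import time_table; my_table = time_table("PT1S")',
         groovy = 'my_table = timeTable("PT1S")',
         stop("unsupported script language: ", script_language))
}

# Usage sketch (assumes client3 is connected to a Python PQ):
# client3$run_script(time_table_script("python"))
```

Single-quoted R strings avoid escaping the inner double quotes around "PT1S".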
An rdeephaven::TableHandle R6 object represents a server table on our client. TableHandle is an R6 class from the rdeephaven package, the Deephaven Community R client package, which is why we qualify its name with rdeephaven:: in this description.
If we know the name of a table on the server (PQ), we can create an rdeephaven::TableHandle object for it with the open_table method of DndClient. We can pull the data from the server as a data frame using the as_data_frame method of rdeephaven::TableHandle.
# t1 is an R6 rdeephaven::TableHandle object for the server table my_table
t1 <- client3$open_table("my_table")
df <- t1$as_data_frame()
print(df)
Output:
Timestamp
1 2023-09-23 03:15:21
2 2023-09-23 03:15:22
3 2023-09-23 03:15:23
Because the table on the server is ticking, the number of rows observed depends on when the code above runs relative to when the call to run_script created the table. More precisely, it depends on when as_data_frame is called, since that method pulls the table data from the server to our client as a data frame.
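You can observe this by taking several snapshots in a row. snapshot_row_counts is a hypothetical helper that relies only on the as_data_frame method described above. The default of three snapshots two seconds apart is an arbitrary choice.

```r
# Hypothetical helper (not part of rdnd): take n snapshots of a table,
# pausing between them, and return the row count of each snapshot.
snapshot_row_counts <- function(table_handle, n = 3, pause_s = 2) {
  counts <- integer(n)
  for (i in seq_len(n)) {
    # Each as_data_frame() call pulls the table's current data to the client.
    counts[i] <- nrow(table_handle$as_data_frame())
    if (i < n) Sys.sleep(pause_s)
  }
  counts
}

# Usage sketch (assumes t1 is the handle for the ticking my_table):
# print(snapshot_row_counts(t1))
```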
We can use the t1 object to generate query results against that table on the server.
t2 = t1$update(c("A = ii", "B = `Hello-` + A"))
df2 = t2$as_data_frame()
print(df2)
Output:
Timestamp A B
1 2023-09-23 03:15:21 0 Hello-0
2 2023-09-23 03:15:22 1 Hello-1
3 2023-09-23 03:15:23 2 Hello-2
4 2023-09-23 03:15:24 3 Hello-3
5 2023-09-23 03:15:25 4 Hello-4
6 2023-09-23 03:15:26 5 Hello-5
7 2023-09-23 03:15:27 6 Hello-6
8 2023-09-23 03:15:28 7 Hello-7
9 2023-09-23 03:15:29 8 Hello-8
10 2023-09-23 03:15:30 9 Hello-9
11 2023-09-23 03:15:31 10 Hello-10
12 2023-09-23 03:15:32 11 Hello-11
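The update formulas are evaluated on the server. In Deephaven formulas, ii is the 0-based row index (a long), and backticks delimit string literals, so B concatenates the literal Hello- with A. For illustration only, the base R sketch below computes the same A and B values locally and omits the Timestamp column.

```r
# Local illustration of the server-side formulas "A = ii" and
# "B = `Hello-` + A" for 12 rows (the Timestamp column is omitted).
n_rows <- 12
local_df <- data.frame(A = seq_len(n_rows) - 1L)  # 0-based row index
local_df$B <- paste0("Hello-", local_df$A)
print(head(local_df, 3))
```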
Each new table is defined on the server and only brought to the client when we request it as a data frame.
t3 = t2$where("5 <= A && A <= 10")
df3 = t3$as_data_frame()
print(df3)
Output:
Timestamp A B
1 2023-09-23 03:15:26 5 Hello-5
2 2023-09-23 03:15:27 6 Hello-6
3 2023-09-23 03:15:28 7 Hello-7
4 2023-09-23 03:15:29 8 Hello-8
5 2023-09-23 03:15:30 9 Hello-9
6 2023-09-23 03:15:31 10 Hello-10
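The where filter also runs on the server, so calling as_data_frame transfers only the matching rows. For comparison, the base R sketch below applies the same condition locally to a data frame with the same A and B columns as df2. R's vectorized & plays the role of && in the Deephaven formula.

```r
# Local, client-side equivalent of t2$where("5 <= A && A <= 10"),
# applied to a data frame with the same A and B columns as df2.
A <- 0:11
local_df2 <- data.frame(A = A, B = paste0("Hello-", A))
local_df3 <- local_df2[local_df2$A >= 5 & local_df2$A <= 10, ]
print(local_df3)
```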
With an rdeephaven::TableHandle object, we can perform many different operations on a Deephaven table.
Tip
Find out more about the available operations by consulting the TableHandle documentation via ?rdeephaven::TableHandle.
Send data from the client to the server
If we have a data frame in our R session, we can create a table on the server and get a table handle for it using the import_table method of DndClient.
# Create an example dataframe with some data.
rows = 10
df4 <- data.frame(
# A time column
T = seq.POSIXt(as.POSIXct(Sys.Date()), as.POSIXct(Sys.Date() + 30), by = "1 sec")[rows],
# A boolean (logical) column
B = sample(c(TRUE, FALSE), rows, TRUE),
# An int (integer) column
I1 = sample(1:rows),
# Another int (integer) column
I2 = sample(1:rows)
)
print(df4)
Output:
T B I1 I2
1 2023-09-23 00:00:09 FALSE 7 7
2 2023-09-23 00:00:09 FALSE 8 4
3 2023-09-23 00:00:09 TRUE 6 1
4 2023-09-23 00:00:09 FALSE 9 6
5 2023-09-23 00:00:09 TRUE 5 8
6 2023-09-23 00:00:09 TRUE 10 2
7 2023-09-23 00:00:09 FALSE 2 10
8 2023-09-23 00:00:09 FALSE 1 3
9 2023-09-23 00:00:09 TRUE 4 9
10 2023-09-23 00:00:09 TRUE 3 5
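Every T value above is the same, 00:00:09. This is because seq.POSIXt(...)[rows] selects only the tenth element of the sequence, and data.frame() recycles that single value to every row. That sequence also spans 30 days of seconds, because Sys.Date() + 30 adds days. It therefore allocates about 2.6 million timestamps only to keep one. If the intent is one timestamp per row, one second apart, the following is a sketch of an alternative construction. The name df4_alt is illustrative.

```r
rows <- 10
# as.POSIXct() of a Date gives midnight UTC of that day.
start <- as.POSIXct(Sys.Date())
df4_alt <- data.frame(
  # A time column with one distinct timestamp per row, one second apart
  T = seq(start, by = "1 sec", length.out = rows),
  # A boolean (logical) column
  B = sample(c(TRUE, FALSE), rows, TRUE),
  # Two int (integer) columns
  I1 = sample(1:rows),
  I2 = sample(1:rows)
)
print(df4_alt)
```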
The code below uploads the data frame to the server and obtains a handle for the resulting table.
t4 = client3$import_table(df4)
# With the table handle we can then perform additional operations
# on the server table.
t5 = t4$update("S = I1 + I2")$group_by("S")
df5 = t5$as_data_frame()
print(df5)
Output:
S T B I1 I2
1 14 1695427209 FALSE 7 7
2 12 1695427209, 1695427209, 1695427209 FALSE, TRUE, FALSE 8, 10, 2 4, 2, 10
3 7 1695427209 TRUE 6 1
4 15 1695427209 FALSE 9 6
5 13 1695427209, 1695427209 TRUE, TRUE 5, 4 8, 9
6 4 1695427209 FALSE 1 3
7 8 1695427209 TRUE 3 5
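In the grouped output, T is printed as plain numbers. These appear to be seconds since the Unix epoch: 1695427209 corresponds to 2023-09-23 00:00:09 UTC, which is the timestamp shown for df4. If that is the representation you receive, the base R sketch below converts such values back to timestamps. The commented line assumes df5$T is a list column of numeric vectors.

```r
# Epoch seconds as printed in the grouped T column of df5
epoch_seconds <- c(1695427209, 1695427209)
timestamps <- as.POSIXct(epoch_seconds, origin = "1970-01-01", tz = "UTC")
print(format(timestamps, "%Y-%m-%d %H:%M:%S"))

# Applied to a list column (assumption: df5$T holds numeric vectors):
# df5$T <- lapply(df5$T, as.POSIXct, origin = "1970-01-01", tz = "UTC")
```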
Close DndClient and SessionManager objects
Once you are done with a DndClient or SessionManager object, call its close method. This ensures that all network connections are closed and all server-side resources associated with the object are released.
client1$close()
client2$close()
client3$close()
sm$close()
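In scripts, R's on.exit can ensure that close is called even when an intermediate step fails. with_client is a hypothetical helper, not part of rdnd. It only assumes that the object passed in has a close method, as DndClient and SessionManager do.

```r
# Hypothetical helper (not part of rdnd): run fn(client) and always close the
# client afterwards, even if fn() raises an error.
with_client <- function(client, fn) {
  on.exit(client$close(), add = TRUE)
  fn(client)
}

# Usage sketch (assumes sm is an authenticated SessionManager):
# n <- with_client(sm$connect_to_pq_by_name("my_favorite_pq"),
#                  function(client) nrow(client$open_table("my_table")$as_data_frame()))
```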
Caution
Once the close method is called on either of these objects, any subsequent operation on that object results in an error.
Closing these objects promptly helps make the best use of system resources.
Appendix
Connectivity and tools to debug network issues
Deephaven clients use gRPC, an open source RPC framework from Google, to communicate with Deephaven servers. gRPC treats network errors as potentially transient and handles network failures with a timeout model. As a result, a wrong host address or a server that is down does not immediately cause a connection attempt to fail. Instead, gRPC keeps trying to connect, on the assumption that a new host name registration may appear or that a server may restart and make the expected port available.
Eventually, if the network issue is not resolved before a timeout window expires, the connection attempt is considered failed and an error results.
Important
The default timeout used by Deephaven in its clients is 2 minutes.
Always treating connectivity issues as potentially transient failures helps make deployed infrastructure, clients, and servers resilient. For interactive use, however, it makes failures less transparent: if there is a network failure, a client you try to start may appear to just 'sit there' for two minutes before reporting an error.
Tip
To help debug network and connectivity issues, set the GRPC_VERBOSITY environment variable before starting an R session. The possible values are info and debug; debug provides more details but is noisier.
Setting GRPC_VERBOSITY prints details of gRPC connection establishment, including SSL authentication and certificate validation, to standard output.
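A small base R helper can check from inside R which value, if any, the current session inherited. grpc_verbosity is a hypothetical helper. It only reads the environment and does not change gRPC behavior.

```r
# Hypothetical helper: return the GRPC_VERBOSITY value visible to this R
# session if it is one of the values described above, otherwise NA.
grpc_verbosity <- function() {
  value <- tolower(Sys.getenv("GRPC_VERBOSITY", unset = ""))
  if (value %in% c("info", "debug")) value else NA_character_
}

v <- grpc_verbosity()
if (is.na(v)) {
  message("GRPC_VERBOSITY is not set to info or debug in this session.")
} else {
  message("GRPC_VERBOSITY=", v)
}
```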