Communications Protocols
Deephaven components communicate using several protocols. Although each service may expose different commands or functions, the protocols generally fall into one of several types:
| Protocol | Description |
| --- | --- |
| HTTPS | Protocol used by web browsers to request documents or post forms. |
| gRPC | A binary protocol over HTTPS that allows unary or streaming request/responses using protobufs. |
| Websocket | A communications layer that allows arbitrary data to be tunneled over HTTP. |
| GWT-RPC | A serialization toolkit suitable for translating Java types for non-Java languages, such as JavaScript. |
| Comm | Java serialization over TCP. Deephaven considers this legacy technology, which is not exposed to web browsers or Core+ clients. |
| Table Data Protocol (TDP) | Custom binary over TCP for requesting and subscribing to Table Locations and block data. |
| Tailer | Custom binary over TCP for sending data or control commands to a Data Import Server. |
| Arrow Flight | A standard gRPC protocol for static data interchange. |
| Barrage | Deephaven's extension to Arrow Flight that provides for ticking data. |
This guide describes the most important Deephaven services and how they communicate.
Workers
The workers are at the heart of the Deephaven system and communicate with the configuration server, authentication server, Dispatcher, Controller, and Table Data Services like the TDCP. Historical data is read directly from shared storage (e.g., NFS). Clients then connect to the worker to request tables, initiate new operations, or perform other functions. Legacy Enterprise workers and Core+ workers use distinct client protocols. Rather than duplicate information for each service, the workers' interactions are described in the corresponding sections.
Configuration server
All backend components use gRPC to communicate with the configuration server in order to retrieve property files. The configuration server also stores schema and routing configuration that is used by workers, data import servers, and tailers. The configuration server uses etcd as a storage backend. To write data, clients must authenticate. To validate the presented authentication tokens, the configuration server uses the authentication server's gRPC protocol. As every backend process connects to the configuration server, it is not included in the diagrams below.
Authentication server
The authentication server exposes a gRPC service and uses etcd to coordinate multiple redundant authentication servers. Clients connect to the authentication server and prove their identity using one of several methods (e.g., a user name and password or a private key). Once a client has proven its identity, the server stores a "cookie" in etcd and provides the cookie to the client. The client must refresh the cookie periodically for it to remain valid.
The client can then request a token from the authentication server for a designated service. The server returns a token (random number, origin, and user context) to the client, which the client passes to the service that it would like to use (e.g., the Controller). The controller contacts an authentication server to verify the token. After verifying the token, the authentication server invalidates the token so it may not be reused. For subsequent requests, the client and server typically have another mechanism to ensure that a session remains valid (e.g., the duration of a TCP connection or another logical gRPC session).
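The exchange can be summarized with a small, self-contained model. The class, method, and service names below are hypothetical and only illustrate the cookie and one-time-token semantics described above; they are not Deephaven's actual gRPC API.

```python
import secrets
import time

# Conceptual model of the cookie/token exchange -- names are hypothetical.
class AuthServerModel:
    def __init__(self):
        self.cookies = {}   # stands in for the shared etcd cookie store
        self.tokens = {}    # one-time tokens keyed by token id

    def authenticate(self, user):
        # After the client proves its identity, a cookie is stored and returned.
        cookie = secrets.token_hex(16)
        self.cookies[cookie] = {"user": user, "expires": time.time() + 60}
        return cookie

    def refresh(self, cookie):
        # The client must refresh periodically for the cookie to remain valid.
        self.cookies[cookie]["expires"] = time.time() + 60

    def request_token(self, cookie, service):
        # Token = random number plus origin and user context, bound to one service.
        user = self.cookies[cookie]["user"]
        token_id = secrets.token_hex(16)
        self.tokens[token_id] = {"service": service, "user": user}
        return token_id

    def verify_token(self, token_id, service):
        # Verification invalidates the token so it cannot be reused.
        token = self.tokens.pop(token_id, None)
        return token is not None and token["service"] == service


auth = AuthServerModel()
cookie = auth.authenticate("some_user")
token = auth.request_token(cookie, "PersistentQueryController")
print(auth.verify_token(token, "PersistentQueryController"))  # True
print(auth.verify_token(token, "PersistentQueryController"))  # False: single use
```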
Clients can connect to any authentication server (the gRPC channel abstracts which server the client is communicating with). Requests may not end up on the same server, so the etcd storage enables cookies to be shared.
During token verification, if another authentication server issued the token, the request is forwarded to the issuing server. This situation is common because the service is often not connected to the same authentication server as the client.
Similarly, public key authentication requests are forwarded to the server that generated the nonce. This situation occurs when the client load balancing policy sends messages to more than one authentication server.
Delegated authentication permits one client to pass its authentication on to another client. This is useful to enable an authenticated process to bootstrap another process operating as the same user. For example, a remote query dispatcher client must bootstrap authentication for the workers that it creates. In this example, only one authentication server is included for simplicity, but verification can proceed with two distinct authentication servers.
Controller
The Controller is responsible for managing Persistent Queries. Each Persistent Query has a configuration that is stored in etcd.
Controller clients connect to the controller over gRPC. After authenticating, the client can manipulate queries (e.g., adding, deleting, starting, stopping) or subscribe to a stream of query state.
The controller has a scheduling component that determines when each query should be started and stopped. When a query is started, the Controller connects to a Remote Query Dispatcher to allocate a worker for that query. The system has a single controller, but many remote query dispatchers. After the dispatcher starts the worker, the controller initiates a connection to the worker to initialize the Persistent Query (e.g., running the script code defined by the query configuration). The Controller uses gRPC to communicate with Core+ workers and Deephaven's Comm protocol to communicate with Legacy workers.
Workers also connect back to the controller over gRPC. They can publish updated variable information, request script code from the controller's configured git repositories, and subscribe to the Persistent Query state (just as other query clients do).
The controller can be configured to periodically poll git repositories for updated code. The controller typically connects to git repositories using SSH with a private key for authentication.
As part of the Controller's leader election, the active controller writes its address to a key within etcd. A Deephaven gRPC NameResolver reads the value of this key from etcd and uses that to establish the gRPC channel. Clients outside the Deephaven cluster may not contact etcd. When the cluster is configured with Envoy, the configuration server provides the active controller's address to Envoy so that clients transparently connect to the active controller. When the cluster is configured without Envoy, clients use an alternative NameResolver that uses HTTP to retrieve the currently active controller from the Web API server.
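A minimal sketch of the in-cluster resolution step, assuming the python-etcd3 client; the key path, host, and port are placeholders, since the real values are installation specific and handled by Deephaven's own NameResolver:

```python
import etcd3

# Hypothetical key name; the real key path, TLS settings, and credentials are
# installation specific.
CONTROLLER_KEY = "/deephaven/controller/active"

client = etcd3.client(host="etcd.internal", port=2379)

value, _metadata = client.get(CONTROLLER_KEY)
if value is not None:
    # The NameResolver hands this address to the gRPC channel.
    print("Active controller:", value.decode("utf-8"))
else:
    # Outside the cluster (no etcd access), a client instead asks the Web API
    # server over HTTP for the currently active controller.
    print("Key not present; fall back to the Web API server lookup.")
```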
Dispatcher
The Remote Query Dispatcher presents a Comm interface to clients, which include the Controller, Web API server, and Swing Console. Core+ clients do not directly connect to the dispatcher. The Comm protocol allows clients to start and terminate workers; each worker's lifetime is bound to the TCP connection that initiated it. When a worker is started, the Dispatcher writes a key to etcd for that worker that contains its process info ID. The dispatcher separately communicates a nonce to the worker (by environment variable). The Dispatcher is responsible for process management of the worker. On Linux systems, this means that the dispatcher execs the process and reads from its stdin and stdout using pipes. With Kubernetes, the Dispatcher accesses the Kubernetes API to start processes and monitor their output.
The workers watch the etcd key, which contains an HTTPS URL to contact the dispatcher and the worker's status. The dispatcher can signal the worker to shut down by writing a "terminate" status to the key. When the key is removed, the worker begins a timer and terminates itself if the key is not restored within sixty seconds. The worker connects to the dispatcher using the provided URL and POSTs its status. The first POST causes the worker to be registered, and the dispatcher provides "worker details" which control the worker's startup. Subsequent POST requests serve as keep-alives. If the worker does not send keep-alive requests within the configured TTL (by default 60 seconds), then the dispatcher terminates the worker.
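The registration and keep-alive exchange might be sketched as follows with the requests library. The URL, JSON payload, and field names are illustrative placeholders rather than the dispatcher's actual wire format:

```python
import time
import requests

# Illustrative values only: the real URL comes from the etcd key the dispatcher
# wrote for this worker, and the payload format is internal to Deephaven.
DISPATCHER_URL = "https://dispatcher.internal:8084/worker-status"
KEEP_ALIVE_SECONDS = 30  # must stay well inside the configured TTL (default 60s)

def register_and_keep_alive(process_info_id: str) -> None:
    # The first POST registers the worker; the response carries the
    # "worker details" that control startup.
    details = requests.post(
        DISPATCHER_URL, json={"processInfoId": process_info_id, "status": "STARTING"}
    ).json()
    print("worker details:", details)

    # Subsequent POSTs are keep-alives; missing the TTL gets the worker terminated.
    while True:
        requests.post(
            DISPATCHER_URL, json={"processInfoId": process_info_id, "status": "RUNNING"}
        )
        time.sleep(KEEP_ALIVE_SECONDS)
```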
The same web server that is used for registration and keep alive can optionally serve status pages listing the active workers and resource utilization.
Core+ Clients
Core+ clients communicate with the authentication server and controller over gRPC. Workers are created by submitting temporary Persistent Queries through the controller. This eliminates the need for Core+ clients to communicate with the dispatchers over the Deephaven "Comm" protocol that relies on Java serialization, or with the Web API server, which uses a GWT-RPC serialization protocol. Communication with the worker is performed via gRPC messaging using the default Deephaven Community clients. The Core+ clients connect to the Web API server over HTTPS to retrieve an initial configuration JSON document (connection.json).
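As a sketch, the bootstrap step looks roughly like the following; the host name and URL path are assumptions for illustration, not a documented endpoint:

```python
import requests

# Placeholder host; in a real deployment this is the Web API server
# (or the Envoy front proxy) for the cluster.
WEB_API = "https://deephaven.example.com:8123"

# connection.json describes where the authentication server, controller, and
# other services can be reached, so the client can bootstrap its gRPC channels.
# The path below is an assumption for this sketch.
resp = requests.get(f"{WEB_API}/iris/connection.json", verify=True)
resp.raise_for_status()
connection_info = resp.json()
print(sorted(connection_info.keys()))
```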
Web IDE
The Web IDE connects to the Web API server using HTTP to retrieve static assets like images and the JavaScript application code.
The Web IDE does not directly communicate with the authentication server or controller. Instead, it makes a connection to the Web API server using a Websocket. The Web API server then manages gRPC connections to the authentication server and controller on the clients' behalf. Each client has a private authentication server session; the controller session is shared. The query monitor panel (outside of safe mode) is handled via the WebClientData query, which has a separate controller connection. To create Code Studios, the Web IDE makes requests to the Web API server that are forwarded to a dispatcher.
The Web IDE connects directly to Core+ workers via HTTP to retrieve the JavaScript API that is specific to that version of Community. The browser then connects to the Core+ worker using the Community gRPC API.
Legacy Clients
Swing Console
The Swing console is the prototypical example of a Legacy Java client. The first step to run a Java client is to ensure that the client's JARs and resources are synchronized with the server. The Launcher process (or configuration updater) is responsible for performing this synchronization. Using HTTPS, it downloads a getdown.txt digest file from the Web API server. Using the digests, the launcher can determine which JARs and resources have been updated, and then downloads only those files from the Web API server. After downloading the required files, the launcher executes the Swing Console.
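A simplified sketch of digest-based synchronization is shown below; the real launcher uses the getdown library and its own file formats, so the host, paths, and digest handling here are placeholders:

```python
import hashlib
import pathlib
import requests

# Placeholder host and paths; the actual layout is defined by getdown.
WEB_API = "https://deephaven.example.com:8123"
INSTALL_DIR = pathlib.Path("client-install")

def local_digest(path: pathlib.Path) -> str:
    # Empty digest for files that are not present locally yet.
    return hashlib.md5(path.read_bytes()).hexdigest() if path.exists() else ""

def synchronize(remote_digests: dict[str, str]) -> None:
    # Download only the files whose digest differs from the local copy.
    for name, digest in remote_digests.items():
        target = INSTALL_DIR / name
        if local_digest(target) != digest:
            data = requests.get(f"{WEB_API}/client/{name}").content  # placeholder path
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(data)
```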
The Swing Console connects to most backend services directly, either using gRPC or the Deephaven "Comm" protocol that is based on Java serialization:
- The console authenticates to the authentication server over gRPC using a user name and password or another configured method (such as SAML or Active Directory integration). When connecting to other services, a new token is requested.
- The console uses the Controller's gRPC interface to subscribe to the list of Persistent Queries and to add, remove, or modify them.
- To create Console sessions, the Swing console communicates directly with the dispatcher using Comm.
- To display tables, Comm is used to connect to Legacy workers. Core+ workers are not supported from the Swing Console.
- The ACL Editor uses a REST API to communicate with the ACL Write Server. The ACL Write Server does not provide an API to retrieve ACLs; a separate Legacy worker is used to read the ACLs.
Web IDE
The common components of the Web IDE are described in the Core+ section. For communication with Legacy workers, the Web IDE connects to the workers directly using a Websocket with a GWT-RPC serialized payload.
Data Import Server
The Data Import Server (DIS) accepts connections from tailers, persists the data to local storage, and serves it out over the Table Data Protocol. The DIS generates listener classes to interpret the data sent from the tailer (either as binary logs or CSV). The listener depends on the destination table's schema, which is retrieved from the configuration server. A tailer may replicate data to more than one DIS depending on the data routing configuration (also retrieved from the configuration server).
The DIS supports an optional web port to enumerate connected clients.
An import server can also run within the context of a worker. This is used for fast lastBy, Kafka, and Solace ingestion. For Kafka and Solace, the worker initiates the connection to the broker and may not expose a tailer port.
For improved performance, a merge process may read directly from the local storage and write to historical storage (e.g., NFS); the merge process would otherwise read block data via the TDP.
TDCP
The Table Data Cache Proxy (TDCP) runs on each node of the Deephaven system. Workers connect to the TDCP and then the TDCP connects to other live data sources (according to the data routing configuration). The TDCP accepts Table Data Protocol (TDP) connections from workers and forwards requests to other servers that export Table Data Services over TDP, typically the Data Import Server(s). Routing is controlled by YAML retrieved from the configuration server. Core+ and Legacy workers function identically with respect to the Table Data Protocol.
Envoy
As can be seen in the various client interactions, Deephaven has many components that by default listen on a variety of hosts and ports. Within the cluster, this many-to-many communication is essential. However, from a client program that may be executing on a desktop environment, it is often undesirable to require many open ports (e.g., due to security or ease of administration concerns). Deephaven supports the use of Envoy as a front-proxy to direct traffic from one HTTP port to the appropriate backend service.
Envoy can terminate HTTPS, gRPC, and Websockets. Because Envoy does not terminate "Comm" communication, "Comm" connections are tunneled through Websockets.
For singleton services, the request's path prefix can be used to determine which backend host and port to send the request to. For dispatchers and workers, more complexity is required. For Dispatchers and Legacy workers, a per-worker prefix is used to direct a Websocket request to the appropriate backend host and port. For Core+ workers, the same per-worker prefix is used for HTTPS requests (e.g., to request the JavaScript API). gRPC does not support prefixes, therefore a custom envoy-prefix HTTP header is used to route requests to the correct host and port.
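The sketch below shows how a client-side gRPC call might carry that header as metadata; the Envoy endpoint, worker prefix, and stub method are placeholders, since the actual stubs are generated from Deephaven's and Arrow Flight's protobuf definitions:

```python
import grpc

# Placeholder values: the Envoy endpoint and the per-worker prefix are assigned
# by the deployment; "envoy-prefix" is the routing header named above.
ENVOY_TARGET = "deephaven.example.com:8000"
WORKER_PREFIX = "worker_42"  # hypothetical per-worker prefix

channel = grpc.secure_channel(ENVOY_TARGET, grpc.ssl_channel_credentials())

# Whatever generated stub is used (Arrow Flight, Barrage, or a Deephaven
# session service), the call simply attaches the routing header as gRPC metadata:
#
#   response = stub.SomeMethod(request, metadata=[("envoy-prefix", WORKER_PREFIX)])
#
# Envoy reads the header and forwards the request to the matching Core+ worker.
```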
To discover its configuration, Envoy uses a gRPC "xDS" protocol that is implemented by the configuration server.
| Service | Protocol | Routing | Comments |
| --- | --- | --- | --- |
| Authentication | gRPC | - | |
| Configuration | gRPC | - | |
| Controller | gRPC | - | |
| ACL Writer | REST | - | |
| Web API | HTTPS, Websocket+GWT-RPC | - | |
| Dispatcher | Websocket+Comm | Prefix | Used only for Swing |
| Legacy Worker | Websocket+Comm | Prefix | Used only for Swing |
| Legacy Worker | Websocket+GWT-RPC | Prefix | Used for Web UI |
| Core+ Worker | HTTPS | Prefix | Used for JavaScript module |
| Core+ Worker | gRPC | Header | Used for table operations and data (Barrage) |
etcd
etcd is used as a shared, fault-tolerant backing store across the system. Communication with etcd uses gRPC.
Configuration (properties, schema, routing, and the service registry) is stored in etcd, and access is mediated through the configuration server. The configuration tools generally have a "direct" mode for communicating with etcd directly for system installation, upgrade, and recovery.
The Persistent Query controller uses etcd to store the Persistent Query configurations as XML documents. All access to the Persistent Query store is mediated through the active controller. The running controllers automatically select the active controller with etcd leader election.
The Persistent Query controller also uses etcd to track Persistent Query internal state. When the controller creates workers or changes their state, it persists that state to etcd. When the controller restarts (or a new controller is elected leader), that state is used to reconnect to the running Core+ workers and terminate workers that cannot be restored.
The ACL Write Server usually stores ACLs in the etcd system (the other alternative being MySQL). Workers communicate directly with etcd to retrieve ACLs for table access. The dhconfig acl tool provides a --direct mode for accessing the ACL database without using the ACL Write Server. ACLs are accessed directly by processes that require group or table access information using a read-only user.
The Remote Query Dispatcher uses etcd for communicating liveness with workers. The dispatcher writes a per-worker key to the etcd store, and then the worker watches that key. The key informs the worker where the dispatcher is listening for HTTP requests for registration, and if it is removed then the worker will terminate after a timeout.
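A sketch of the worker-side watch, assuming the python-etcd3 client; the key path, etcd endpoint, and the value checked for the "terminate" status are illustrative placeholders:

```python
import threading
from etcd3 import client as etcd_client
from etcd3.events import DeleteEvent, PutEvent

# Hypothetical key; the real per-worker key is created by the dispatcher.
WORKER_KEY = "/deephaven/dispatcher/workers/worker_42"
GRACE_SECONDS = 60

etcd = etcd_client(host="etcd.internal", port=2379)
terminate_timer = None

def schedule_termination():
    # Terminate the worker if the key is not restored within the grace period.
    global terminate_timer
    terminate_timer = threading.Timer(GRACE_SECONDS, lambda: print("terminate worker"))
    terminate_timer.start()

events, cancel = etcd.watch(WORKER_KEY)
for event in events:
    if isinstance(event, DeleteEvent):
        schedule_termination()
    elif isinstance(event, PutEvent):
        if terminate_timer is not None:
            terminate_timer.cancel()  # key restored before the deadline
        if "terminate" in event.value.decode("utf-8"):
            print("dispatcher requested shutdown")
            cancel()
```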
The Authentication Server stores cookies inside of etcd. The cookies are created when clients authenticate and removed when they log out (either explicitly or implicitly by failing to renew their cookie). The Authentication Servers store the cookies in memory, but for redundancy, when an unknown cookie is received, they check the etcd state. This allows a client to authenticate to any of the Authentication Servers and all of them to service requests for that client. The Authentication Server can also use the etcd ACL store for username-password combinations or public key storage.
MySQL
In older versions of Deephaven, the default storage for ACLs was a MySQL database. Current versions of Deephaven still support MySQL, but new installations default to etcd. The database is only ever written to by the ACL Write Server or the dhconfig acl utility. The database can also contain username and password hashes or public keys for authentication. Processes that require group information or table ACLs connect directly to the MySQL database to retrieve ACLs using the standard MySQL JDBC driver on port 3306.