RPC Interface
The Barrage extension works by sending additional metadata via the app_metadata
field on FlightData
. This metadata
is used to communicate the necessary additional information between server and client. These types are flatbuffers, so
that we may more easily lift the app_metadata
into the RecordBatch
flatbuffer once Arrow supports byte-array
metadata, at that layer.
The main subscription mechanism is initiated via a DoExchange
. The client sends a SubscriptionRequest (or as many as
they like) and the server sends barrage updates to satisfy their subscription's requirements.
Flat Buffer Definitions
// Copyright 2020 Deephaven Data Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace io.deephaven.barrage.flatbuf;
enum BarrageMessageType : byte {
/// A barrage message wrapper might send a None message type
/// if the msg_payload is empty.
None = 0,
/// for session management (not-yet-used)
NewSessionRequest = 1,
RefreshSessionRequest = 2,
SessionInfoResponse = 3,
/// for subscription parsing/management (aka DoPut, DoExchange)
BarrageSerializationOptions = 4,
BarrageSubscriptionRequest = 5,
BarrageUpdateMetadata = 6,
BarrageSnapshotRequest = 7,
BarragePublicationRequest = 8,
// enum values greater than 127 are reserved for custom client use
}
/// The message wrapper used for all barrage app_metadata fields.
table BarrageMessageWrapper {
/// Used to identify this type of app_metadata vs other applications.
/// The magic value is '0x6E687064'. It is the numerical representation of the ASCII "dphn".
magic: uint;
/// The msg type being sent.
msg_type: BarrageMessageType;
/// The msg payload.
msg_payload: [byte];
}
/// Establish a new session.
table NewSessionRequest {
/// A nested protocol version (gets delegated to handshake)
protocol_version: uint;
/// Arbitrary auth/handshake info.
payload: [byte];
}
/// Refresh the provided session.
table RefreshSessionRequest {
/// this session token is only required if it is the first request of a handshake rpc stream
session: [byte];
}
/// Information about the current session state.
table SessionInfoResponse {
/// this is the metadata header to identify this session with future requests; it must be lower-case and remain static for the life of the session
metadata_header: [byte];
/// this is the session_token; note that it may rotate
session_token: [byte];
/// a suggested time for the user to refresh the session if they do not do so earlier; value is denoted in milliseconds since epoch
token_refresh_deadline_ms: long;
}
/// There will always be types that cannot be easily supported over IPC. These are the options:
/// Stringify (default) - Pretend the column is a string when sending over Arrow Flight (default)
/// JavaSerialization - Use java serialization; the client is responsible for the deserialization
/// ThrowError - Refuse to send the column and throw an internal error sharing as much detail as possible
enum ColumnConversionMode : byte { Stringify = 1, JavaSerialization, ThrowError }
table BarrageSubscriptionOptions {
/// see enum for details
column_conversion_mode: ColumnConversionMode = Stringify;
/// Deephaven reserves a value in the range of primitives as a custom NULL value. This enables more efficient transmission
/// by eliminating the additional complexity of the validity buffer.
use_deephaven_nulls: bool;
/// Explicitly set the update interval for this subscription. Note that subscriptions with different update intervals
/// cannot share intermediary state with other subscriptions and greatly increases the footprint of the non-conforming subscription.
///
/// Note: if not supplied (default of zero) then the server uses a consistent value to be efficient and fair to all clients
min_update_interval_ms: int;
/// Specify a preferred batch size. Server is allowed to be configured to restrict possible values. Too small of a
/// batch size may be dominated with header costs as each batch is wrapped into a separate RecordBatch. Too large of
/// a payload and it may not fit within the maximum payload size. A good default might be 4096.
batch_size: int;
/// Specify a maximum allowed message size. Server will enforce this limit by reducing batch size (to a lower limit
/// of one row per batch). If the message size limit cannot be met due to large row sizes, the server will throw a
/// `Status.RESOURCE_EXHAUSTED` exception
max_message_size: int;
}
/// Describes the subscription the client would like to acquire.
table BarrageSubscriptionRequest {
/// Ticket for the source data set.
ticket: [byte];
/// The bitset of columns to subscribe. If not provided then all columns are subscribed.
columns: [byte];
/// This is an encoded and compressed RowSet in position-space to subscribe to. If not provided then the entire
/// table is requested.
viewport: [byte];
/// Options to configure your subscription.
subscription_options: BarrageSubscriptionOptions;
/// When this is set the viewport RowSet will be inverted against the length of the table. That is to say
/// every position value is converted from `i` to `n - i - 1` if the table has `n` rows.
reverse_viewport: bool;
}
table BarrageSnapshotOptions {
/// see enum for details
column_conversion_mode: ColumnConversionMode = Stringify;
/// Deephaven reserves a value in the range of primitives as a custom NULL value. This enables more efficient transmission
/// by eliminating the additional complexity of the validity buffer.
use_deephaven_nulls: bool;
/// Specify a preferred batch size. Server is allowed to be configured to restrict possible values. Too small of a
/// batch size may be dominated with header costs as each batch is wrapped into a separate RecordBatch. Too large of
/// a payload and it may not fit within the maximum payload size. A good default might be 4096.
batch_size: int;
/// Specify a maximum allowed message size. Server will enforce this limit by reducing batch size (to a lower limit
/// of one row per batch). If the message size limit cannot be met due to large row sizes, the server will throw a
/// `Status.RESOURCE_EXHAUSTED` exception
max_message_size: int;
}
/// Describes the snapshot the client would like to acquire.
table BarrageSnapshotRequest {
/// Ticket for the source data set.
ticket: [byte];
/// The bitset of columns to request. If not provided then all columns are requested.
columns: [byte];
/// This is an encoded and compressed RowSet in position-space to subscribe to. If not provided then the entire
/// table is requested.
viewport: [byte];
/// Options to configure your subscription.
snapshot_options: BarrageSnapshotOptions;
/// When this is set the viewport RowSet will be inverted against the length of the table. That is to say
/// every position value is converted from `i` to `n - i - 1` if the table has `n` rows.
reverse_viewport: bool;
}
table BarragePublicationOptions {
/// Deephaven reserves a value in the range of primitives as a custom NULL value. This enables more efficient transmission
/// by eliminating the additional complexity of the validity buffer.
use_deephaven_nulls: bool;
}
/// Describes the table update stream the client would like to push to. This is similar to a DoPut but the client
/// will send BarrageUpdateMetadata to explicitly describe the row key space. The updates sent adhere to the table
/// update model semantics; thus BarragePublication enables the client to upload a ticking table.
table BarragePublicationRequest {
/// The destination Ticket.
ticket: [byte];
/// Options to configure your request.
publish_options: BarragePublicationOptions;
}
/// Holds all of the rowset data structures for the column being modified.
table BarrageModColumnMetadata {
/// This is an encoded and compressed RowSet for this column (within the viewport) that were modified.
/// There is no notification for modifications outside of the viewport.
modified_rows: [byte];
}
/// A data header describing the shared memory layout of a "record" or "row"
/// batch for a ticking barrage table.
table BarrageUpdateMetadata {
/// This batch is generated from an upstream table that ticks independently of the stream. If
/// multiple events are coalesced into one update, the server may communicate that here for
/// informational purposes.
first_seq: long;
last_seq: long;
/// Indicates if this message was sent due to upstream ticks or due to a subscription change.
is_snapshot: bool;
/// If this is a snapshot and the subscription is a viewport, then the effectively subscribed viewport
/// will be included in the payload. It is an encoded and compressed RowSet.
effective_viewport: [byte];
/// When this is set the viewport RowSet will be inverted against the length of the table. That is to say
/// every position value is converted from `i` to `n - i - 1` if the table has `n` rows.
effective_reverse_viewport: bool;
/// If this is a snapshot, then the effectively subscribed column set will be included in the payload.
effective_column_set: [byte];
/// This is an encoded and compressed RowSet that was added in this update.
added_rows: [byte];
/// This is an encoded and compressed RowSet that was removed in this update.
removed_rows: [byte];
/// This is an encoded and compressed RowSetShiftData describing how the keyspace of unmodified rows changed.
shift_data: [byte];
/// This is an encoded and compressed RowSet that was included with this update.
/// (the server may include rows not in addedRows if this is a viewport subscription to refresh
/// unmodified rows that were scoped into view)
added_rows_included: [byte];
/// The list of modified column data are in the same order as the field nodes on the schema.
mod_column_nodes: [BarrageModColumnMetadata];
}