Use SBE
The SBE Transport package is a publish-and-subscribe system for distributing table updates outside the Deephaven system using an SBE (Simple Binary Encoding) protocol.
There are presently reference clients implemented in Java, C# and C++, but it should be fairly simple to implement a client in any language for which an SBE codec generator exists. Only the SBE transport schema XML file is necessary to implement a client in this way.
To take advantage of this system, an SBE Transport TableServer instance is created in a Deephaven worker. Query tables are made available for subscription by clients through the TableServer instance.
Note
See also: com.illumon.iris.sbetransport.server.TableServer in the Deephaven Javadoc.
For the full tutorial and simple examples, see SBE Transport.
import com.illumon.iris.sbetransport.server.TableServer
quotes=db.i("LearnDeephaven", "StockQuotes")
.where("Date=currentDateNy()")
.lastBy("Sym")
trades=db.i("LearnDeephaven", "StockTrades")
.where("Date=currentDateNy()")
.lastBy("Sym")
server = TableServer.start(7777)
server.exportTable("Quotes", quotes)
server.exportTable("Trades", trades)
This example exports the quote and trade tables under the names "Quotes" and "Trades" respectively. If the example is run as a persistent query, the client may connect to the server on TCP port 7777 to receive updates on those tables. This server will continue to make the exported tables available until either they are "unexported" or the worker is shut down.
Note
See also: https://github.com/real-logic/simple-binary-encoding/wiki for details on SBE.
Clients
A client may be implemented in any language that can generate codecs from the Deephaven SBE XML specification. Reference clients in Java, C# and C++ are provided by Deephaven. The code snippets below show how to use the reference clients to subscribe to the exported tables above. For brevity, the message handler implementations are omitted. The reference clients are fully functional. However, if maximum performance is a consideration, you will likely want to generate message codecs from the XML schema and implement your own client that decodes incoming messages directly into your own data structures.
Examples
Java Subscribe
private class MessageListener implements SimpleMessageHandlerListener {
// implement message handlers here
}
final MessageListener messageListener = new MessageListener();
final SimpleMessageHandler messageHandler = new SimpleMessageHandler();
messageHandler.addListener(messageListener);
TableClient client = new TableClient(host, 7777, messageHandler);
client.subscribe(QUOTE_SUB_ID, SubscribeMode.SNAPSHOT_WITH_UPDATES, "Quotes");
client.subscribe(TRADE_SUB_ID, SubscribeMode.SNAPSHOT_WITH_UPDATES, "Trades");
// handle messages…
client.unsubscribe(QUOTE_SUB_ID);
client.unsubscribe(TRADE_SUB_ID);
C# Subscribe
SimpleMessageHandler simpleMessageHandler = new SimpleMessageHandler();
simpleMessageHandler.OnConnectionResponse += HandleConnectionResponse;
simpleMessageHandler.OnSubscribeResponse += HandleSubscribeResponse;
simpleMessageHandler.OnUnsubscribeResponse += HandleUnsubscribeResponse;
simpleMessageHandler.OnSubscriptionError += HandleSubscriptionError;
simpleMessageHandler.OnBeginTableUpdate += HandleBeginTableUpdate;
simpleMessageHandler.OnEndTableUpdate += HandleEndTableUpdate;
simpleMessageHandler.OnRowsAdded += HandleRowsAdded;
simpleMessageHandler.OnRowsModified += HandleRowsModified;
simpleMessageHandler.OnRowsRemoved += HandleRowsRemoved;
simpleMessageHandler.OnConnectionError += HandleConnectionError;
TableClient client = new TableClient(host, 7777, simpleMessageHandler);
client.Subscribe(QUOTE_SUB_ID, SubscribeMode.SNAPSHOT_WITH_UPDATES, "Quotes");
client.Subscribe(TRADE_SUB_ID, SubscribeMode.SNAPSHOT_WITH_UPDATES, "Trades");
// handle messages…
client.Unsubscribe(QUOTE_SUB_ID);
client.Unsubscribe(TRADE_SUB_ID);
C++ Subscribe
class MyListener : public SimpleMessageHandlerListener {
// implement message handlers here
};
MyListener myListener;
SimpleMessageHandler simpleMessageHandler;
simpleMessageHandler.addListener(&myListener);
TableClient client(host, 7777, &simpleMessageHandler);
client.subscribe(QUOTE_SUB_ID, SubscribeMode::SNAPSHOT_WITH_UPDATES, "Quotes");
client.subscribe(TRADE_SUB_ID, SubscribeMode::SNAPSHOT_WITH_UPDATES, "Trades");
// handle messages…
client.unsubscribe(QUOTE_SUB_ID);
client.unsubscribe(TRADE_SUB_ID);
Protocol
Every message is preceded by a header (defined by MessageHeader in the specification), and the entire message is preceded by a 32-bit integer containing the message size (in little-endian byte order). The content of each message should be parsed by the generated SBE codecs.
Upon connection to an SBE server, the client will receive a connection response message indicating the message buffer size the server is using. This permits the client to adjust its own buffer to match, which is important when processing table updates.
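The length-prefixed framing described above can be sketched in Java as follows. This is only an illustration, not code from the reference clients: FrameSplitter is a hypothetical name, and the sketch assumes the 32-bit size counts only the bytes that follow the prefix (header plus body), which should be confirmed against the schema and the reference client sources. Each extracted frame would then be handed to the generated SBE codecs.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the Deephaven reference client.
final class FrameSplitter {
    // Extracts every complete frame from the readable part of the buffer.
    // ASSUMPTION: the size prefix excludes its own four bytes.
    // Side effect: sets the buffer's byte order to little endian.
    static List<byte[]> split(ByteBuffer in) {
        in.order(ByteOrder.LITTLE_ENDIAN);
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= Integer.BYTES) {
            in.mark();
            int size = in.getInt();
            if (size < 0) {
                throw new IllegalStateException("negative frame size: " + size);
            }
            if (in.remaining() < size) {
                in.reset(); // incomplete frame: rewind and wait for more bytes
                break;
            }
            byte[] frame = new byte[size];
            in.get(frame);
            frames.add(frame);
        }
        return frames;
    }
}
```

Bytes left unread after the call belong to a frame that has not fully arrived; a caller would typically compact the buffer and read more from the socket before calling split again.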
Once connected, the client may issue subscribe requests at any time using a TableSubscribe message. The client must assign a numeric identifier to each subscription, which must be unique for the current session. This identifier will be used in the content of incoming messages to identify the subscription. Each subscription should identify an exported table and a subscription "mode". Optionally, the set of columns in which the client is interested and/or a filter may be specified (if not specified, all rows and columns will be exported). A client may subscribe to the same table any number of times (as long as each subscription has a unique identifier).
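One way to guarantee session-unique subscription identifiers is to allocate them from a counter and remember which exported table each one refers to, so incoming messages can be routed by identifier. The sketch below is hypothetical (SubscriptionRegistry is not a Deephaven class) and assumes the identifier fits in a Java int; the actual wire type is defined in the XML schema.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bookkeeping for client-assigned subscription identifiers.
final class SubscriptionRegistry {
    private final AtomicInteger nextId = new AtomicInteger(1);
    private final Map<Integer, String> tableById = new ConcurrentHashMap<>();

    // Allocates an identifier that has not been used in this session.
    int register(String exportedTableName) {
        int id = nextId.getAndIncrement();
        tableById.put(id, exportedTableName);
        return id;
    }

    // Returns the exported table name for an identifier, or null if unknown.
    String tableFor(int subscriptionId) {
        return tableById.get(subscriptionId);
    }

    // Forgets an identifier once the subscription has ended.
    void release(int subscriptionId) {
        tableById.remove(subscriptionId);
    }
}
```

Because identifiers are never reused, subscribing to the same table twice simply yields two distinct entries.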
Each subscription is created with one of three subscription modes: SNAPSHOT_ONLY, SNAPSHOT_WITH_UPDATES, or UPDATE_ONLY (see the XML spec for numeric values). SNAPSHOT_ONLY causes the server to send a one-time snapshot of the requested table and immediately remove the subscription. SNAPSHOT_WITH_UPDATES sends a snapshot followed by updates until the client unsubscribes (a snapshot is encoded as a single large table update). This mode can be used to reconstruct the source table on the client side. UPDATE_ONLY sends only updates until the client unsubscribes. This can be used if the client is interested only in changes to the source table (e.g., a logging use case).
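The behavior of the three modes can be summarized in a small client-side model. The enum below is a hypothetical sketch, separate from the SubscribeMode type in the reference clients, and it deliberately omits the numeric wire values, which are defined in the XML schema.

```java
// Hypothetical model of what each subscription mode delivers.
enum SubscribeModeSketch {
    SNAPSHOT_ONLY(true, false),
    SNAPSHOT_WITH_UPDATES(true, true),
    UPDATE_ONLY(false, true);

    final boolean sendsSnapshot;
    final boolean sendsUpdates;

    SubscribeModeSketch(boolean sendsSnapshot, boolean sendsUpdates) {
        this.sendsSnapshot = sendsSnapshot;
        this.sendsUpdates = sendsUpdates;
    }

    // Modes that stream updates stay active until the client unsubscribes;
    // SNAPSHOT_ONLY is removed by the server right after the snapshot.
    boolean needsExplicitUnsubscribe() {
        return sendsUpdates;
    }

    // Only a snapshot followed by updates lets the client mirror the live table.
    boolean tracksLiveTable() {
        return sendsSnapshot && sendsUpdates;
    }
}
```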
In response to a subscribe request, the server will send a TableSubscribeResponse message, which contains the subscription identifier provided by the client as well as metadata indicating the names, numeric identifiers, and table update message identifiers for the subscribed columns. The client can use the numeric column identifier to identify the column on subsequent table update messages.
To unsubscribe, the client should send a TableUnsubscribe message identifying an active subscription. In response, the server will send a TableUnsubscribeResponse message confirming the operation.
If an error occurs at any time with respect to a particular subscription, the server will send a TableSubscriptionError message identifying the relevant subscription along with a numeric error code and a human-readable error message.
The SBE protocol is asynchronous, with a fixed maximum message size (configured by the server), so large table updates may be broken up into any number of messages. For this reason, the client will receive BeginTableUpdate and EndTableUpdate messages bracketing each atomic table update. Any number of column data updates and/or row removal messages may be sent for each update. These are converted into RowsAdded, RowsRemoved, and/or RowsModified events by the default listeners provided by the reference clients. A client implementation may want to use the BeginTableUpdate and EndTableUpdate messages as indications to acquire and release a lock on whatever structure is being used to store the incoming data. However, care must be taken to avoid deadlock if a BeginTableUpdate is received and a subsequent connection error causes the loss of the EndTableUpdate message.
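One way to sidestep the deadlock risk is to avoid holding a lock across messages at all: stage the changes that arrive between BeginTableUpdate and EndTableUpdate, commit them in one step when the end message arrives, and discard them on a connection error. The sketch below illustrates that alternative; it is not how the reference clients work. StagedTable and its method names are hypothetical, rows are simplified to a single value keyed by row index, and the last change staged for an index wins.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side store that applies each table update atomically.
final class StagedTable {
    private final Map<Long, Object> rows = new HashMap<>();        // committed state
    private final Map<Long, Object> pendingPuts = new HashMap<>(); // staged adds/modifies
    private final Set<Long> pendingRemoves = new HashSet<>();      // staged removals
    private boolean inUpdate;

    synchronized void onBeginTableUpdate() {
        discardPending();
        inUpdate = true;
    }

    synchronized void onRowAddedOrModified(long rowIndex, Object value) {
        requireInUpdate();
        pendingRemoves.remove(rowIndex);
        pendingPuts.put(rowIndex, value);
    }

    synchronized void onRowRemoved(long rowIndex) {
        requireInUpdate();
        pendingPuts.remove(rowIndex);
        pendingRemoves.add(rowIndex);
    }

    // Commits the staged changes; readers never observe a partial update.
    synchronized void onEndTableUpdate() {
        requireInUpdate();
        rows.keySet().removeAll(pendingRemoves);
        rows.putAll(pendingPuts);
        discardPending();
    }

    // A lost EndTableUpdate leaves the committed state untouched.
    synchronized void onConnectionError() {
        discardPending();
    }

    synchronized Map<Long, Object> snapshot() {
        return new HashMap<>(rows);
    }

    private void discardPending() {
        pendingPuts.clear();
        pendingRemoves.clear();
        inUpdate = false;
    }

    private void requireInUpdate() {
        if (!inUpdate) {
            throw new IllegalStateException("data received outside Begin/End bracket");
        }
    }
}
```

The trade-off is extra memory for the staged changes in exchange for never blocking readers while a multi-message update is in flight.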
Outgoing Message Types
The SBE protocol defines the following message types sent from the client to the server.
TableSubscribe
The client uses this message to subscribe to an exported table. It must specify a (client generated) unique subscription id and the exported name of the table. It may also optionally provide specific columns for which updates are requested and/or a filter which specifies the rows in which the client is interested (this is applied as a .where clause on the server).
TableUnsubscribe
The client uses this message to terminate an active subscription. It must specify the subscription identifier assigned when the subscription was initiated.
Incoming Message Types
The SBE protocol defines the following message types sent to the client from the server.
ConnectionResponse
This message is sent by the server to the client when a connection is initiated. It provides a message buffer size the client can use for subsequent messages.
TableSubscribeResponse
This message is sent to the client in response to a successful subscribe request, and provides the table column information (names and data types).
TableUnsubscribeResponse
This message is sent to the client in response to a successful unsubscribe request.
TableSubscriptionError
This message is sent to the client when an error occurs with respect to a subscription. Where applicable, it contains the subscription identifier as well as a textual description.
BeginTableUpdate
This message is sent to the client indicating the beginning of a table update. Table updates can span any number of messages, and are always bracketed by BeginTableUpdate and EndTableUpdate messages.
EndTableUpdate
This message is sent to the client indicating the end of a table update.
<Type>ColumnData
Updates to table data (rows added or modified) are sent to the client encoded as <Type>ColumnData messages, where <Type> represents the data type of the relevant column. For each updated row, at least one column data message will be sent for each column. Each column data message contains as many added/updated rows as can fit into the message, as well as the row indexes affected. The row indexes are encoded as a series of ranges. For example, if rows 11-20 and 26-30 are affected in a given update, the column data messages will contain these two ranges encoded as [11,20] and [26,30], followed by an array of the 15 values corresponding to these indexes. Large column updates will span multiple column data messages.
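To pair the value array with row indexes, the ranges have to be expanded in order. The following sketch shows the idea under stated assumptions: RowRanges is a hypothetical helper, the ranges are taken to be inclusive pairs flattened into one array, and row indexes are held as Java long values regardless of their actual wire type.

```java
// Hypothetical helper for expanding inclusive row-index ranges.
final class RowRanges {
    // ranges is laid out as [first0, last0, first1, last1, ...], ends inclusive.
    // Returns every covered row index, in the order the values are expected.
    static long[] expand(long[] ranges) {
        if (ranges.length % 2 != 0) {
            throw new IllegalArgumentException("ranges must hold first/last pairs");
        }
        int total = 0;
        for (int i = 0; i < ranges.length; i += 2) {
            if (ranges[i + 1] < ranges[i]) {
                throw new IllegalArgumentException("range end precedes range start");
            }
            int width = Math.toIntExact(ranges[i + 1] - ranges[i] + 1);
            total = Math.addExact(total, width);
        }
        long[] indexes = new long[total];
        int n = 0;
        for (int i = 0; i < ranges.length; i += 2) {
            for (long row = ranges[i]; row <= ranges[i + 1]; row++) {
                indexes[n++] = row;
            }
        }
        return indexes;
    }
}
```

With the ranges from the example, expand(new long[] {11, 20, 26, 30}) yields 15 indexes, so the i-th decoded value belongs to the i-th returned row index.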
RowsRemoved
When rows are removed from a table, RowsRemoved message(s) are sent to the client indicating the range(s) of rows removed. Row index ranges are encoded in the same fashion as in column data messages.
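If the client keeps rows in a sorted map keyed by row index, a removal range can be applied with a single sub-map operation. This is a sketch only: RowRemover is a hypothetical name, the ranges use the same flattened inclusive-pair layout assumed for column data, and it assumes removal does not renumber the remaining rows.

```java
import java.util.NavigableMap;

// Hypothetical helper that applies removal ranges to a client-side row store.
final class RowRemover {
    // Removes every stored row whose index falls inside one of the inclusive
    // ranges [first0, last0, first1, last1, ...]; returns how many were removed.
    static <V> int removeRanges(NavigableMap<Long, V> rows, long[] ranges) {
        int removed = 0;
        for (int i = 0; i + 1 < ranges.length; i += 2) {
            // subMap returns a live view, so clearing it deletes from rows.
            NavigableMap<Long, V> span = rows.subMap(ranges[i], true, ranges[i + 1], true);
            removed += span.size();
            span.clear();
        }
        return removed;
    }
}
```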
Supported Column Types
The following column types are supported by SBE. Details of the encoding can be observed in the XML schema document.
Primitive Column Types
Deephaven Data Type | SBE Update Message Type | Message Id | Comments |
---|---|---|---|
byte | ByteColumnData | 102 | Encoded as group(s) of 8-bit signed values with nulls indicated by Deephaven special value.* |
boolean | BooleanColumnData | 101 | Encoded as group(s) of bytes with the following meaning: 0: FALSE, 1: TRUE, -1: NULL |
char | CharColumnData | 103 | Encoded as group(s) of 16-bit unsigned values, with nulls indicated by Deephaven special value.* |
int | IntColumnData | 106 | Encoded as group(s) of 32-bit signed integers with nulls indicated by Deephaven special value.* |
short | ShortColumnData | 108 | Encoded as group(s) of 16-bit signed integers with nulls indicated by Deephaven special value.* |
long | LongColumnData | 107 | Encoded as group(s) of 64-bit signed integers with nulls indicated by Deephaven special value.* |
float | FloatColumnData | 105 | Encoded as group(s) of 32-bit floating point values (single precision float) with nulls indicated by Deephaven special value.* |
double | DoubleColumnData | 104 | Encoded as group(s) of 64-bit floating point values (double precision float) with nulls indicated by Deephaven special value.* |
String | StringColumnData | 100 | Encoded as a series of variable length character data arrays, with a leading byte for each, indicating null/not null, and a length with a maximum value of 1073741824. |
DateTime | DateTimeColumnData | 119 | Encoded as nanoseconds since the epoch in a signed 64-bit integer. |
BigDecimal | DecimalColumnData | 109 | Encoded as magnitude and scale; the fixed-point representation is language dependent. |
StringSet | StringSetColumnData | 120 | Encoded in the same way as String |
* See Deephaven Null Values below.
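Two of the encodings in the table map directly onto standard Java types once the generated codec has produced the raw value. The sketch below is illustrative only: ValueDecoders is a hypothetical name, and null handling for DateTime is left out because the table does not describe it.

```java
import java.time.Instant;

// Hypothetical converters applied after the generated codec has read raw values.
final class ValueDecoders {
    // Boolean columns arrive as one byte each: 0 is FALSE, 1 is TRUE, -1 is NULL.
    static Boolean decodeBoolean(byte raw) {
        switch (raw) {
            case 0:
                return Boolean.FALSE;
            case 1:
                return Boolean.TRUE;
            case -1:
                return null;
            default:
                throw new IllegalArgumentException("unexpected boolean byte: " + raw);
        }
    }

    // DateTime columns arrive as signed 64-bit nanoseconds since the epoch.
    // floorDiv/floorMod keep the nanosecond part non-negative for pre-epoch values.
    static Instant decodeDateTime(long nanosSinceEpoch) {
        long seconds = Math.floorDiv(nanosSinceEpoch, 1_000_000_000L);
        long nanos = Math.floorMod(nanosSinceEpoch, 1_000_000_000L);
        return Instant.ofEpochSecond(seconds, nanos);
    }
}
```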
Array Types
SBE supports a limited number of array types. Both columns of the raw array type (e.g., byte[]) and the corresponding database type (e.g., DbByteArray) are supported and produce the same type of update messages. The latter are produced by Deephaven .by() operations.
All array encodings support null values, both for the array value and the individual values within each array. Primitive nulls are represented in the same way as the non-array encodings (using the Deephaven special value).
Deephaven Data Type(s) | SBE Update Message Type | Message Id | Encoding |
---|---|---|---|
| ByteArrayColumnData | 112 | Group of byte groups, each with a leading null indicator. |
| BooleanArrayColumnData | 111 | Group of boolean groups, each with a leading null indicator. |
| CharArrayColumnData | 113 | Group of char groups, each with a leading null indicator. |
| IntArrayColumnData | 116 | Group of int groups, each with a leading null indicator. |
| ShortArrayColumnData | 118 | Group of short groups, each with a leading null indicator. |
| LongArrayColumnData | 117 | Group of long groups, each with a leading null indicator. |
| FloatArrayColumnData | 115 | Group of float groups, each with a leading null indicator. |
| DoubleArrayColumnData | 114 | Group of double groups, each with a leading null indicator. |
| StringArrayColumnData | 110 | Group of String groups, each with a leading null indicator. |
StringSet | StringSetColumnData | 120 | Same as String array. |
Deephaven Null Values
For primitive types, null values are represented with the following special values. Boolean is not included here, because booleans are encoded as a single byte in SBE and nulls are represented by -1.
Type | Java Constant | Value |
---|---|---|
byte | Byte.MIN_VALUE | -128 |
char | Character.MAX_VALUE-1 | 0xFFFE |
short | Short.MIN_VALUE | -32768 |
int | Integer.MIN_VALUE | 0x80000000 |
long | Long.MIN_VALUE | 0x8000000000000000 |
float | -Float.MAX_VALUE | -3.4028235e+38f |
double | -Double.MAX_VALUE | -1.7976931348623157e+308 |
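A client written in Java can mirror the table above with constants and convert sentinel values to null when boxing. The class and method names below are hypothetical; only the constant values come from the table.

```java
// Hypothetical constants mirroring the null sentinels listed in the table.
final class DeephavenNulls {
    static final byte NULL_BYTE = Byte.MIN_VALUE;
    static final char NULL_CHAR = (char) (Character.MAX_VALUE - 1);
    static final short NULL_SHORT = Short.MIN_VALUE;
    static final int NULL_INT = Integer.MIN_VALUE;
    static final long NULL_LONG = Long.MIN_VALUE;
    static final float NULL_FLOAT = -Float.MAX_VALUE;
    static final double NULL_DOUBLE = -Double.MAX_VALUE;

    // Boxes an int, mapping the sentinel to a Java null.
    static Integer boxInt(int value) {
        return value == NULL_INT ? null : Integer.valueOf(value);
    }

    // Boxes a long, mapping the sentinel to a Java null.
    static Long boxLong(long value) {
        return value == NULL_LONG ? null : Long.valueOf(value);
    }

    // Boxes a double, mapping the sentinel to a Java null.
    static Double boxDouble(double value) {
        return value == NULL_DOUBLE ? null : Double.valueOf(value);
    }
}
```

Note that the sentinels are ordinary in-range values, so a decoder must test for them before doing arithmetic on a column.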