Simple Binary Encoding (SBE) Transport

The SBE Transport package is a publish-and-subscribe system for distributing table updates outside the Deephaven system using a SBE (Simple Binary Encoding) protocol.

There are presently reference clients implemented in Java, C# and C++, but it should be fairly simple to implement a client in any language for which an SBE codec generator exists. Only the SBE transport schema XML file is necessary to implement a client in this way.

To take advantage of this system, an SBE Transport TableServer instance is created in a Deephaven worker. Query tables are made available for subscription by clients through the TableServer instance.

Note

See also: See com.illumon.iris.sbetransport.server.TableServer in the Deephaven Javadoc.

A simple example (written in Groovy) follows:

import com.illumon.iris.sbetransport.server.TableServer
quotes=db.t("LearnDeephaven", "StockQuotes")
    .where("Date=`2017-08-25`")
    .lastBy("Sym")
trades=db.t("LearnDeephaven", "StockTrades")
    .where("Date=`2017-08-25`")
    .lastBy("Sym")
server = TableServer.start(7777)
server.exportTable("Quotes", quotes)
server.exportTable("Trades", trades)

This example exports the quote and trade tables under the names "Quotes" and "Trades" respectively. If the example is run as a persistent query, the client may connect to the server on TCP port 7777 to receive updates on those tables. This server will continue to make the exported tables available until either they are "unexported" or the worker is shut down.

Note

See also: See https://github.com/real-logic/simple-binary-encoding/wiki for details on SBE.

Clients

A client may be implemented in any language that can generate codecs from the Deephaven SBE XML specification. Reference clients in Java, C# and C++, and an example using Python with jpy, are provided by Deephaven.

Java provides two clients: one in StandaloneJavaSbeClient-<version>.jar and one in SBETransport-<version>.jar. The client in the stand-alone jar is mostly the same as the one in the SBE transport jar, but it does not depend on any other Deephaven jars. The other two differences are that it converts DBDateTime column values to Java Instants, and that it uses an SbeClientStringSetWrapper rather than a regular Deephaven StringSetWrapper for StringSet columns. Although the stand-alone jar is versioned with the rest of Deephaven, any version of this jar should work with any Deephaven releases since release 20190607 ("Sultan"). The stand-alone jar has a dependency on the Real Logic sbe-all client. The example below was developed using sbe-all-1.28.3.jar.

The code snippets below show how to use the reference clients to subscribe to the exported tables above. For brevity, the message handler implementations are omitted except for the first (Java) example. The reference clients are fully functional. However, if maximum performance is a consideration, you will likely want to generate message codecs from the XML schema and implement your own client that decodes incoming messages directly into your own data structures.

As a convenience, the MessageListener example below is also available as a SimpleMessageListener class in the stand-alone jar: io.deephaven.sbe.standalone.client.SimpleMessageListener. This is used in the Python example below.

Examples

Java Subscribe

MessageListener class for Java SBE subscriber
import io.deephaven.sbe.standalone.client.ColumnInfo;
import io.deephaven.sbe.standalone.client.SimpleMessageHandlerListener;
import io.deephaven.sbe.standalone.gen.TableSubscriptionErrorCode;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

class MessageListener implements SimpleMessageHandlerListener {
    // avoid blocking tests forever if expected event fails to occur
    private static final long MAX_WAIT_MS = 20_000;

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    private int subscribeResponseCount = 0;
    private int unsubscribeResponseCount = 0;
    private int subscriptionErrorCount = 0;
    private int beginTableUpdateCount = 0;
    private int endTableUpdateCount = 0;
    private int rowsAddedCount = 0;
    private int rowsModifiedCount = 0;
    private int rowsRemovedCount = 0;

    public int lastSubId;
    public int lastErrorSubId;
    public TableSubscriptionErrorCode lastErrorCode;
    public String lastErrorMessage;

    // exportedTable name -> subId
    private final Map<String, Integer> subMap = new HashMap<>();

    // subId -> columnName -> row index
    private final Map<Integer, Map<String, Integer>> subColumnMap = new HashMap<>();

    // very simple client-side table reconstruction
    // subId -> set of active rows
    private final Map<Integer, Set<Long>> subRowMap = new HashMap<>();

    // subId -> columnId -> rowId -> value
    private final Map<Integer, Map<Integer, Map<Long, Object>>> subTableDataMap = new HashMap<>();

    private Date deadline() {
        return deadline(MAX_WAIT_MS);
    }

    private Date deadline(final long timeoutMillis) {
        final Date now = new Date();
        return new Date(now.getTime() + timeoutMillis);
    }

    void expectSubscribeResponse(final int n) throws InterruptedException {
        expectSubscribeResponse(n, MAX_WAIT_MS);
    }

    void expectSubscribeResponse(final int n, final long timeoutMillis) throws InterruptedException {
        final Date deadline = deadline(timeoutMillis);
        lock.lock();
        try {
            while (subscribeResponseCount < n) {
                if (!condition.awaitUntil(deadline)) {
                    throw new RuntimeException("No subscribe response received by deadline with timeout of " + timeoutMillis + " ms");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void expectUnsubscribeResponse(final int n) throws InterruptedException {
        final Date deadline = deadline();
        lock.lock();
        try {
            while (unsubscribeResponseCount < n) {
                if (!condition.awaitUntil(deadline)) {
                    throw new RuntimeException("No unsubscribe response received by deadline");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void expectSubscriptionError(final int n) throws InterruptedException {
        final Date deadline = deadline();
        lock.lock();
        try {
            while (subscriptionErrorCount < n) {
                if (!condition.awaitUntil(deadline)) {
                    throw new RuntimeException("No subscription error received by deadline");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void expectBeginTableUpdate(final int n) throws InterruptedException {
        final Date deadline = deadline();
        lock.lock();
        try {
            while (beginTableUpdateCount < n) {
                if (!condition.awaitUntil(deadline)) {
                    throw new RuntimeException("No start table update received by deadline");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void expectEndTableUpdate(final int n) throws InterruptedException {
        expectEndTableUpdate(n, MAX_WAIT_MS);
    }

    void expectEndTableUpdate(final int n, final long timeoutMillis) throws InterruptedException {
        final Date deadline = deadline(timeoutMillis);
        lock.lock();
        try {
            while (endTableUpdateCount < n) {
                if (!condition.awaitUntil(deadline)) {
                    throw new RuntimeException("No end table update received by deadline with timeout of " + timeoutMillis + " ms");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void expectRowsAdded(final int n) throws InterruptedException {
        final Date deadline = deadline();
        lock.lock();
        try {
            while (rowsAddedCount < n) {
                if (!condition.awaitUntil(deadline)) {
                    throw new RuntimeException("No rows added received by deadline");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void expectRowsModified(final int n) throws InterruptedException {
        final Date deadline = deadline();
        lock.lock();
        try {
            while (rowsModifiedCount < n) {
                if (!condition.awaitUntil(deadline)) {
                    throw new RuntimeException("No rows modified received by deadline");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void expectRowsRemoved(final int n) throws InterruptedException {
        final Date deadline = deadline();
        lock.lock();
        try {
            while (rowsRemovedCount < n) {
                if (!condition.awaitUntil(deadline)) {
                    throw new RuntimeException("No rows removed received by deadline");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void onConnectionError(final IOException ex) {

    }

    @Override
    public void onSubscribeResponse(int subId, Map<Integer, ColumnInfo> columnData) {
        lock.lock();

        lastSubId = subId;
        // save the name->column id mapping
        final Map<String, Integer> columnMap = subColumnMap.computeIfAbsent(subId, id -> new HashMap<>());
        for (ColumnInfo cd : columnData.values()) {
            columnMap.put(cd.getName(), cd.getColumnId());
        }
        subscribeResponseCount++;
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void onUnsubscribeResponse(int subId) {
        lock.lock();
        lastSubId = subId;
        unsubscribeResponseCount++;
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }


    @Override
    public void onSubscriptionError(int subId, TableSubscriptionErrorCode errorCode, String errorMessage) {
        lock.lock();

        lastErrorSubId = subId;
        lastErrorCode = errorCode;
        lastErrorMessage = errorMessage;

        subscriptionErrorCount++;
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void onBeginTableUpdate(int subId) {
        lock.lock();
        beginTableUpdateCount++;
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void onEndTableUpdate(int subId) {
        System.out.println("Got end table update");
        lock.lock();
        endTableUpdateCount++;
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void onRowsAdded(int subscriptionId, int columnId, long[] rows, final Object[] values, int nRows) {
        final Set<Long> rowSet = subRowMap.computeIfAbsent(subscriptionId, subId -> new HashSet<>());
        Arrays.stream(rows).limit(nRows).forEach(rowSet::add);
        final Map<Integer, Map<Long, Object>> tableDataMap = subTableDataMap.computeIfAbsent(subscriptionId, subId -> new HashMap<>());
        final Map<Long, Object> columnDataMap = tableDataMap.computeIfAbsent(columnId, id -> new HashMap<>());
        for (int i = 0; i < nRows; i++) {
            columnDataMap.put(rows[i], values[i]);
        }
        lock.lock();
        rowsAddedCount++;
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void onRowsModified(int subscriptionId, int columnId, long[] rows, final Object[] values, int nRows) {
        final Set<Long> rowSet = subRowMap.computeIfAbsent(subscriptionId, subId -> new HashSet<>());
        Arrays.stream(rows).limit(nRows).forEach(rowSet::add);
        final Map<Integer, Map<Long, Object>> tableDataMap = subTableDataMap.computeIfAbsent(subscriptionId, subId -> new HashMap<>());
        final Map<Long, Object> columnDataMap = tableDataMap.computeIfAbsent(columnId, id -> new HashMap<>());
        for (int i = 0; i < nRows; i++) {
            columnDataMap.put(rows[i], values[i]);
        }
        lock.lock();
        rowsModifiedCount++;
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void onRowsRemoved(int subscriptionId, long[] rows, int nRows) {
        final Set<Long> rowSet = subRowMap.get(subscriptionId);
        for (int i = 0; i < nRows; i++) {
            rowSet.remove(rows[i]);
        }
        lock.lock();
        rowsRemovedCount++;
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public long[] getRows(int subId) {
        return subRowMap.get(subId).stream().sorted().mapToLong(Long::longValue).toArray();
    }

    public Object[] getColumnData(int subId, String columnName, long[] rows) {
        final Map<String, Integer> columnMap = subColumnMap.get(subId);
        final Map<Integer, Map<Long, Object>> columnDataMap = subTableDataMap.get(subId);

        final int columnId = columnMap.get(columnName);
        final Map<Long, Object> rowDataMap = columnDataMap.get(columnId);
        final Object[] column = new Object[rows.length];
        for (int i = 0; i < rows.length; i++) {
            column[i] = rowDataMap.get(rows[i]);
        }
        return column;
    }

    public Object[] getColumnData(int subId, String columnName) {
        long[] rows = getRows(subId);
        final Map<String, Integer> columnMap = subColumnMap.get(subId);
        final Map<Integer, Map<Long, Object>> columnDataMap = subTableDataMap.get(subId);

        final int columnId = columnMap.get(columnName);
        final Map<Long, Object> rowDataMap = columnDataMap.get(columnId);
        final Object[] column = new Object[rows.length];
        for (int i = 0; i < rows.length; i++) {
            column[i] = rowDataMap.get(rows[i]);
        }
        return column;
    }

    public int[] getIntColumnData(int subId, String columnName, long[] rows) {
        return Arrays.stream(getColumnData(subId, columnName, rows)).mapToInt(o -> (int) o).toArray();
    }

    public double[] getDoubleColumnData(int subId, String columnName, long[] rows) {
        return Arrays.stream(getColumnData(subId, columnName, rows)).mapToDouble(o -> (double) o).toArray();
    }

    private double[] toDoubleArray(Object[] array) {
        if (array == null)
            return null;
        return Arrays.stream(array).mapToDouble(o -> (double) o).toArray();
    }

    public double[][] getDoubleArrayColumnData(int subId, String columnName, long[] rows) {
        return Arrays.stream(getColumnData(subId, columnName, rows))
                .map(o -> toDoubleArray((Object[]) o)).collect(Collectors.toList()).toArray(new double[0][]);
    }

    private float[] toFloatArray(Object[] array) {
        if (array == null)
            return null;
        final float[] fa = new float[array.length];
        for (int i = 0; i < fa.length; i++) {
            fa[i] = (float) array[i];
        }
        return fa;
    }

    public float[][] getFloatArrayColumnData(int subId, String columnName, long[] rows) {
        return Arrays.stream(getColumnData(subId, columnName, rows))
                .map(o -> toFloatArray((Object[]) o))
                .collect(Collectors.toList()).toArray(new float[0][]);
    }

    public long[] getLongColumnData(int subId, String columnName, long[] rows) {
        return Arrays.stream(getColumnData(subId, columnName, rows)).mapToLong(o -> (long) o).toArray();
    }

    public short[] getShortColumnData(int subId, String columnName, long[] rows) {
        final Object[] objData = getColumnData(subId, columnName, rows);
        final short[] data = new short[objData.length];
        for (int i = 0; i < rows.length; i++) {
            data[i] = (short) objData[i];
        }
        return data;
    }

    public byte[] getByteColumnData(int subId, String columnName, long[] rows) {
        final Object[] objData = getColumnData(subId, columnName, rows);
        final byte[] data = new byte[objData.length];
        for (int i = 0; i < rows.length; i++) {
            data[i] = (byte) objData[i];
        }
        return data;
    }

    public Boolean[] getBooleanColumnData(int subId, String columnName, long[] rows) {
        final Object[] objData = getColumnData(subId, columnName, rows);
        final Boolean[] data = new Boolean[objData.length];
        for (int i = 0; i < rows.length; i++) {
            data[i] = (Boolean) objData[i];
        }
        return data;
    }

    public float[] getFloatColumnData(int subId, String columnName, long[] rows) {
        final Object[] objData = getColumnData(subId, columnName, rows);
        final float[] data = new float[objData.length];
        for (int i = 0; i < rows.length; i++) {
            data[i] = (float) objData[i];
        }
        return data;
    }

    public String[] getStringColumnData(int subId, String columnName, long[] rows) {
        final Object[] objData = getColumnData(subId, columnName, rows);
        final String[] data = new String[objData.length];
        for (int i = 0; i < rows.length; i++) {
            data[i] = (String) objData[i];
        }
        return data;
    }

    public BigDecimal[] getDecimalColumnData(int subId, String columnName, long[] rows) {
        final Object[] objData = getColumnData(subId, columnName, rows);
        final BigDecimal[] data = new BigDecimal[objData.length];
        for (int i = 0; i < rows.length; i++) {
            data[i] = (BigDecimal) objData[i];
        }
        return data;
    }

    public char[] getCharColumnData(int subId, String columnName, long[] rows) {
        final Object[] objData = getColumnData(subId, columnName, rows);
        final char[] data = new char[objData.length];
        for (int i = 0; i < rows.length; i++) {
            data[i] = (char) objData[i];
        }
        return data;
    }

    public byte[][] getBlobColumnData(int subId, String columnName, long[] rows) {
        final Object[] objData = getColumnData(subId, columnName, rows);
        final byte[][] data = new byte[objData.length][];
        for (int i = 0; i < rows.length; i++) {
            data[i] = (byte[]) objData[i];
        }
        return data;
    }

}
Java SBE subscriber class
import io.deephaven.sbe.standalone.client.SimpleMessageHandler;
import io.deephaven.sbe.standalone.client.TableClient;
import io.deephaven.sbe.standalone.gen.SubscribeMode;

import java.io.IOException;

public class SBEClient {

    private static final int QUOTE_SUB_ID=1;
    private static final int TRADE_SUB_ID=2;

    public static void main(final String[] commandLineArguments) {
        final MessageListener messageListener = new MessageListener();
        final SimpleMessageHandler messageHandler = new SimpleMessageHandler();
        messageHandler.addListener(messageListener);

        if (commandLineArguments.length<2) {
            throw new RuntimeException("Usage SBEClient <hostname> <port>");
        }

        final String host = commandLineArguments[0];
        final int port = Integer.parseInt(commandLineArguments[1]);
        final TableClient client;

        try {
            client = new TableClient(host, port, messageHandler);
        } catch (final IOException ex) {
            throw new RuntimeException("I/O exception occurred while initializing TableClient:", ex);
        }

        try {
            client.subscribe(QUOTE_SUB_ID, SubscribeMode.SNAPSHOT_ONLY, "Quotes");
        } catch (final IOException ex) {
            throw new RuntimeException("I/O exception occurred while subscribing:", ex);
        }

        try {
            client.subscribe(TRADE_SUB_ID, SubscribeMode.SNAPSHOT_ONLY, "Trades");
        } catch (final IOException ex) {
            throw new RuntimeException("I/O exception occurred while subscribing:", ex);
        }

        try {
            messageListener.expectSubscribeResponse(QUOTE_SUB_ID);
            messageListener.expectBeginTableUpdate(QUOTE_SUB_ID);
            messageListener.expectRowsAdded(QUOTE_SUB_ID);
            messageListener.expectEndTableUpdate(QUOTE_SUB_ID);
        } catch (final InterruptedException ex) {
            throw new RuntimeException("Interrupted exception occurred while preparing to receive data:", ex);
        }

        try {
            messageListener.expectSubscribeResponse(TRADE_SUB_ID);
            messageListener.expectBeginTableUpdate(TRADE_SUB_ID);
            messageListener.expectRowsAdded(TRADE_SUB_ID);
            messageListener.expectEndTableUpdate(TRADE_SUB_ID);
        } catch (final InterruptedException ex) {
            throw new RuntimeException("Interrupted exception occurred while preparing to receive data:", ex);
        }

        // This app overall does not do much, but just connects and shows how many rows were received for each of the table subscriptions
        System.out.println("Quotes table has received: " + messageListener.getRows(QUOTE_SUB_ID).length + " rows.");;
        System.out.println("Trades table has received: " + messageListener.getRows(TRADE_SUB_ID).length + " rows.");;

        try {
            client.unsubscribe(QUOTE_SUB_ID);
            client.unsubscribe(TRADE_SUB_ID);
        } catch (final IOException ex) {
            throw new RuntimeException("I/O exception occurred while unsubscribing:", ex);
        }

        System.exit(0);
    }
}

C# Subscribe

SimpleMessageHandler simpleMessageHandler = new SimpleMessageHandler();
simpleMessageHandler.OnConnectionResponse += HandleConnectionResponse;
simpleMessageHandler.OnSubscribeResponse += HandleSubscribeResponse;
simpleMessageHandler.OnUnsubscribeResponse += HandleUnsubscribeResponse;
simpleMessageHandler.OnSubscriptionError += HandleSubscriptionError;
simpleMessageHandler.OnBeginTableUpdate += HandleBeginTableUpdate;
simpleMessageHandler.OnEndTableUpdate += HandleEndTableUpdate;
simpleMessageHandler.OnRowsAdded += HandleRowsAdded;
simpleMessageHandler.OnRowsModified += HandleRowsModified;
simpleMessageHandler.OnRowsRemoved += HandleRowsRemoved;
simpleMessageHandler.OnConnectionError += HandleConnectionError;

TableClient client = new TableClient(host, 7777, simpleMessageHandler)

client.Subscribe(QUOTE_SUB_ID, SubscribeMode.SNAPSHOT_WITH_UPDATES, "Quotes");
client.Subscribe(QUOTE_SUB_ID, SubscribeMode.SNAPSHOT_WITH_UPDATES, "Trades");

// handle messages…

client.unsubscribe(QUOTE_SUB_ID);
client.unsubscribe(TRADE_SUB_ID);

C++ Subscribe

class MyListener : public SimpleMessageHandlerListener {
    // implement message handlers here
}
MyListener myListener;SimpleMessageHandler simpleMessageHandler;
simpleMessageHandler.addListener(&myListener);

TableClient client(host, 7777, &simpleMessageHandler);

client.subscribe(QUOTE_SUB_ID, SubscribeMode::SNAPSHOT_WITH_UPDATES, "Quotes");
client.subscribe(TRADE_SUB_ID, SubscribeMode::SNAPSHOT_WITH_UPDATES, "Trades");

// handle messages…
client.unsubscribe(QUOTE_SUB_ID);
client.unsubscribe(TRADE_SUB_ID);

Python Subscribe

This example requires jpy. jpy can be installed separately or as part of the Deephaven Python legacy client installation.

Example SBE subscriber using jpy and the stand-alone Deephaven SBE client jar
import jpyutil
jpyutil.preload_jvm_dll()

import jpy

# at a minimum, the stand-alone SBE client jar for the Deephaven version being used, the sbe-tool jar, and the agrona jar are needed in the jpy classpath
jpy.create_jvm(
    ["-Xmx1024m",
    "-Djava.class.path=/<path to jar>/StandaloneJavaSbeClient-1.20221001.301.jar:/<path to jar>/sbe-tool-1.30.0.jar:/<path to jar>/agrona-1.20.0.jar"]
    )

message_handler=jpy.get_type("io.deephaven.sbe.standalone.client.SimpleMessageHandler")()
message_listener=jpy.get_type("io.deephaven.sbe.standalone.client.SimpleMessageListener")()
table_client=jpy.get_type("io.deephaven.sbe.standalone.client.TableClient")
subscriber_mode=jpy.get_type("io.deephaven.sbe.standalone.gen.SubscribeMode")

QUOTE_SUB_ID=1

message_handler.addListener(message_listener)
host = "<Deephaven Query Server running the TableServer>"
port = <port for the TableServer>

client = table_client(host, port, message_handler)
client.subscribe(QUOTE_SUB_ID, subscriber_mode.SNAPSHOT_ONLY, "Quotes")

message_listener.expectSubscribeResponse(QUOTE_SUB_ID)
message_listener.expectBeginTableUpdate(QUOTE_SUB_ID)
message_listener.expectRowsAdded(QUOTE_SUB_ID)
message_listener.expectEndTableUpdate(QUOTE_SUB_ID)

print("Quotes table has received: " + str(len(message_listener.getRows(QUOTE_SUB_ID))) + " rows.")
client.unsubscribe(QUOTE_SUB_ID)

Protocol

Every message is preceded by a header (defined by MessageHeader in the specification), and the entire message is preceded by a 32-bit integer containing the message size (in little endian byte order). The content of each message should be parsed by the generated SBE codecs.

Upon connection to an SBE server, the client will receive a connection response message indicating the message buffer size the server is using. This permits the client to adjust its own buffer to match, which is important when processing table updates.

Once connected, the client may issue subscribe requests at any time using a TableSubscribe message. The client must assign a numeric identifier to each subscription, which must be unique for the current session. This identifier will be used in the content of incoming messages to identify the subscription. Each subscription should identify an exported table, and a subscription "mode". Optionally, the set of columns in which the client is interested and/or a filter may be specified (if not specified, all rows and columns will be exported). A client may subscribe to the same table any number of times (as long as each subscription has a unique identifier).

Each subscription is created with one of three subscription modes: SNAPSHOT_ONLY, SNAPSHOT_WITH_UPDATES, or UPDATE_ONLY (see XML spec for numeric values). SNAPSHOT_ONLY will cause the server to send a one-time snapshot of the requested table and immediately remove the subscription. SNAPSHOT_WITH_UPDATES will send a snapshot followed by updates until the client unsubscribes (a snapshot is encoded as a single large table update). This mode can be used to reconstruct the source table on the client side. UPDATE_ONLY sends only updates until the client unsubscribes. This can be used if the client is interested only in changes to the source table (i.e. a logging use-case).

In response to a subscribe request, the server will send a TableSubscribeResponse message, which contains the subscription identifier provided by the client as well as metadata indicating the names, numeric identifiers, and table update message identifiers for the subscribed columns. The client can use the numeric column identifier to identify the column on subsequent table update messages.

To unsubscribe, the client should send a TableUnsubscribe message identifying an active subscription. In response, the server will send a TableUnsubscribeResponse message confirming the operation.

If an error occurs at any time with respect to a particular subscription, the server will send a TableSubscriptionError identifying the relevant subscription along with a numeric error code and a human-readable error message.

The SBE protocol is asynchronous, with a fixed, maximum message size (configured by the server) so large table updates may be broken up into any number of messages. For this reason, the client will receive BeginTableUpdate and EndTableUpdate messages bracketing each atomic table update. Any number of column data updates and/or row removal messages may be sent for each update. These are converted into RowsAdded, RowsRemoved, and/or RowsModified events by the default listeners provided by the reference clients. A client implementation may want to use the BeginTableUpdate and EndTableUpdate messages as indications to acquire and release a lock on whatever structure is being used to store the incoming data. However, care must be taken to avoid deadlock in case a BeginTableUpdate is received, and a subsequent connection error causes the loss of the EndTableUpdate message.

Outgoing Message Types

The SBE protocol defines the following message types sent to the client to the server.

TableSubscribe

The client uses this message to subscribe to an exported table. It must specify a (client generated) unique subscription id and the exported name of the table. It may also optionally provide specific columns for which updates are requested and/or a filter which specifies the rows in which the client is interested (this is applied as a .where clause on the server).

TableUnsubscribe

The client uses this message to terminate an active subscription. It must specify the subscription identifier assigned when the subscription was initiated.

Incoming Message Types

The SBE protocol defines the following message types sent to the client from the server.

ConnectionResponse

This message is sent by the server to the client when a connection is initiated. It provides a message buffer size the client can use for subsequent messages.

TableSubscribeResponse

This message is sent to the client in response to a successful subscribe request, and provides the table column information (names and data types).

TableUnsubscribeResponse

This message is sent to the client in response to a successful unsubscribe request.

TableSubscriptionError

This message is sent to the client when an error occurs with respect to a subscription. When relevant, it contains the relevant subscription identifier as well as a textual description.

BeginTableUpdate

This message is sent to the client indicating the beginning of a table update. Table updates can span any number of messages, and are always bracketed by BeginTableUpdate and EndTableUpdate messages.

EndTableUpdate

This message is sent to the client indicating the end of a table update.

<Type>ColumnData

Updates to table data (rows added or removed) are sent to the client encoded as <Type>ColumnData messages, where <Type> represents the data type of the relevant column. For each updated row, at least one column data message will be sent for each column. Each column data message contains as many added/updated rows as can fit into the message as well as the row indexes affected. The row indexes are encoded as a series of ranges. For example, if rows 11-20 and 26-30 are affected in a given update, the column data messages will contain these two ranges encoded as [11,20] and [25,30], followed by an array of the 15 values corresponding to these indexes. Large column updates will span multiple column data messages.

RowsRemoved

When rows are removed from a table, RowsRemoved message(s) are sent to the client indicating the range(s) of rows removed. Row index ranges are encoded in the same fashion as in column data messages.

Supported Column Types

The following column types are supported by SBE. Details of the encoding can be observed in the SBE transport schema XML file.

Primitive Column Types

Deephaven Data TypeSBE Update Message TypeMessage IdComments
byteByteColumnData102Encoded as group(s) of 8-bit signed values with nulls indicated by Deephaven special value.*
booleanBooleanColumnData101Encoded as group(s) of bytes with the following meaning: 0: FALSE, 1: TRUE, -1: NULL
charCharColumnData103Encoded as group(s) of 16-bit unsigned values, with nulls indicated by Deephaven special value.*
intIntColumnData106Encoded as group(s) of 32 -bit signed integers with nulls indicated by Deephaven special value. *
shortShortColumnData108Encoded as group(s) of 16-bit signed integers with nulls indicated by Deephaven special value.*
longLongColumnData107Encoded as group(s) of 64-bit signed integers with nulls indicated by Deephaven special value. *
floatFloatColumnData105Encoded as group(s) of 16-bit floating point values (single precision float) with nulls indicated by Deephaven special value. *
doubleDoubleColumnData104Encoded as group(s) of 32-bit floating point values (double precision float) with nulls indicated by Deephaven special value.*
StringStringColumnData100Encoded as a series of variable length character data arrays, with a leading byte for each, indicating null/not null, and a length with a maximum value of 1073741824.
DateTimeDateTimeColumnData119Encoded as nanoseconds since the epoch in a signed 64-bit integer.
BigDecimalDecimalColumnData109Encoded as magnitude & scale, fixed point representation is language dependent.
StringSetStringSetColumnData120Encoded in the same way as String

* See Deephaven Special Values below.

Array Types

SBE supports a limited number of array types. Both columns of the raw array type (i.e., byte[]) and the corresponding database type (i.e., DbByteArray) are supported and produce the same type of update messages. The latter are produced by Deephaven .by() operations.

All array encodings support null values, both for the array value and the individual values within each array. Primitive nulls are represented in the same way as the non-array encodings (using the Deephaven special value).

Deephaven Data Type(s)SBE Update Message TypeMessage IdEncoding
  • byte[]
  • DbByteArray
ByteArrayColumnData112Group of byte groups, each with a leading null indicator.
  • boolean[]
  • DbArray<Boolean>
BooleanArrayColumnData111Group of boolean groups, each with a leading null indicator.
  • char[]
  • DbCharArray
CharArrayColumnData113Group of char groups, each with a leading null indicator.
  • int[]
  • DbIntArray
IntArrayColumnData116Group of int groups, each with a leading null indicator.
  • short[]
  • DbShortArray
ShortArrayColumnData118Group of short groups, each with a leading null indicator.
  • long[]
  • DbLongArray
LongArrayColumnData117Group of long groups, each with a leading null indicator.
  • float[]
  • DbFloatArray
FloatArrayColumnData115Group of float groups, each with a leading null indicator.
  • double[]
  • DbDoubleArray
DoubleArrayColumnData114Group of double groups, each with a leading null indicator.
  • String[]
  • DbArray<String>
StringArrayColumnData110Group of String groups, each with a leading null indicator.
StringSetStringSetColumnData120Same as String array.

Deephaven Null Values

For primitive types, null values are represented with the following special values. Boolean is not included here, because booleans are encoded as a single byte in SBE and nulls are represented by -1.

TypeJava ConstantValue
byteByte.MIN_VALUE-128
charCharacter.MAX_VALUE-10xFFFE
shortShort.MIN_VALUE-32768
intInteger.MIN_VALUE0x80000000
longLong.MIN_VALUE0x8000000000000000
float-Float.MAX_VALUE-3.4028235e+38f
double-Double.MAX_VALUE-1.7976931348623157e+308