Subscribing to Ticking Tables

Description

Some C++ applications need to maintain a local copy of a table that stays in sync with the server side. This is useful when your code uses custom libraries or algorithms that are more natural to run on the client than on the server.

To support this, TableHandle provides the Subscribe method. When this method is invoked, the Deephaven client library starts maintaining an internal local copy of the table, keeping it up to date by processing periodic low-level update messages from the server. The library also delivers these updates to the subscribing code in a digested, user-friendly form. The subscription remains active until the user calls Unsubscribe.

A simple example is:

TableHandle my_ticking_data = manager.FetchTable("MyTickingData");
auto sub_handle = my_ticking_data.Subscribe(std::make_shared<MyCallback>());
// Do all your processing...
// Then when all done, unsubscribe...
my_ticking_data.Unsubscribe(std::move(sub_handle));
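If the processing step can throw, it is easy to skip the Unsubscribe call. One way to guard against that is a small scope guard that runs a cleanup action when it goes out of scope, including during stack unwinding. The sketch below is a generic helper; ScopeExit and Release are hypothetical names and are not part of the Deephaven client library.

```cpp
#include <functional>
#include <utility>

// Hypothetical helper (not part of the Deephaven API): runs a cleanup action
// once when the guard is destroyed, whether scope exits normally or via an
// exception. The action must not throw, because it runs in a destructor.
class ScopeExit {
public:
  explicit ScopeExit(std::function<void()> action) : action_(std::move(action)) {}
  ScopeExit(const ScopeExit &) = delete;
  ScopeExit &operator=(const ScopeExit &) = delete;

  ~ScopeExit() {
    if (action_) {
      action_();
    }
  }

  // Cancels the cleanup, e.g. if the caller already unsubscribed manually.
  void Release() { action_ = nullptr; }

private:
  std::function<void()> action_;
};
```

With the subscription example above, the guard would be created right after Subscribe, with a cleanup lambda such as [&] { my_ticking_data.Unsubscribe(std::move(sub_handle)); }, so that the subscription is released even if processing throws.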

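The user-supplied MyCallback is a class deriving from TickingCallback that overrides two methods:

  • OnTick, invoked on each update to the subscription.

  • OnFailure, invoked if there is an error involving the subscription.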

When table data changes on the server side, those changes are batched up and periodically transmitted to the client (by default, at a rate of about once per second). First, the client library processes these low-level messages and applies them to its internal copy of the table. Then, it constructs a user-friendly TickingUpdate object and invokes the user’s OnTick callback, passing it the TickingUpdate object.

A skeleton of a user-defined callback object looks like this:

class MyCallback final : public deephaven::dhcore::ticking::TickingCallback {
public:
  void OnTick(deephaven::dhcore::ticking::TickingUpdate update) final {
    // handle the update message
  }

  void OnFailure(std::exception_ptr ep) final {
    // handle the failure
  }
};
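Because OnFailure receives only a std::exception_ptr, the standard C++ way to inspect it is to rethrow it with std::rethrow_exception and catch it. The helper below (DescribeFailure is a hypothetical name, not part of the library) turns the pointer into a message that an OnFailure override could log.

```cpp
#include <exception>
#include <string>

// Hypothetical helper: converts the std::exception_ptr passed to OnFailure
// into a human-readable message. std::rethrow_exception is the standard way
// to recover the exception object from an exception_ptr.
inline std::string DescribeFailure(std::exception_ptr ep) {
  if (!ep) {
    return "unknown failure (null exception_ptr)";
  }
  try {
    std::rethrow_exception(ep);
  } catch (const std::exception &e) {
    return e.what();
  } catch (...) {
    return "unknown failure (non-std::exception type)";
  }
}
```

Inside MyCallback::OnFailure, a call such as DescribeFailure(ep) would then yield the text to report.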

Structure of updates from the server

Synchronization between the server and client is performed via the Barrage protocol. The low-level details of this protocol are outside the scope of this document. However, at a high level, the purpose of the protocol is to efficiently transmit differences between table versions as a table changes over time. Those differences include:

  • removed rows

  • shifts (note 1)

  • added rows

  • modified cells (note 2)

Notes:

1. Shifts are a way to express the renumbering (but not reordering) of internal row keys. This version of the client does not expose internal row keys to the caller, so you will not see shifts represented in the TickingUpdate class.

2. We refer to modified cells rather than modified rows because, when a row is modified, typically only some of its cells change while the others stay the same. For efficiency, the Barrage protocol allows the server to specify exactly which cells changed within a row. These modifications are represented on a per-column basis: for each column, the library indicates (via a RowSequence) which rows of that column were modified.
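As a rough model of this per-column representation, the sketch below uses a sorted std::vector of row positions as a stand-in for RowSequence (a simplification; the real class is not a vector) and computes the rows in which at least one cell changed. For example, if column 0 changed in rows 1 and 4 and column 2 changed in rows 2, 4, and 7, the result is rows 1, 2, 4, and 7. Positions and UnionOfModifiedRows are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>

// Simplified stand-in for RowSequence: a sorted list of row positions.
using Positions = std::vector<std::size_t>;

// Given one Positions entry per column (the shape of the per-column
// modification data), returns the sorted set of rows in which at least
// one cell changed.
inline Positions UnionOfModifiedRows(const std::vector<Positions> &per_column) {
  Positions result;
  for (const auto &column_rows : per_column) {
    Positions merged;
    // Both inputs are sorted, so set_union yields a sorted, de-duplicated merge.
    std::set_union(result.begin(), result.end(),
                   column_rows.begin(), column_rows.end(),
                   std::back_inserter(merged));
    result = std::move(merged);
  }
  return result;
}
```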

The TickingUpdate class

The TickingUpdate class represents the changes that have happened to the table since the last tick. It contains snapshots (Prev and Current) of the table at the start and end of the entire update operation, as well as intermediate snapshots:

  • before and after the remove operation,

  • before and after the add operation, and

  • before and after the modify operation.

It also contains RowSequence values representing the positions of the removed, added, and modified items.

For some callers, the per-update Prev and Current snapshots suffice. These snapshots tell the caller how the table looked before and after the update, respectively. Other callers need more precise information: exactly which rows were removed, added, and modified. These callers can use the per-operation snapshots.
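To illustrate why the per-operation snapshots matter, here is a sketch of a caller that maintains a running sum of one integer column incrementally instead of recomputing it from Current on every tick. Each set of positions is read against the snapshot it refers to: removed values from the before-removes snapshot, added values from the after-adds snapshot, and modified values compared between the before-modifies and after-modifies snapshots. IncrementalSum is hypothetical, plain vectors stand in for ClientTable columns and RowSequence, and the position space assumed for each set of positions is an assumption of this model.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical model: each snapshot is a single long long column, and each
// RowSequence is a list of positions. Assumption: 'removed' indexes into the
// before-removes column, 'added' into the after-adds column, and 'modified'
// into both the before- and after-modifies columns.
struct IncrementalSum {
  long long total = 0;

  void Apply(const std::vector<long long> &before_removes,
             const std::vector<std::size_t> &removed,
             const std::vector<long long> &after_adds,
             const std::vector<std::size_t> &added,
             const std::vector<long long> &before_modifies,
             const std::vector<long long> &after_modifies,
             const std::vector<std::size_t> &modified) {
    for (auto pos : removed) total -= before_removes[pos];  // values that left
    for (auto pos : added) total += after_adds[pos];        // values that arrived
    for (auto pos : modified) total += after_modifies[pos] - before_modifies[pos];
  }
};
```

For a column holding 1, 2, 3 (sum 6), removing position 1, adding 10 at position 0, and changing the last value from 3 to 7 yields a sum of 18, which matches the final column 10, 1, 7.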

The per-update snapshots are:

  • Prev - snapshot of the table before any of this cycle’s updates were applied.

  • Current - snapshot of the table after all of this cycle’s updates were applied.

The more fine-grained per-operation snapshots are:

  • BeforeRemoves - snapshot of the table as it appeared before the remove operation

  • AfterRemoves - snapshot of the table as it appeared after the remove operation

  • BeforeAdds - snapshot of the table as it appeared before the add operation

  • AfterAdds - snapshot of the table as it appeared after the add operation

  • BeforeModifies - snapshot of the table as it appeared before the modify operation

  • AfterModifies - snapshot of the table as it appeared after the modify operation

Some of these snapshots are redundant. For example, because of the order in which changes are applied internally, AfterRemoves and BeforeAdds refer to exactly the same snapshot. We provide these extra snapshots for the programmer’s convenience and intuition.

The library also takes pains to coalesce snapshots. For example, if no removes happen in a given update, then the BeforeRemoves pointer will compare equal to the AfterRemoves pointer.
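The toy model below applies one cycle of removes, adds, and modifies to a single-column table held in immutable shared_ptr snapshots. It shows why the snapshot after the removes can serve directly as the snapshot before the adds, and how skipping an empty phase leaves the before and after pointers equal. ToyUpdate and ApplyToy are hypothetical names, not the library's internals; as an assumption of the model, removed positions refer to the snapshot before the removes and added positions refer to the snapshot after the adds.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Toy model (not the Deephaven implementation): a one-column table whose
// snapshots are immutable vectors shared via shared_ptr.
using Snapshot = std::shared_ptr<const std::vector<int>>;

struct ToyUpdate {
  Snapshot before_removes;
  Snapshot after_removes;   // also serves as "before adds"
  Snapshot after_adds;      // also serves as "before modifies"
  Snapshot after_modifies;  // also serves as "current"
};

// removed: ascending positions in the before-removes snapshot.
// added: ascending (position, value) pairs; positions in the after-adds snapshot.
// modified: (position, new value) pairs.
inline ToyUpdate ApplyToy(Snapshot prev, const std::vector<std::size_t> &removed,
                          const std::vector<std::pair<std::size_t, int>> &added,
                          const std::vector<std::pair<std::size_t, int>> &modified) {
  ToyUpdate u;
  u.before_removes = prev;

  // Removes: erase from the back so earlier positions stay valid.
  u.after_removes = prev;  // coalesced: same pointer if nothing is removed
  if (!removed.empty()) {
    auto next = std::make_shared<std::vector<int>>(*prev);
    for (auto it = removed.rbegin(); it != removed.rend(); ++it) {
      next->erase(next->begin() + static_cast<std::ptrdiff_t>(*it));
    }
    u.after_removes = std::move(next);
  }

  // Adds: inserting in ascending order lands each value at its final position.
  u.after_adds = u.after_removes;
  if (!added.empty()) {
    auto next = std::make_shared<std::vector<int>>(*u.after_removes);
    for (const auto &[pos, value] : added) {
      next->insert(next->begin() + static_cast<std::ptrdiff_t>(pos), value);
    }
    u.after_adds = std::move(next);
  }

  // Modifies: overwrite cells in place on a fresh copy.
  u.after_modifies = u.after_adds;
  if (!modified.empty()) {
    auto next = std::make_shared<std::vector<int>>(*u.after_adds);
    for (const auto &[pos, value] : modified) {
      (*next)[pos] = value;
    }
    u.after_modifies = std::move(next);
  }
  return u;
}
```

The toy copies whole vectors for simplicity; the real library avoids that cost through structural sharing.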

Some readers may be concerned about the cost of maintaining all these snapshots. Internally, the snapshots are represented by copy-on-write data structures that rely heavily on structural sharing. Broadly speaking, keeping two snapshots of a table is inexpensive when most of the data is unchanged between them. The specific implementation of this snapshotting data structure comes from the Immer Persistent and Immutable Data Structures project.
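To make the structural-sharing idea concrete, here is a toy persistent vector. It is neither Immer's API nor its data structure (Immer uses considerably more sophisticated tree-based structures); ChunkedVector, Set, and SharesChunkWith are hypothetical names. Updating one element copies only the chunk that contains it, so two versions that differ in a single element share every other chunk.

```cpp
#include <array>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Toy persistent vector: data lives in fixed-size immutable chunks held by
// shared_ptr. Set() returns a new version and copies only the affected chunk.
class ChunkedVector {
public:
  static constexpr std::size_t kChunkSize = 4;
  using Chunk = std::array<int, kChunkSize>;

  // Creates num_chunks * kChunkSize zero-initialized elements.
  explicit ChunkedVector(std::size_t num_chunks) {
    for (std::size_t i = 0; i < num_chunks; ++i) {
      chunks_.push_back(std::make_shared<Chunk>());
    }
  }

  int Get(std::size_t i) const { return (*chunks_[i / kChunkSize])[i % kChunkSize]; }

  // Returns a new version with element i replaced; *this is left unchanged.
  ChunkedVector Set(std::size_t i, int value) const {
    ChunkedVector result = *this;  // copies chunk pointers, not chunk data
    auto copy = std::make_shared<Chunk>(*chunks_[i / kChunkSize]);
    (*copy)[i % kChunkSize] = value;
    result.chunks_[i / kChunkSize] = std::move(copy);
    return result;
  }

  // True if chunk c is physically the same object in both versions.
  bool SharesChunkWith(const ChunkedVector &other, std::size_t c) const {
    return chunks_[c] == other.chunks_[c];
  }

private:
  std::vector<std::shared_ptr<const Chunk>> chunks_;
};
```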

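The TickingUpdate object also provides RowSequence objects indicating which specific rows were changed. The provided RowSequence objects are:

  • RemovedRows - the indexes of the rows (if any) that were removed in this cycle

  • AddedRows - the indexes of the rows (if any) that were added in this cycle

  • ModifiedRows - a vector containing, for each column, the indexes of the rows (if any) of that column that were modified in this cycle

  • AllModifiedRows - the union of the RowSequences returned by ModifiedRows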

Declarations

class ClientTable

An abstract base class representing a Deephaven table. It is used, for example, in TickingUpdate to provide table snapshots to a caller who has subscribed to ticking tables.

Subclassed by deephaven::client::arrowutil::ArrowClientTable

Public Types

using ColumnSource = deephaven::dhcore::column::ColumnSource

Alias.

using RowSequence = deephaven::dhcore::container::RowSequence

Alias.

Public Functions

ClientTable() = default

Constructor.

virtual ~ClientTable() = default

Destructor.

virtual std::shared_ptr<RowSequence> GetRowSequence() const = 0

Gets the RowSequence (in position space) that underlies this table.

virtual std::shared_ptr<ColumnSource> GetColumn(size_t column_index) const = 0

Gets a ColumnSource from the ClientTable by index.

Parameters:

column_index – Must be in the half-open interval [0, NumColumns).

std::shared_ptr<ColumnSource> GetColumn(std::string_view name, bool strict) const

Gets a ColumnSource from the ClientTable by name. ‘strict’ controls whether the method must succeed.

Parameters:
  • name – The name of the column.

  • strict – Whether the method must succeed.

Returns:

If ‘name’ was found, returns the ColumnSource. If ‘name’ was not found and ‘strict’ is true, throws an exception. If ‘name’ was not found and ‘strict’ is false, returns nullptr.

std::optional<size_t> GetColumnIndex(std::string_view name, bool strict) const

Gets the index of a ColumnSource from the ClientTable by name. ‘strict’ controls whether the method must succeed.

Parameters:
  • name – The name of the column.

  • strict – Whether the method must succeed.

Returns:

If ‘name’ was found, returns the index of the ColumnSource. If ‘name’ was not found and ‘strict’ is true, throws an exception. If ‘name’ was not found and ‘strict’ is false, returns an empty optional.
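The 'strict' contract described for GetColumn and GetColumnIndex can be modeled with a name-to-index map. The sketch below is a hypothetical helper, not the library's implementation. It uses std::less<> so that a std::string_view can be looked up without building a std::string, a choice that echoes the std::less<> comparator in the Schema constructor declared below.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>

// Hypothetical helper mirroring the documented 'strict' semantics:
// found -> index; missing and strict -> throw; missing and not strict -> empty.
inline std::optional<std::size_t> LookupColumnIndex(
    const std::map<std::string, std::size_t, std::less<>> &index,
    std::string_view name, bool strict) {
  auto it = index.find(name);  // std::less<> enables lookup by string_view
  if (it != index.end()) {
    return it->second;
  }
  if (strict) {
    throw std::invalid_argument("column not found: " + std::string(name));
  }
  return std::nullopt;
}
```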

virtual size_t NumRows() const = 0

Number of rows in the ClientTable.

virtual size_t NumColumns() const = 0

Number of columns in the ClientTable.

virtual std::shared_ptr<deephaven::dhcore::clienttable::Schema> Schema() const = 0

The ClientTable schema.

internal::TableStreamAdaptor Stream(bool want_headers, bool want_row_numbers) const

Creates an ‘ostream adaptor’ to use when printing the ClientTable. Example usage: std::cout << myTable.Stream(true, false).

internal::TableStreamAdaptor Stream(bool want_headers, bool want_row_numbers, std::shared_ptr<RowSequence> row_sequence) const

Creates an ‘ostream adaptor’ to use when printing the ClientTable. Example usage: std::cout << myTable.Stream(true, false, rowSeq).

internal::TableStreamAdaptor Stream(bool want_headers, bool want_row_numbers, std::vector<std::shared_ptr<RowSequence>> row_sequences) const

Creates an ‘ostream adaptor’ to use when printing the ClientTable. Example usage: std::cout << myTable.Stream(true, false, rowSequences).
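The ‘ostream adaptor’ returned by Stream follows a common C++ idiom: a lightweight object that records what to print and with which options, plus an operator<< that does the formatting. The sketch below shows the idiom on a hypothetical ToyTable. It does not reproduce internal::TableStreamAdaptor or the library's actual output format. A ToString-style helper then follows naturally by streaming into a std::ostringstream.

```cpp
#include <cstddef>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical toy table standing in for ClientTable in this sketch.
struct ToyTable {
  std::vector<std::string> names;
  std::vector<std::vector<int>> columns;  // columns[c][r]
};

// The adaptor only remembers the table and the formatting options.
struct ToyTableStreamAdaptor {
  const ToyTable &table;
  bool want_headers;
  bool want_row_numbers;
};

inline ToyTableStreamAdaptor Stream(const ToyTable &t, bool want_headers, bool want_row_numbers) {
  return {t, want_headers, want_row_numbers};
}

// The actual formatting happens here, when the adaptor is streamed.
inline std::ostream &operator<<(std::ostream &s, const ToyTableStreamAdaptor &a) {
  const ToyTable &t = a.table;
  std::size_t num_rows = t.columns.empty() ? 0 : t.columns[0].size();
  if (a.want_headers) {
    if (a.want_row_numbers) s << "[Row]\t";
    for (std::size_t c = 0; c < t.names.size(); ++c) s << (c ? "\t" : "") << t.names[c];
    s << '\n';
  }
  for (std::size_t r = 0; r < num_rows; ++r) {
    if (a.want_row_numbers) s << '[' << r << "]\t";
    for (std::size_t c = 0; c < t.columns.size(); ++c) s << (c ? "\t" : "") << t.columns[c][r];
    s << '\n';
  }
  return s;
}

// ToString-style helper built on the adaptor.
inline std::string ToString(const ToyTable &t, bool want_headers, bool want_row_numbers) {
  std::ostringstream oss;
  oss << Stream(t, want_headers, want_row_numbers);
  return oss.str();
}
```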

std::string ToString(bool want_headers, bool want_row_numbers) const

For debugging and demos.

std::string ToString(bool want_headers, bool want_row_numbers, std::shared_ptr<RowSequence> row_sequence) const

For debugging and demos.

std::string ToString(bool want_headers, bool want_row_numbers, std::vector<std::shared_ptr<RowSequence>> row_sequences) const

For debugging and demos.

class Schema

The table schema that goes along with a ClientTable. This Schema object tells you the names and data types of the table columns.

Public Functions

Schema(Private, std::vector<std::string> names, std::vector<ElementTypeId::Enum> types, std::map<std::string_view, size_t, std::less<>> index)

Constructor.

~Schema()

Destructor.

Public Static Functions

static std::shared_ptr<Schema> Create(std::vector<std::string> names, std::vector<ElementTypeId::Enum> types)

Factory method.
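The Private first parameter of the Schema constructor, paired with the static Create factory, looks like the passkey (private tag) idiom. This is an inference from the signature, not something the declarations state. Under that idiom, the constructor is public so that std::make_shared can call it, but it requires a token type that only the class itself can create, so outside code must go through the factory. The sketch below shows the idiom on a hypothetical ToySchema (names only, no type information). Its duplicate-name check is purely illustrative.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

class ToySchema {
  // Passkey: the explicit default constructor blocks outside code from
  // creating one with {}, so only ToySchema can construct a Private.
  struct Private {
    explicit Private() = default;
  };

public:
  // Factory: builds the name -> index map, then constructs via make_shared.
  static std::shared_ptr<ToySchema> Create(std::vector<std::string> names) {
    std::map<std::string, std::size_t, std::less<>> index;
    for (std::size_t i = 0; i < names.size(); ++i) {
      if (!index.emplace(names[i], i).second) {  // illustrative check only
        throw std::invalid_argument("duplicate column name: " + names[i]);
      }
    }
    return std::make_shared<ToySchema>(Private(), std::move(names), std::move(index));
  }

  // Public, but unusable without a Private token.
  ToySchema(Private, std::vector<std::string> names,
            std::map<std::string, std::size_t, std::less<>> index)
      : names_(std::move(names)), index_(std::move(index)) {}

  const std::vector<std::string> &Names() const { return names_; }

  std::size_t IndexOf(std::string_view name) const {
    auto it = index_.find(name);
    if (it == index_.end()) {
      throw std::invalid_argument("no such column: " + std::string(name));
    }
    return it->second;
  }

private:
  std::vector<std::string> names_;
  std::map<std::string, std::size_t, std::less<>> index_;
};
```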

class TickingCallback

Abstract base class used to define the caller’s ticking callback object. This object is passed to the TableHandle::Subscribe() method.

Subclassed by WrappedTickingCallback

Public Functions

virtual void OnTick(TickingUpdate update) = 0

Invoked on each update to the subscription.

virtual void OnFailure(std::exception_ptr eptr) = 0

Invoked if there is an error involving the subscription.

class TickingUpdate

An update message (passed to client code via TickingCallback::OnTick()) which describes the changes (removes, adds, modifies) that transform the previous version of the table to the new version. This class is threadsafe and can be kept around for an arbitrary amount of time, though this will consume some memory. The underlying snapshots share a common substructure, so the amount of memory they consume is roughly proportional to the amount of “new” data in each snapshot.
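Because a TickingUpdate is threadsafe and can be kept around, one option when per-tick processing is slow is for OnTick to move the update onto a queue and return, leaving the work to a thread of your own. Whether OnTick is required to return quickly is not stated in this document, so treat this as one possible design. The BlockingQueue below is a generic, hypothetical helper; in real code T would be TickingUpdate.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Minimal thread-safe FIFO. An OnTick override could call Push(std::move(update)),
// while a worker thread loops on Pop() and processes updates in order.
template <typename T>
class BlockingQueue {
public:
  void Push(T item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(std::move(item));
    }
    cv_.notify_one();  // wake one waiting consumer
  }

  // Blocks until an item is available, then removes and returns the oldest one.
  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !items_.empty(); });
    T item = std::move(items_.front());
    items_.pop_front();
    return item;
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<T> items_;
};
```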

Public Types

using RowSequence = deephaven::dhcore::container::RowSequence

Alias.

using ClientTable = deephaven::dhcore::clienttable::ClientTable

Alias.

Public Functions

TickingUpdate()

Default constructor.

TickingUpdate(std::shared_ptr<ClientTable> prev, std::shared_ptr<RowSequence> removed_rows, std::shared_ptr<ClientTable> after_removes, std::shared_ptr<RowSequence> added_rows, std::shared_ptr<ClientTable> after_adds, std::vector<std::shared_ptr<RowSequence>> modified_rows, std::shared_ptr<ClientTable> after_modifies)

Constructor. Used internally.

TickingUpdate(const TickingUpdate &other)

Copy constructor.

TickingUpdate &operator=(const TickingUpdate &other)

Assignment operator.

TickingUpdate(TickingUpdate &&other) noexcept

Move constructor.

TickingUpdate &operator=(TickingUpdate &&other) noexcept

Move assignment operator.

~TickingUpdate()

Destructor.

inline const std::shared_ptr<ClientTable> &Prev() const

A snapshot of the table before any of the changes in this cycle were applied.

inline const std::shared_ptr<ClientTable> &BeforeRemoves() const

A snapshot of the table before any rows were removed in this cycle.

inline const std::shared_ptr<RowSequence> &RemovedRows() const

A RowSequence indicating the indexes of the rows (if any) that were removed in this cycle.

inline const std::shared_ptr<ClientTable> &AfterRemoves() const

A snapshot of the table after the rows (if any) were removed in this cycle. If no rows were removed, then this pointer will compare equal to BeforeRemoves().

inline const std::shared_ptr<ClientTable> &BeforeAdds() const

A snapshot of the table before any rows were added in this cycle.

inline const std::shared_ptr<RowSequence> &AddedRows() const

A RowSequence indicating the indexes of the rows (if any) that were added in this cycle.

inline const std::shared_ptr<ClientTable> &AfterAdds() const

A snapshot of the table after rows (if any) were added in this cycle. If no rows were added, then this pointer will compare equal to BeforeAdds().

inline const std::shared_ptr<ClientTable> &BeforeModifies() const

A snapshot of the table before cells were modified in this cycle.

inline const std::vector<std::shared_ptr<RowSequence>> &ModifiedRows() const

A vector of RowSequences which represents, for each column in the table, the indexes of the rows (if any) of the given column that were modified in this cycle.

inline const std::shared_ptr<RowSequence> &AllModifiedRows() const

A RowSequence which represents the union of the RowSequences returned by ModifiedRows().

inline const std::shared_ptr<ClientTable> &AfterModifies() const

A snapshot of the table after cells (if any) were modified in this cycle.

inline const std::shared_ptr<ClientTable> &Current() const

A snapshot of the table after all of the changes in this cycle were applied.