Subscribing to Ticking Tables¶
Description¶
Some C++ applications need to maintain a local copy of a table that stays in sync with the server side. This is useful when your code is using custom libraries or other algorithms that are more natural to execute on the client rather than the server.
To support this functionality, TableHandle provides the Subscribe method. When this method is invoked, the Deephaven client library starts maintaining an internal local copy of the table, keeping it up to date by processing periodic low-level update messages from the server. The client library also passes a digested, user-friendly form of these updates to subscribing code. The client library continues to maintain this subscription until the user calls Unsubscribe.
A simple example is:
TableHandle my_ticking_data = manager.FetchTable("MyTickingData");
auto sub_handle = my_ticking_data.Subscribe(std::make_shared<MyCallback>());
// Do all your processing...
// Then when all done, unsubscribe...
my_ticking_data.Unsubscribe(std::move(sub_handle));
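The user-supplied MyCallback is a class deriving from TickingCallback which overrides two methods:
OnTick - invoked on each update to the subscription.
OnFailure - invoked if there is an error involving the subscription.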
When table data changes on the server side, those changes are batched up and periodically transmitted to the client (by default, at a rate of about once per second). First, the client library processes these low-level messages and applies them to its internal copy of the table. Then, it constructs a user-friendly TickingUpdate object and invokes the user’s OnTick callback, passing it the TickingUpdate object.
A skeleton of a user-defined callback object looks like this:
class MyCallback final : public deephaven::dhcore::ticking::TickingCallback {
public:
  void OnTick(deephaven::dhcore::ticking::TickingUpdate update) final {
    // handle the update message
  }

  void OnFailure(std::exception_ptr ep) final {
    // handle the failure
  }
};
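OnFailure receives a std::exception_ptr, which cannot be inspected directly. As a minimal sketch that uses only the standard library, the helper below rethrows the pointer in order to recover a printable message. The name DescribeFailure is hypothetical and is not part of the Deephaven API.

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical helper (not a Deephaven function): turns the exception_ptr
// handed to OnFailure into a human-readable message.
std::string DescribeFailure(std::exception_ptr ep) {
  if (!ep) {
    return "no exception";
  }
  try {
    // The only portable way to look inside an exception_ptr is to rethrow it.
    std::rethrow_exception(ep);
  } catch (const std::exception &e) {
    return e.what();
  } catch (...) {
    return "unknown (non-std::exception) failure";
  }
}
```

Inside an OnFailure override, the returned string could be logged before the application decides whether to shut down or resubscribe.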
Structure of updates from the server¶
Synchronization between the server and client is performed via the Barrage protocol. The low-level details of this protocol are outside the scope of this document. However, at a high level, the purpose of the protocol is to efficiently transmit differences between table versions as a table changes over time. Those differences include:
removed rows
shifts (note 1)
added rows
modified cells (note 2)
Notes:
1. Shifts are a way to express the renumbering (but not reordering) of internal row keys. In this version of the client, we do not expose internal row keys to the caller. So you will not see shifts represented in the TickingUpdate class in this version of the client.
2. In the above we explicitly refer to modified cells rather than modified rows, because when a row is modified, typically only some cells within that row change but others stay the same. For the sake of efficiency, the Barrage protocol allows the server to specify the specific cells that changed within a row. These modifications are represented on a per-column basis. That is, for each column, the library will indicate (via a RowSequence) which rows of that column were modified.
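To make the order of operations concrete, here is a toy model that applies removes, then adds, then modifies to a single column held in a std::vector. Every name in it (Column, Positions, ApplyRemoves, ApplyAdds, ApplyModifies) is a hypothetical stand-in rather than a Deephaven type. The sketch also assumes that removed positions refer to the table before the removal, while added and modified positions refer to the table after the respective operation.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are not Deephaven types.
using Column = std::vector<std::string>;
using Positions = std::vector<std::size_t>;  // ascending row positions

// Drops the rows at 'removed'. Positions refer to 'before'.
Column ApplyRemoves(const Column &before, const Positions &removed) {
  Column after;
  std::size_t next = 0;  // next entry of 'removed' to match
  for (std::size_t i = 0; i != before.size(); ++i) {
    if (next != removed.size() && removed[next] == i) {
      ++next;  // this row is removed, so skip it
    } else {
      after.push_back(before[i]);
    }
  }
  assert(next == removed.size());  // every removed position was in range
  return after;
}

// Inserts 'values' so that they land at positions 'added' of the result.
Column ApplyAdds(const Column &before, const Positions &added, const Column &values) {
  assert(added.size() == values.size());
  Column after;
  std::size_t next_old = 0;
  std::size_t next_new = 0;
  const std::size_t final_size = before.size() + added.size();
  for (std::size_t i = 0; i != final_size; ++i) {
    if (next_new != added.size() && added[next_new] == i) {
      after.push_back(values[next_new++]);
    } else {
      assert(next_old != before.size());  // otherwise 'added' was out of range
      after.push_back(before[next_old++]);
    }
  }
  return after;
}

// Overwrites the cells at 'modified' with 'values'. Row count is unchanged.
Column ApplyModifies(const Column &before, const Positions &modified, const Column &values) {
  assert(modified.size() == values.size());
  Column after = before;
  for (std::size_t i = 0; i != modified.size(); ++i) {
    assert(modified[i] < after.size());
    after[modified[i]] = values[i];
  }
  return after;
}
```

Chaining the three functions produces the intermediate states in the same order as the difference list above: the output of ApplyRemoves is the input of ApplyAdds, whose output is in turn the input of ApplyModifies.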
The TickingUpdate class¶
The TickingUpdate class represents the changes that have happened to the table since the last tick. It contains snapshots (Prev and Current) of the table at the start and end of the entire update operation, as well as intermediate snapshots before and after the remove operation, before and after the add operation, and before and after the modify operation. It also contains RowSequence values representing the positions of the removed, added, and modified items.
For some callers, the per-update Prev and Current table snapshots suffice for their needs. These snapshots tell the caller how the table looked before the update and after the update, respectively. Other callers will need more precise information: exactly what rows were removed, added, and modified. These callers can use the per-operation snapshots.
The per-update snapshots are:
Prev - snapshot of the table before any of this cycle’s updates were applied.
Current - snapshot of the table after all of this cycle’s updates were applied.
The more fine-grained per-operation snapshots are:
BeforeRemoves - snapshot of the table as it appeared before the remove operation
AfterRemoves - snapshot of the table as it appeared after the remove operation
BeforeAdds - snapshot of the table as it appeared before the add operation
AfterAdds - snapshot of the table as it appeared after the add operation
BeforeModifies - snapshot of the table as it appeared before the modify operation
AfterModifies - snapshot of the table as it appeared after the modify operation
Some of these snapshots are duplicative: for example, due to the order in which changes are applied internally, it happens to be the case that AfterRemoves and BeforeAdds refer to exactly the same snapshot. We provide these extra snapshots for the programmer’s convenience and intuition.
The library also takes pains to coalesce snapshots. For example, if no removes happen in a given update, then the BeforeRemoves pointer will compare equal to the AfterRemoves pointer.
Some readers may be concerned about the cost of maintaining all these snapshots. Internally, the snapshots are represented by copy-on-write data structures that take pains to do a lot of structural sharing. Broadly speaking, it is not expensive to have two snapshots of a table when most of the data is unchanged between the two tables. The specific implementation of this snapshotting data structure comes from the Immer Persistent and Immutable Data Structures project.
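The idea of structural sharing can be illustrated with a deliberately simple toy. The class below stores a column as fixed-size chunks held by shared_ptr, so that changing one cell copies only the chunk containing it. ChunkedColumn and all of its members are hypothetical names invented for this sketch; the real client relies on Immer, whose data structures are considerably more sophisticated.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical toy snapshot type; not Immer and not a Deephaven class.
class ChunkedColumn {
  using Chunk = std::vector<int>;

public:
  static constexpr std::size_t kChunkSize = 4;

  explicit ChunkedColumn(const std::vector<int> &values) : size_(values.size()) {
    for (std::size_t begin = 0; begin < values.size(); begin += kChunkSize) {
      const std::size_t end = std::min(begin + kChunkSize, values.size());
      auto chunk = std::make_shared<Chunk>();
      for (std::size_t i = begin; i != end; ++i) {
        chunk->push_back(values[i]);
      }
      chunks_.push_back(std::move(chunk));
    }
  }

  int Get(std::size_t index) const {
    assert(index < size_);
    return (*chunks_[index / kChunkSize])[index % kChunkSize];
  }

  // Returns a new snapshot with one cell changed; *this is left untouched.
  ChunkedColumn Set(std::size_t index, int value) const {
    assert(index < size_);
    ChunkedColumn result = *this;  // copies chunk pointers, not chunk data
    auto new_chunk = std::make_shared<Chunk>(*chunks_[index / kChunkSize]);
    (*new_chunk)[index % kChunkSize] = value;
    result.chunks_[index / kChunkSize] = std::move(new_chunk);
    return result;
  }

  // Counts the chunks that the two snapshots physically share.
  std::size_t SharedChunks(const ChunkedColumn &other) const {
    std::size_t count = 0;
    const std::size_t n = std::min(chunks_.size(), other.chunks_.size());
    for (std::size_t i = 0; i != n; ++i) {
      if (chunks_[i] == other.chunks_[i]) {
        ++count;
      }
    }
    return count;
  }

private:
  std::vector<std::shared_ptr<const Chunk>> chunks_;
  std::size_t size_;
};
```

With twelve values in three chunks, a single Set call yields a second snapshot that shares two of the three chunks with the first, which is why keeping both snapshots alive costs little when most data is unchanged.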
The TickingUpdate object also provides RowSequence objects indicating which specific rows were changed. The provided RowSequence objects are:
RemovedRows - indexes of rows removed from the BeforeRemoves snapshot to form AfterRemoves.
AddedRows - indexes of rows added to the BeforeAdds snapshot to form AfterAdds.
ModifiedRows - a std::vector of RowSequence shared_ptrs, which represents the modified data on a per-column basis. Each element of the vector is a RowSequence shared_ptr representing the corresponding column. That RowSequence provides the indexes of rows that were modified in the corresponding column of BeforeModifies to form the corresponding column in AfterModifies.
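The per-column layout of ModifiedRows can be modeled with plain vectors. In the sketch below, each inner vector plays the role of one column's RowSequence, holding ascending row indexes. RowIndexes, UnionOfModifiedRows, and ColumnsModifiedAt are hypothetical names for this illustration only; the first function mirrors the kind of union that AllModifiedRows is documented to represent, without claiming to be how the library computes it.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>

// Hypothetical stand-in for a RowSequence: ascending row indexes.
using RowIndexes = std::vector<std::size_t>;

// Union of the per-column modified rows, in ascending order.
RowIndexes UnionOfModifiedRows(const std::vector<RowIndexes> &per_column) {
  RowIndexes all;
  for (const RowIndexes &column : per_column) {
    assert(std::is_sorted(column.begin(), column.end()));
    RowIndexes merged;
    std::set_union(all.begin(), all.end(), column.begin(), column.end(),
                   std::back_inserter(merged));
    all = std::move(merged);
  }
  return all;
}

// Lists the column numbers in which the given row was modified.
std::vector<std::size_t> ColumnsModifiedAt(const std::vector<RowIndexes> &per_column,
                                           std::size_t row) {
  std::vector<std::size_t> result;
  for (std::size_t col = 0; col != per_column.size(); ++col) {
    if (std::binary_search(per_column[col].begin(), per_column[col].end(), row)) {
      result.push_back(col);
    }
  }
  return result;
}
```

A caller that only cares whether a row changed at all would consult the union, while a caller that redraws individual cells would walk the per-column sequences.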
Declarations¶
-
class ClientTable¶
An abstract base class representing a Deephaven table. It is used, for example, in TickingUpdate to provide table snapshots to a caller who has subscribed to a ticking table.
Subclassed by deephaven::client::arrowutil::ArrowClientTable
Public Types
-
using ColumnSource = deephaven::dhcore::column::ColumnSource¶
Alias.
-
using RowSequence = deephaven::dhcore::container::RowSequence¶
Alias.
Public Functions
-
ClientTable() = default¶
Constructor.
-
virtual ~ClientTable() = default¶
Destructor.
-
virtual std::shared_ptr<RowSequence> GetRowSequence() const = 0¶
Get the RowSequence (in position space) that underlies this Table.
-
virtual std::shared_ptr<ColumnSource> GetColumn(size_t column_index) const = 0¶
Gets a ColumnSource from the ClientTable by index.
- Parameters:
column_index – Must be in the half-open interval [0, NumColumns).
-
std::shared_ptr<ColumnSource> GetColumn(std::string_view name, bool strict) const¶
Gets a ColumnSource from the ClientTable by name. ‘strict’ controls whether the method must succeed.
- Parameters:
name – The name of the column.
strict – Whether the method must succeed.
- Returns:
If ‘name’ was found, returns the ColumnSource. If ‘name’ was not found and ‘strict’ is true, throws an exception. If ‘name’ was not found and ‘strict’ is false, returns nullptr.
-
std::optional<size_t> GetColumnIndex(std::string_view name, bool strict) const¶
Gets the index of a ColumnSource from the ClientTable by name. ‘strict’ controls whether the method must succeed.
- Parameters:
name – The name of the column.
strict – Whether the method must succeed.
- Returns:
If ‘name’ was found, returns the index of the ColumnSource. If ‘name’ was not found and ‘strict’ is true, throws an exception. If ‘name’ was not found and ‘strict’ is false, returns an empty optional.
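The three outcomes of a ‘strict’ lookup (found, not found and throw, not found and empty) can be sketched without the library. The function FindColumnIndex below is a hypothetical free function operating on a plain list of column names; it only imitates the documented contract, and the exception type it throws is an assumption, since the documentation above does not say which type the library uses.

```cpp
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical model of a by-name column lookup with a 'strict' flag.
std::optional<std::size_t> FindColumnIndex(const std::vector<std::string> &names,
                                           std::string_view name, bool strict) {
  for (std::size_t i = 0; i != names.size(); ++i) {
    if (names[i] == name) {
      return i;  // found: return the index
    }
  }
  if (strict) {
    // Assumed exception type; the real library may throw something else.
    throw std::runtime_error("Column not found: " + std::string(name));
  }
  return std::nullopt;  // not found and not strict: empty optional
}
```

Passing strict as false suits optional columns, where the caller checks has_value() before proceeding; passing true suits columns the code cannot work without.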
-
virtual size_t NumRows() const = 0¶
Number of rows in the ClientTable.
-
virtual size_t NumColumns() const = 0¶
Number of columns in the ClientTable.
-
virtual std::shared_ptr<deephaven::dhcore::clienttable::Schema> Schema() const = 0¶
The ClientTable schema.
-
internal::TableStreamAdaptor Stream(bool want_headers, bool want_row_numbers) const¶
Creates an ‘ostream adaptor’ to use when printing the clienttable. Example usage: std::cout << myTable.Stream(true, false).
Creates an ‘ostream adaptor’ to use when printing the clienttable. Example usage: std::cout << myTable.Stream(true, false, rowSeq).
Creates an ‘ostream adaptor’ to use when printing the clienttable. Example usage: std::cout << myTable.Stream(true, false, rowSequences).
-
std::string ToString(bool want_headers, bool want_row_numbers) const¶
For debugging and demos.
-
class Schema¶
The table schema that goes along with a Table class. This Schema object tells you about the names and data types of the table columns.
-
class TickingCallback¶
Abstract base class used to define the caller’s ticking callback object. This object is passed to the TableHandle::Subscribe() method.
Subclassed by WrappedTickingCallback
Public Functions
-
virtual void OnTick(TickingUpdate update) = 0¶
Invoked on each update to the subscription.
-
virtual void OnFailure(std::exception_ptr eptr) = 0¶
Invoked if there is an error involving the subscription.
-
class TickingUpdate¶
An update message (passed to client code via TickingCallback::OnTick()) which describes the changes (removes, adds, modifies) that transform the previous version of the table to the new version. This class is thread-safe and can be kept around for an arbitrary amount of time, though this will consume some memory. The underlying snapshots share a common substructure, so the amount of memory they consume is roughly proportional to the amount of “new” data in that snapshot.
Public Types
-
using RowSequence = deephaven::dhcore::container::RowSequence¶
Alias.
-
using ClientTable = deephaven::dhcore::clienttable::ClientTable¶
Alias.
Public Functions
-
TickingUpdate()¶
Default constructor.
Constructor. Used internally.
-
TickingUpdate(const TickingUpdate &other)¶
Copy constructor.
-
TickingUpdate &operator=(const TickingUpdate &other)¶
Assignment operator.
-
TickingUpdate(TickingUpdate &&other) noexcept¶
Move constructor.
-
TickingUpdate &operator=(TickingUpdate &&other) noexcept¶
Move assignment operator.
-
~TickingUpdate()¶
Destructor.
-
inline const std::shared_ptr<ClientTable> &Prev() const¶
A snapshot of the table before any of the changes in this cycle were applied.
-
inline const std::shared_ptr<ClientTable> &BeforeRemoves() const¶
A snapshot of the table before any rows were removed in this cycle.
-
inline const std::shared_ptr<RowSequence> &RemovedRows() const¶
A RowSequence indicating the indexes of the rows (if any) that were removed in this cycle.
-
inline const std::shared_ptr<ClientTable> &AfterRemoves() const¶
A snapshot of the table after the rows (if any) were removed in this cycle. If no rows were removed, then this pointer will compare equal to BeforeRemoves().
-
inline const std::shared_ptr<ClientTable> &BeforeAdds() const¶
A snapshot of the table before any rows were added in this cycle.
-
inline const std::shared_ptr<RowSequence> &AddedRows() const¶
A RowSequence indicating the indexes of the rows (if any) that were added in this cycle.
-
inline const std::shared_ptr<ClientTable> &AfterAdds() const¶
A snapshot of the table after rows (if any) were added in this cycle. If no rows were added, then this pointer will compare equal to BeforeAdds().
-
inline const std::shared_ptr<ClientTable> &BeforeModifies() const¶
A snapshot of the table before cells were modified in this cycle.
-
inline const std::vector<std::shared_ptr<RowSequence>> &ModifiedRows() const¶
A vector of RowSequences which represents, for each column in the table, the indexes of the rows (if any) of the given column that were modified in this cycle.
-
inline const std::shared_ptr<RowSequence> &AllModifiedRows() const¶
A RowSequence which represents the union of the RowSequences returned by ModifiedRows().
-
inline const std::shared_ptr<ClientTable> &AfterModifies() const¶
A snapshot of the table after cells (if any) were modified in this cycle.
-
inline const std::shared_ptr<ClientTable> &Current() const¶
A snapshot of the table after all of the changes in this cycle were applied.