Table types to support streaming
There are several variations of Deephaven tables: append-only, add-only, blink, and ring. These specializations are important for optimization, especially when working with streaming data.
Append-only
Append-only tables allow new rows to be added to the table - rows cannot be shifted, modified, or removed. Rows are always added to the end of the table as new data comes in. Rows from all previous update cycles are kept and are immutable. Append-only is the default table type outside of Kafka stream ingestion.
Append-only tables are useful when your use case needs a “full history” of every record ingested from the stream. However, because these tables keep all their rows, table size and memory consumption grow without constraint.
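For instance, a time table is append-only by default. A minimal sketch (the one-second period is an arbitrary choice):
from deephaven import time_table

# time_table produces an append-only table by default: one new row per
# second, and every row ever received is retained in memory.
source = time_table("PT1S")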
Add-only
Add-only tables also allow new rows to be added to the table, but unlike append-only tables, rows are not necessarily added at the end - it is quite common for data to be added to the middle of a table, such as in a live table with multiple partitions. As a result, when using special variables like i or ii, inconsistent results may occur.
Blink
Blink tables keep only the set of rows received during the current update cycle. Users can create blink tables when ingesting Kafka streams, creating time tables, or using Table Publishers.
Blink tables form the basis for more advanced use cases when used in combination with stateful table aggregations like last_by. For streaming tables without any downstream table operations, aggregations, or listeners, the new messages will appear as rows in the table for one update graph cycle, then disappear on the next update graph cycle. Visually, if you look at a blink table rendered in the UI, you will see that the rows show up and stay for one second (the default Barrage update propagation period), and then disappear to be replaced by new rows. Blink tables are the default table type for Kafka ingestion within Deephaven because of their low memory use. They are most useful if you want to aggregate your data, derive other tables, or use programmatic listeners to react to data.
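As a sketch, one simple way to create a blink table is a time table with blink semantics (the one-second period is an arbitrary choice):
from deephaven import time_table

# A time table with blink semantics: each row appears for one update
# cycle and is removed on the next.
source = time_table(period="PT1S", blink_table=True)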
Aggregation operations such as agg_by and count_by operate with special semantics on blink tables, allowing the result to aggregate over the entire observed stream of rows from the time the operation is initiated. That means, for example, that a sum_by on a blink table will contain the result sums for each aggregation group over all observed rows since the sum_by was applied, rather than just the sums for the current update cycle. This allows aggregations over the full history of a stream to be performed with greatly reduced memory costs compared to the alternative strategy of holding the entire stream as an in-memory table.
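A minimal sketch of these semantics; the Group and Value columns are illustrative, not part of any real feed:
from deephaven import time_table

# A blink time table with an illustrative grouping column.
source = time_table(period="PT1S", blink_table=True).update(
    ["Group = Timestamp.getNano() % 2", "Value = 1"]
)

# sum_by on a blink table accumulates sums over every row observed since
# the operation was applied, not just the current update cycle's rows.
result = source.sum_by(["Group"])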
Most operations on blink tables behave exactly as they do on other tables (see the exclusions below); that is, add and remove operations are processed as normal. For example, select on a blink table will contain only the newly added rows from the current update cycle.
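To illustrate, a sketch of select on a blink table (the period is arbitrary):
from deephaven import time_table

source = time_table(period="PT1S", blink_table=True)

# select returns only the rows added during the current update cycle.
result = source.select(["Timestamp"])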
Because Deephaven does not need to keep all the history of rows read from the input stream in memory, table operations on blink tables may require less memory.
Unsupported operations
Attempting to use the following operations on a blink table will raise an error:
group_by
partition_by
partition_agg_by
head_pct
tail_pct
slice
slice_pct
agg_by, if either group or partition is used
rollup, if includeConstituents=true
tree
To disable blink table semantics, use remove_blink, which returns a child table that is identical to the parent blink table in every way, but is no longer marked for special blink table semantics. The resulting table will still exhibit the “blink” table update pattern, removing all previous rows on each cycle, and thus only containing “new” rows.
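A minimal sketch (the period is arbitrary):
from deephaven import time_table

source = time_table(period="PT1S", blink_table=True)

# no_blink still shows only the current cycle's rows, but downstream
# aggregations now use standard per-cycle semantics rather than
# whole-stream accumulation.
no_blink = source.remove_blink()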
Ring
Ring tables retain the latest N rows from the parent table or stream. If an update cycle causes the table to grow to more than N rows, the oldest rows are removed until only N remain. Deephaven expects the parent to be a blink or add-only table: any deleted rows in the parent table are ignored, and any updated rows will raise an exception. Ring tables are much less memory-intensive than append-only tables. While append-only tables can grow without bound, ring tables hold only as many rows as the user specifies - once a row passes out of that range, it is eligible to be deleted from memory.
A ring table is semantically the same as any other streaming table, meaning it does not get special treatment in aggregations the way blink tables do. However, operations use less memory because ring tables dispose of old data. Ring tables are most often used with blink tables, which do not retain their own data for more than an update cycle. For example, a ring table over a blink time table could preserve the most recent 5,000 rows, even though the blink table itself only stores rows from the current update cycle.
A ring table can be created from a blink table or an append-only table. In this example, we'll create a ring table with a 3-row capacity from a simple append-only time table.
from deephaven import time_table, ring_table

# An append-only time table that adds one new row per second.
source = time_table("PT00:00:01")

# Retain only the three most recent rows from the parent table.
result = ring_table(parent=source, capacity=3)
A more common use case is to create a ring table from a blink table to preserve some data history. The following example creates a ring table from a blink time table. In source, old data is removed from memory as soon as new data enters the table. In result, 5 rows are kept, which preserves 4 more rows than source.
from deephaven import time_table, ring_table

# A blink time table: rows last for only one update cycle.
source = time_table(period="PT00:00:01", start_time=None, blink_table=True)

# Keep the five most recent rows, preserving history the blink table discards.
result = ring_table(parent=source, capacity=5)
The following example creates a ring table from a time table that starts with 5 rows. The initialize argument is not set, so it defaults to True. This means the ring table initially starts with all 5 rows populated.
from deephaven import empty_table, time_table, ring_table, merge

# Five static rows to seed the merged table.
static_source = empty_table(5).update(["X = i"])

# A live table that appends one new row per second.
dynamic_source = (
    time_table("PT00:00:01").update(["X = i + 5"]).drop_columns(["Timestamp"])
)

source = merge([static_source, dynamic_source])

# initialize defaults to True, so result starts with the five existing rows.
result = ring_table(parent=source, capacity=5)
The following example is identical to the one above, except initialize is set to False. Thus, when the query is first run, result is empty.
from deephaven import empty_table, time_table, ring_table, merge

static_source = empty_table(5).update(["X = i"])
dynamic_source = (
    time_table("PT00:00:01").update(["X = i + 5"]).drop_columns(["Timestamp"])
)
source = merge([static_source, dynamic_source])

# With initialize=False, existing parent rows are ignored and result starts empty.
result = ring_table(parent=source, capacity=5, initialize=False)