Write data to blink tables with TablePublisher
This guide covers publishing data to blink tables programmatically using a table publisher. A table publisher creates a blink table from added tables. Blink tables only retain rows from the current update cycle. At the end of an update cycle, rows are removed and the data is gone forever.
A table publisher is a great way to programmatically ingest and publish data from external sources such as WebSockets and other live data sources. It writes data to an in-memory, real-time table, like a DynamicTableWriter. A table publisher, however, provides a newer and more refined API, as well as native support for blink tables.
Table publishers use a builder pattern to create the publisher and its associated blink table. From there, you can:
- Add data to the blink table with `add`.
- (Optionally) Store data history in a downstream table.
- (Optionally) Shut the publisher down when finished.
More sophisticated uses will add steps but still follow the same basic formula.
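Before diving into the full examples, here is a minimal sketch of that formula, condensed from the examples in the rest of this guide. The names (publisher, blinkTable, history) are placeholders, and the ring-table capacity of 1024 rows is arbitrary.
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.engine.table.impl.sources.ring.RingTableTools
import io.deephaven.stream.TablePublisher
// 1. Create the publisher and its blink table from a table definition.
sketchDefinition = TableDefinition.of(ColumnDefinition.ofInt("X"))
publisher = TablePublisher.of("Sketch", sketchDefinition, null, { -> println "Sketch publisher shut down." })
blinkTable = publisher.table()
// 2. Add data: any table whose columns match the definition can be added.
publisher.add(emptyTable(3).update("X = i"))
// 3. (Optionally) keep recent history in a downstream ring table.
history = RingTableTools.of(blinkTable, 1024, true)
// 4. (Optionally) shut the publisher down when finished.
publisher.publishFailure(new RuntimeException("Done."))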
Example: Getting started
The following example creates a table with two columns (X and Y). The columns contain randomly generated integers and doubles, respectively.
import io.deephaven.csv.util.MutableBoolean
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.stream.TablePublisher
definition = TableDefinition.of(
ColumnDefinition.ofInt("X"),
ColumnDefinition.ofDouble("Y")
)
shutDown = {println "Finished."}
onShutdown = new MutableBoolean()
myPublisher = TablePublisher.of("My Publisher", definition, null, shutDown)
myTable = myPublisher.table()
Add data to the blink table by calling the `add` method.
myPublisher.add(emptyTable(5).update("X = randomInt(0, 10)", "Y = randomDouble(0.0, 100.0)"))
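The table passed to `add` does not have to come from `emptyTable`: any table whose column names and types match the publisher's definition can be added. As a minimal sketch, the batch below is built with `newTable`, `intCol`, and `doubleCol`, which are available by default in the Deephaven Groovy console, just like `emptyTable`.
// Build a small batch in memory and publish it. The columns must match the
// definition used to create the publisher (X: int, Y: double).
batch = newTable(
    intCol("X", 1, 2, 3),
    doubleCol("Y", 10.5d, 20.5d, 30.5d)
)
myPublisher.add(batch)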
The table publisher can be shut down by calling `publishFailure`.
myPublisher.publishFailure(new RuntimeException("User shut down."))
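Once `publishFailure` is called, the blink table stops ticking and the shutdown callback passed to `TablePublisher.of` runs, printing Finished. in this example. The short sketch below uses `isAlive`, which reports whether the publisher can still accept new data.
// After publishFailure, the publisher no longer accepts new data.
println myPublisher.isAlive()    // false once the publisher has shut down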
Example: Threading
The following example adds new data to the publisher with `emptyTable` every second for 5 seconds in a separate thread. Note how the current execution context is captured and used to add data to the publisher. Attempting to perform table operations in a separate thread without specifying an execution context will raise an exception.
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.csv.util.MutableBoolean
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.stream.TablePublisher
import io.deephaven.util.SafeCloseable
definition = TableDefinition.of(
ColumnDefinition.ofInt("X"),
ColumnDefinition.ofDouble("Y")
)
shutDown = { -> println "Finished."}
onShutdown = new MutableBoolean()
myPublisher = TablePublisher.of("My Publisher", definition, null, shutDown)
myTable = myPublisher.table()
defaultCtx = ExecutionContext.getContext()
myFunc = { ->
try (SafeCloseable ignored = defaultCtx.open()) {
Random rand = new Random()
for (int i = 0; i < 5; ++i) {
nRows = rand.nextInt(5) + 5
myPublisher.add(emptyTable(nRows).update("X = randomInt(0, 10)", "Y = randomDouble(0.0, 100.0)"))
sleep(1000)
}
return
}
}
thread = new Thread(myFunc)
thread.start()
Data history
Table publishers create blink tables. Blink tables do not store any data history: data is gone forever at the start of a new update cycle. In most use cases, you will want to store some or all of the rows written during previous update cycles. There are two ways to do this:
- Store some data history by creating a downstream ring table.
- Store all data history by creating a downstream append-only table.
See the table types user guide for more information on these table types, including which one is best suited for your application.
To show the storage of data history, we will extend the threading example by creating a downstream ring table and append-only table.
import io.deephaven.engine.table.impl.sources.ring.RingTableTools
import io.deephaven.engine.table.impl.BlinkTableTools
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.csv.util.MutableBoolean
import io.deephaven.stream.TablePublisher
import io.deephaven.util.SafeCloseable
definition = TableDefinition.of(
ColumnDefinition.ofInt('X'),
ColumnDefinition.ofDouble('Y')
)
shutDown = { -> println 'Finished.'}
onShutdown = new MutableBoolean()
myPublisher = TablePublisher.of('My Publisher', definition, null, shutDown)
myTable = myPublisher.table()
defaultCtx = ExecutionContext.getContext()
myFunc = { ->
try (SafeCloseable ignored = defaultCtx.open()) {
Random rand = new Random()
for (int i = 0; i < 5; ++i) {
nRows = rand.nextInt(5) + 5
myPublisher.add(emptyTable(nRows).update('X = randomInt(0, 10)', 'Y = randomDouble(0.0, 100.0)'))
sleep(1000)
}
return
}
}
thread = new Thread(myFunc)
thread.start()
myRingTable = RingTableTools.of(myTable, 15, true)
myAppendOnlyTable = BlinkTableTools.blinkToAppendOnly(myTable)
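Here, `myRingTable` retains only the 15 most recent rows published to `myTable`, while `myAppendOnlyTable` keeps every row from every update cycle and grows without bound for as long as the publisher runs.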