Reading and Writing Tables
While most use cases are satisfied using the db interfaces, on occasion you may need to read or write individual tables from disk. Tables written this way can only be read back as individual tables and are not accessible from db.liveTable() or db.historicalTable().
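For context, the db interfaces locate centrally managed tables by namespace and table name. A minimal sketch follows; the namespace and table name are placeholders for illustration only:

// Placeholder namespace and table name - substitute ones that exist in your installation
trades = db.liveTable("MarketUS", "Trades")            // ticking intraday data
tradesHist = db.historicalTable("MarketUS", "Trades")  // static historical data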
Formats
Core+ supports two table formats: Apache Parquet and Deephaven.
The Apache Parquet format is a standard format supported by many other tools and libraries. It supports compression and has advanced metadata that tools can use to make access fast. Unless you have a specific need for the additional append or read/write performance provided by the Deephaven format, this is the preferred static data format.
The Deephaven format was developed for fast columnar storage before Parquet was ubiquitous. Deephaven's format maximizes parallelism, allows arbitrary random reads, and is optimized for fast live append operations. It is very good at fast, parallel reading and writing. It can be efficiently appended to, but does not support any form of compression and is not interoperable with other standard tools. Only choose this format if you expect to append to the table very frequently or specifically require the additional read/write performance.
Parquet API
Reading and writing tables in Parquet format can be done directly through the Deephaven Core interfaces, which support both reading and writing single tables or a directory hierarchy of tables.
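For example, here is a minimal sketch that writes and reads a single Parquet file using the Community ParquetTools helpers (io.deephaven.parquet.table.ParquetTools); the table contents and file path are illustrative:

import io.deephaven.parquet.table.ParquetTools

// Build a small table and write it to a single Parquet file
prices = newTable(
    stringCol("Sym", "AAPL", "MSFT"),
    doubleCol("Price", 185.2, 410.5)
)
ParquetTools.writeTable(prices, "/tmp/prices.parquet")

// Read the file back as a static table
prices_read = ParquetTools.readTable("/tmp/prices.parquet")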
Deephaven API
Reading and writing Deephaven tables is done with the io.deephaven.enterprise.table.EnterpriseTableTools class in Groovy or the deephaven_enterprise.table_tools module in Python.
For example:
import io.deephaven.enterprise.table.EnterpriseTableTools
// Create some data and write a table to disk in Deephaven format
grades = newTable(
stringCol("Name", "Ashley", "Jeff", "Rita", "Zach"),
intCol("Test1", 92, 78, 87, 74),
intCol("Test2", 94, 88, 81, 70),
intCol("Average", 93, 83, 84, 72),
doubleCol("GPA", 3.9, 2.9, 3.0, 1.8)
)
EnterpriseTableTools.writeTable(grades, "/tmp/grades")
// Now read it back
grades_read = EnterpriseTableTools.readTable("/tmp/grades")
from deephaven import new_table
from deephaven.column import int_col, double_col, string_col
from deephaven_enterprise import table_tools
grades = new_table([
string_col("Name", ["Ashley", "Jeff", "Rita", "Zach"]),
int_col("Test1", [92, 78, 87, 74]),
int_col("Test2", [94, 88, 81, 70]),
int_col("Average", [93, 83, 84, 72]),
double_col("GPA", [3.9, 2.9, 3.0, 1.8])
])
table_tools.write_table(grades, "/tmp/grades")
grades_read = table_tools.read_table("/tmp/grades")
Logging system tables from Core+
Core+ workers can log Table objects to a System table using the io.deephaven.enterprise.database.SystemTableLogger class in Groovy or the system_table_logger module in Python. As always, logging to a system table requires that a schema be defined in advance for the table.
You must specify what column partition to write to. This can be a fixed column partition or the current date (at the time the row was written; data is not introspected for a Timestamp).
The default behavior is to write via the Log Aggregator Service (LAS), but you can also write via binary logs. Because no code generation or listener versioning is performed, you must write columns in the format the listener expects. Logging tables for a non-default ZoneId (timezone) may only be done via direct binary logging, and cannot be done via LAS.
Row-by-row logging is not yet supported by Core+ workers. Existing binary loggers cannot be executed in the context of a Core+ worker because they reference classes that are shadowed (renamed). If row-level logging is required, you must use io.deephaven.shadow.enterprise.com.illumon.iris.binarystore.BinaryStoreWriterV2 directly.
Logging complex types
If you want to log complex types such as arrays or custom objects, you may use one of the built-in codecs or write your own by implementing the io.deephaven.util.codec.ObjectCodec interface and placing the compiled jar file on both the Enterprise and Core+ classpaths (a rough sketch of a custom codec follows the table below). Deephaven provides codecs for several common types:
Data Type | Codec class | Description |
---|---|---|
byte[] | io.deephaven.enterprise.codec.ByteArrayCodec | Encodes a byte[] |
char[] | io.deephaven.enterprise.codec.CharArrayCodec | Encodes a char[] |
short[] | io.deephaven.enterprise.codec.ShortArrayCodec | Encodes a short[] |
int[] | io.deephaven.enterprise.codec.IntArrayCodec | Encodes an int[] |
long[] | io.deephaven.enterprise.codec.LongArrayCodec | Encodes a long[] |
float[] | io.deephaven.enterprise.codec.FloatArrayCodec | Encodes a float[] |
double[] | io.deephaven.enterprise.codec.DoubleArrayCodec | Encodes a double[] |
String[] | io.deephaven.enterprise.codec.StringArrayCodec | Encodes a String[] |
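As a rough illustration of the custom codec route, the sketch below implements the io.deephaven.util.codec.ObjectCodec interface for a simple two-field value type. The Custom class, its fixed 16-byte layout, and the single-String constructor convention are assumptions made for illustration; consult the ObjectCodec Javadoc for the authoritative contract. Compile the codec, package it in a jar, and place it on both the Enterprise and Core+ classpaths as described above.

import io.deephaven.util.codec.ObjectCodec

import java.nio.ByteBuffer

// Hypothetical value type used only for this sketch
class Custom {
    long id
    double weight
}

// Encodes a Custom as a fixed 16-byte record: an 8-byte id followed by an 8-byte weight
class CustomCodec implements ObjectCodec<Custom> {
    // Assumption: column codecs are constructed reflectively with the (possibly empty)
    // argument string declared in the schema; this sketch ignores it.
    CustomCodec(String arguments) { }

    @Override
    byte[] encode(Custom input) {
        if (input == null) {
            throw new IllegalArgumentException("This sketch does not encode nulls")
        }
        ByteBuffer buf = ByteBuffer.allocate(16)
        buf.putLong(input.id)
        buf.putDouble(input.weight)
        return buf.array()
    }

    @Override
    Custom decode(byte[] input, int offset, int length) {
        ByteBuffer buf = ByteBuffer.wrap(input, offset, length)
        Custom result = new Custom()
        result.id = buf.getLong()
        result.weight = buf.getDouble()
        return result
    }

    @Override
    boolean isNullable() {
        return false   // this sketch rejects nulls in encode()
    }

    @Override
    int getPrecision() {
        return 0       // only meaningful for decimal-style types
    }

    @Override
    int getScale() {
        return 0       // only meaningful for decimal-style types
    }

    @Override
    int expectedObjectWidth() {
        return 16      // every encoded value is exactly 16 bytes
    }
}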
To use codecs, the Schema for your table must define the codec used for each column. For example, the following schema defines a table Example.Data that contains a plain integer column, two array columns, and a custom object column.
<Table name="Data" namespace="Example" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
<Partitions keyFormula="${autobalance_by_first_grouping_column}" />
<Column name="Date" dataType="String" columnType="Partitioning" />
<Column name="IntCol" dataType="int"/>
<Column name="Doubles" dataType="double[]" objectCodec="io.deephaven.enterprise.codec.DoubleArrayCodec"/>
<Column name="Strings" dataType="String[]" objectCodec="io.deephaven.enterprise.codec.StringArrayCodec"/>
<Column name="CustomObjects" dataType="com.mycompany.Custom" objectCodec="com.mycompany.CustomCodec"/>
<Listener logFormat="1" listenerPackage="com.mycompany" listenerClass="ExampleDataFormat1Listener">
<ListenerImports>
import io.deephaven.enterprise.codec.DoubleArrayCodec;
import io.deephaven.enterprise.codec.StringArrayCodec;
import com.mycompany.CustomCodec;
</ListenerImports>
<Column name="Date" intradayType="none" />
<Column name="IntCol"/>
<Column name="Doubles" intradayType="Blob" dbSetter="DoubleArrayCodec.decodeStatic(Doubles)"/>
<Column name="Strings" intradayType="Blob" dbSetter="StringArrayCodec.decodeStatic(Strings)"/>
<Column name="CustomObjects" intradayType="Blob" dbSetter="ThingCodec.decodeStatic(CustomObjects)" />
</Listener>
</Table>
The code below can be used to write to the system table.
import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.enterprise.codec.DoubleArrayCodec
import io.deephaven.enterprise.codec.StringArrayCodec
import com.mycompany.CustomCodec
opts = SystemTableLogger.newOptionsBuilder()
.currentDateColumnPartition(true)
.putColumnCodecs("Doubles", new DoubleArrayCodec())
.putColumnCodecs("Strings", new StringArrayCodec())
.putColumnCodecs("CustomObjects", new CustomCodec())
.build()
// After creating an Options structure, you can then log the current table:
SystemTableLogger.logTable(db, "Example", "Data", tableToLog, opts)
// When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness. Call `close()` to stop logging and release resources.
lh=SystemTableLogger.logTableIncremental(db, "Example", "Data", tableToLog, opts)
The Python version does not use an options object; instead, options are passed as named arguments. If you specify None for the column partition, the current date is used. The system_table_logger module provides methods for creating codecs for commonly used types, and custom codecs can be set using the system_table_logger.codec() method. See the Pydoc for more details.
from deephaven_enterprise import system_table_logger as stl
stl.log_table("Example", "Data", table_to_log, columnPartition=None,
              codecs={"Doubles": stl.double_array_codec(),
                      "Strings": stl.string_array_codec(),
                      "CustomObjects": stl.codec("com.mycompany.CustomCodec")})
Similarly, if you call log_table_incremental from Python, you must close the returned object (or use it as a context manager in a with statement).