Importing Parquet files into the Database
This guide shows you how to import externally-generated Parquet files into Deephaven. To learn how to use ParquetTools, see Importing Data without schemas.
You can add externally-generated Parquet files to appropriate locations under the /db/Systems directory (the root of the Deephaven historical database) so that they are accessible as standard Deephaven tables using db.t("namespace", "tableName"). There are essentially four steps to this:
- Create a Deephaven schema for the table. This defines the namespace, table name, column names, and column types.
- Create the appropriate namespace and partition directories under /db/Systems.
- Copy or link the appropriate Parquet file into place. (To support grouping or data indexes, you can use the "merge" process for this step.)
- Run the metadata indexer to update the database's index of available partitions. (This is unnecessary when the merge process is used.)
Create the Schema
While you can create a schema manually or via the schema editor in the UI, currently the easiest way to create a schema from an existing Parquet file is to run the following Groovy script in a Deephaven console. It reads the Parquet file, generates an appropriate Deephaven schema, and adds it to the centralized Deephaven configuration.
import com.illumon.iris.db.schema.xml.SchemaXmlParser
import com.illumon.iris.db.schema.xml.SchemaXmlUtil
import com.illumon.iris.db.v2.locations.parquet.ParquetTools;
import com.illumon.dataobjects.ColumnDefinition;
import com.illumon.iris.db.tables.TableDefinition;
import com.illumon.iris.db.schema.xml.SchemaXmlFactory;
import com.illumon.iris.db.schema.NamespaceSet;
import com.illumon.iris.db.schema.SchemaServiceFactory
import com.illumon.iris.utils.SchemaCreatorUtils
NAMESPACE = "MyNamespace"
TABLE_NAME = "MyTable"
FILE_PATH = "/path/to/my/parquet/files/myfile.parquet"
schemaService = SchemaServiceFactory.getDefault();
sourceTable = ParquetTools.readTable(FILE_PATH);
// Drop any pre-existing Date column; a partitioning Date column is added below
if (sourceTable.hasColumns("Date")) {
    sourceTable = sourceTable.dropColumns("Date")
}
// This section adds a Date String column as the partitioning column
cdef = sourceTable.getDefinition().getColumns();
newColumns = new ColumnDefinition[cdef.size() + 1];
for (int i = 0; i < cdef.size(); i++) {
    newColumns[i + 1] = cdef[i];
}
// Prepend the new partitioning column at index 0
newColumns[0] = new ColumnDefinition<>("Date", String.class, 4).withPartitioning();
newTDef = new TableDefinition(newColumns);
// Set properties for the new table
newTDef.setNamespace(NAMESPACE);
newTDef.setName(TABLE_NAME);
newTDef.setStorageType(2);
// Storage Type IDs to use with .setStorageType()
//STORAGETYPE_NESTEDPARTITIONEDONDISK=2;
//STORAGETYPE_FRAGMENTEDONDISK=8;
//STORAGETYPE_HIERARCHICALONDISK=16;
schema = SchemaXmlFactory.getMutableSchema(newTDef, NamespaceSet.SYSTEM);
schemaXml = SchemaXmlFactory.getXmlSchema(newTDef, NamespaceSet.SYSTEM)
println "New schema:"
println SchemaXmlUtil.makeSchemaXmlOutputter().outputString(schemaXml.getElement())
if (!schemaService.containsNamespace(NAMESPACE)) {
    // If the namespace does not already exist, add it:
    println "Adding namespace $NAMESPACE."
    schemaService.createNamespace(NamespaceSet.SYSTEM, NAMESPACE);
} else {
    println "Namespace $NAMESPACE exists."
}
// If the table does not yet exist, use addSchema;
// to replace an existing schema use updateSchema
schemaService.addSchema(schema);
//schemaService.updateSchema(schema);
Warning
In the code example above, there is an optional line, updateSchema(schema), that replaces an existing schema rather than adding a new one. If you use this option, be aware that if you change the type of an existing column, you may lose the ability to read data that was previously written to disk.
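If you want one script that handles both new and existing tables, a minimal sketch is to replace the final addSchema call with a version gated on a flag you set yourself. REPLACE_EXISTING below is illustrative, not part of the Deephaven API; only addSchema and updateSchema come from the script above:
// Illustrative flag; set to true only when you intend to replace an existing schema
REPLACE_EXISTING = false
if (REPLACE_EXISTING) {
    // Replaces the stored schema; see the warning above about changing column types
    schemaService.updateSchema(schema)
} else {
    // For tables that do not yet have a schema
    schemaService.addSchema(schema)
}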
Create the Database directories
Historical data in Deephaven is stored under /db/Systems/<namespace>/Partitions/<storage partition ID>/<date>/<table name>/table.parquet. When adding a new table, you should create these directories up to the "storage partition ID" level. There is also a WritablePartitions directory containing links to the partition IDs that the merge process may write to. Below is an example script to create the required directories for the "MyNamespace" namespace:
# Create the Partitions and WritablePartitions directories for the namespace:
sudo -u dbmerge mkdir -vp /db/Systems/MyNamespace/{Writable,}Partitions/
# Create the partition:
sudo -u dbmerge mkdir -v /db/Systems/MyNamespace/Partitions/0
# ...or, instead of the mkdir above, make the partition a link to external storage (such as S3):
sudo -u dbmerge ln -vs /path/to/s3/storage/mynamespace/0 /db/Systems/MyNamespace/Partitions/0
# Create a link in WritablePartitions to the Partitions directory:
cd /db/Systems/MyNamespace/WritablePartitions
sudo -u dbmerge ln -vs ../Partitions/0 ./0
Copy or link the data into place
Next, for each date for which you have Parquet data, create two final directories corresponding to the date and the table name:
sudo -u dbmerge mkdir -vp /db/Systems/MyNamespace/Partitions/0/2023-04-28/MyTable
Then you can move, copy, or link the original data into the table.parquet file in the new directory:
sudo -u dbmerge mv /path/to/original/file.parquet /db/Systems/MyNamespace/Partitions/0/2023-04-28/MyTable/table.parquet
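If you have Parquet files for many dates, you can script this step from a Deephaven console instead of running one command per date. The following Groovy sketch assumes a source directory containing one file per date, named like 2023-04-28.parquet (that layout and the paths are hypothetical), and that the console process has write access under /db/Systems (for example, running as dbmerge):
import java.nio.file.*

srcDir = Paths.get("/path/to/my/parquet/files")   // hypothetical source directory
partitionRoot = Paths.get("/db/Systems/MyNamespace/Partitions/0")
tableName = "MyTable"

// Assumes one file per date, named "<date>.parquet", e.g. 2023-04-28.parquet
Files.newDirectoryStream(srcDir, "*.parquet").each { src ->
    date = src.fileName.toString().replace(".parquet", "")
    destDir = partitionRoot.resolve(date).resolve(tableName)
    Files.createDirectories(destDir)
    // Copy rather than move, so the original files are left untouched
    Files.copy(src, destDir.resolve("table.parquet"), StandardCopyOption.REPLACE_EXISTING)
    println "Copied $src -> $destDir"
}
Each date partition created this way still needs to be picked up by the metadata indexer, as described in the next step.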
Run the Metadata Indexer
If you copied or linked table.parquet into place manually (as in the example above, rather than via a merge job), then you should also run the metadata indexer so that queries can discover the new location:
sudo /usr/illumon/latest/bin/iris metadata_indexer System MyNamespace MyTable
And that's it! After running the metadata indexer, you should be able to start a new code studio in the Deephaven UI and access your new table with:
myTable = db.t("MyNamespace", "MyTable")
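As a quick sanity check, you can filter on the partitioning column and count the rows; this assumes the example date partition created above:
checkTable = db.t("MyNamespace", "MyTable").where("Date=`2023-04-28`")
println "Rows for 2023-04-28: " + checkTable.size()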