Importing Parquet files into the Database

This guide shows you how to import externally-generated Parquet files into Deephaven. To learn how to use ParquetTools, see Importing Data without schemas.

You can add externally-generated Parquet files to appropriate locations under the /db/Systems directory (the root of the Deephaven historical database) so that they are accessible as standard Deephaven tables using db.t("namespace", "tableName"). There are essentially four steps to this:

  • Create a Deephaven schema for the table. This defines the namespace, table name, column names, and column types.
  • Create the appropriate namespace and partition directories under /db/Systems.
  • Copy or link the appropriate Parquet file into place. (To support grouping or data indexes, you can use the "merge" process for this step.)
  • Run the metadata indexer to update the database's index of available partitions. (This is unnecessary when the merge process is used.)
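
Before starting, it can help to sanity-check the source file from a Deephaven console. This short sketch uses the same ParquetTools.readTable call that the schema script below relies on; the file path is a placeholder.

import com.illumon.iris.db.v2.locations.parquet.ParquetTools

// Read the Parquet file as an in-memory table and inspect its
// column names and types before generating a schema for it.
sourceTable = ParquetTools.readTable("/path/to/my/parquet/files/myfile.parquet")
println sourceTable.getDefinition()
println "Rows: " + sourceTable.size()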

Create the Schema

While you can create a schema manually or with the schema editor in the UI, the easiest way to create one from an existing Parquet file is to run the following Groovy script in a Deephaven console. The script reads the Parquet file, generates a matching Deephaven schema, and adds it to the centralized Deephaven configuration.

import com.illumon.dataobjects.ColumnDefinition
import com.illumon.iris.db.schema.NamespaceSet
import com.illumon.iris.db.schema.SchemaServiceFactory
import com.illumon.iris.db.schema.xml.SchemaXmlFactory
import com.illumon.iris.db.schema.xml.SchemaXmlUtil
import com.illumon.iris.db.tables.TableDefinition
import com.illumon.iris.db.v2.locations.parquet.ParquetTools

NAMESPACE = "MyNamespace"
TABLE_NAME = "MyTable"
FILE_PATH = "/path/to/my/parquet/files/myfile.parquet"

schemaService = SchemaServiceFactory.getDefault();

sourceTable = ParquetTools.readTable(FILE_PATH)

// If the source file already contains a Date column, drop it here;
// a partitioning Date column is added below instead.
if (sourceTable.hasColumns("Date")) {
    sourceTable = sourceTable.dropColumns("Date")
}

// Prepend a partitioning "Date" column (type String) to the columns
// read from the Parquet file.
cdef = sourceTable.getDefinition().getColumns()
newColumns = new ColumnDefinition[cdef.size() + 1]
for (int i = 0; i < cdef.size(); i++) {
    newColumns[i + 1] = cdef[i]
}
// The third constructor argument is the column type ID; withPartitioning()
// marks Date as the table's partitioning column.
newColumns[0] = new ColumnDefinition<>("Date", String.class, 4).withPartitioning()
newTDef = new TableDefinition(newColumns)

// Set properties for the new table
newTDef.setNamespace(NAMESPACE)
newTDef.setName(TABLE_NAME)
newTDef.setStorageType(2)  // STORAGETYPE_NESTEDPARTITIONEDONDISK

// Storage type IDs for use with setStorageType():
//   STORAGETYPE_NESTEDPARTITIONEDONDISK = 2
//   STORAGETYPE_FRAGMENTEDONDISK        = 8
//   STORAGETYPE_HIERARCHICALONDISK      = 16

// Build the schema object to register with the schema service, plus an
// XML rendering of it for review.
schema = SchemaXmlFactory.getMutableSchema(newTDef, NamespaceSet.SYSTEM)
schemaXml = SchemaXmlFactory.getXmlSchema(newTDef, NamespaceSet.SYSTEM)

println "New schema:"
println SchemaXmlUtil.makeSchemaXmlOutputter().outputString(schemaXml.getElement())


if (!schemaService.containsNamespace(NAMESPACE)) {
    // If the namespace does not already exist, add it:
    println "Adding namespace $NAMESPACE."
    schemaService.createNamespace(NamespaceSet.SYSTEM, NAMESPACE);
} else {
    println "Namespace $NAMESPACE exists."
}

// If the table does not yet exist, use addSchema;
// to replace an existing schema use updateSchema

schemaService.addSchema(schema);
//schemaService.updateSchema(schema);

Warning

In the code example above, the commented-out updateSchema(schema) call replaces an existing schema in place. If you use this option, be aware that changing the type of an existing column may make data previously written to disk for that table unreadable.
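
If you want the script to choose between addSchema and updateSchema automatically, a guard along the following lines can work. Note that containsSchema is a hypothetical method name used for illustration; check the SchemaService API in your installed version for the exact lookup call.

// Hypothetical guard: containsSchema(namespace, tableName) is assumed
// to exist -- verify the method name against your SchemaService API.
if (schemaService.containsSchema(NAMESPACE, TABLE_NAME)) {
    println "Replacing existing schema for ${NAMESPACE}.${TABLE_NAME}."
    schemaService.updateSchema(schema)
} else {
    println "Adding new schema for ${NAMESPACE}.${TABLE_NAME}."
    schemaService.addSchema(schema)
}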

Create the Database Directories

Historical data in Deephaven is stored under /db/Systems/<namespace>/Partitions/<storage partition ID>/<date>/<table name>/table.parquet. When adding a new table, create these directories down to the storage partition ID level. There is also a WritablePartitions directory containing links to the partitions that the merge process may write to. Below are example commands that create the required directories for the "MyNamespace" namespace:

# Create the Partitions and WritablePartitions directories for the namespace:
sudo -u dbmerge mkdir -vp /db/Systems/MyNamespace/{Writable,}Partitions/

# Create the storage partition:
sudo -u dbmerge mkdir -v /db/Systems/MyNamespace/Partitions/0
# Alternatively, instead of the mkdir above, the partition can be a link
# to external storage (such as S3):
sudo -u dbmerge ln -vs /path/to/s3/storage/mynamespace/0 /db/Systems/MyNamespace/Partitions/0

# Create a link in WritablePartitions to the Partitions directory:
cd /db/Systems/MyNamespace/WritablePartitions
sudo -u dbmerge ln -vs ../Partitions/0 ./0

Next, for each date for which you have Parquet data, create the final two directory levels, for the date and the table name (mkdir -p creates both at once):

sudo -u dbmerge mkdir -vp /db/Systems/MyNamespace/Partitions/0/2023-04-28/MyTable

Then move, copy, or link the original Parquet file into place as table.parquet in the new directory:

sudo -u dbmerge mv /path/to/original/file.parquet /db/Systems/MyNamespace/Partitions/0/2023-04-28/MyTable/table.parquet
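
Before running the metadata indexer, you can optionally confirm that the relocated file is still readable by pointing the same ParquetTools.readTable call from the schema step at its new location in a Deephaven console:

import com.illumon.iris.db.v2.locations.parquet.ParquetTools

// Read the file back from its new location; this should print the same
// column definitions as the original source file.
checkTable = ParquetTools.readTable("/db/Systems/MyNamespace/Partitions/0/2023-04-28/MyTable/table.parquet")
println checkTable.getDefinition()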

Run the Metadata Indexer

If you copied or linked table.parquet into place manually (as in the example above — as opposed to using a merge job), then you should also run the metadata indexer so that queries can discover the new location:

sudo /usr/illumon/latest/bin/iris metadata_indexer System MyNamespace MyTable

And that's it! After running the metadata indexer, you should be able to start a new code studio in the Deephaven UI and access your new table with:

myTable = db.t("MyNamespace", "MyTable")
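
Because Date is the table's partitioning column, a filter on it selects only the matching date partitions. For example, to read a single day:

myDay = db.t("MyNamespace", "MyTable").where("Date=`2023-04-28`")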