Supplementing historical data
In some cases it may be desirable to add historical data to previous dates without re-merging the data that already exists for those dates. Because Deephaven is an append-only database, supplemental data can extend existing data, but cannot replace it. An effective exception to this is tables that store data as point-in-time values; in that case, a lastBy is used to get the latest version of each row, and supplementing historical data can add newer versions for the lastBy to return.
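For example, a minimal Groovy sketch of the point-in-time pattern (the taq/trades names match the example script later in this section; the Date and Sym column names are illustrative):
// Latest version of each row, keyed by symbol, for one historical date.
// Rows supplemented later, into later-sorting internal partitions, win over earlier versions.
latestTrades = db.t("taq", "trades")
    .where("Date=`2022-06-01`")
    .lastBy("Sym")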
One example use case for supplementing historical data would be when a historical data set contains information for a set of symbols, and that set is to be expanded with some amount of backfill. In this case, the new data doesn't replace any existing data, but just adds more symbols for the historical dates.
See Table Storage and Merging Data for general background about how Deephaven stores historical data on disk.
For details on replacing individual columns of historical data, see Column Tools.
Importing and merging supplemental data
This section describes the process for adding data to existing historical partitions. This process is suitable for both Deephaven and Parquet format historical data.
In order to generate an additional data set that matches the layout of the historical data to be supplemented, it is easiest to use a separate namespace, with a table that uses a schema that is mostly a duplicate of the table to be supplemented. In this context, "mostly" means that the side-fill table must have the same names, types, and ordering of columns, and the same partitioning and grouping columns. However, the partitioning formula for the table can be different. Since the two tables are in different namespaces, they can (and should) have the same name. Using the same name for the two tables will simplify the process of moving the supplemental data into the storage of the main table.
If the data to be added is relatively small compared to the existing historical data (less than 10%), then it will generally be simplest to create the side-load namespace with a single internal partition.
In the illustration below, an existing historical table with ten internal partitions is being supplemented with data added to one internal partition in a separate namespace. Note that the internal partitions in the supplemental data's namespace should have names unique across the set of the two namespaces' internal partitions.
Important
If supplemental data is being added to provide newer "latest" values for rows that will be selected using lastBy, then the naming of the new internal partition(s) becomes significant. Unsorted presentation of rows from disk follows the lexicographical order of their internal partition names, so, for supplemented rows to be presented later to the lastBy operator, they should be merged into internal partitions whose names sort after the original internal partitions of the permanent namespace; for example, with existing partitions 0 through 9, a new internal partition named A or A0 sorts after them.
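A proposed partition name can be sanity-checked with plain string sorting, for example in a Groovy console:
// Lexicographic order of internal partition names determines unsorted presentation order from disk.
println(["3", "0", "A0", "9"].sort())   // prints [0, 3, 9, A0]; "A0" sorts after "0".."9"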
The overall process is:
- Import the data to be added to intraday for the side-fill table.
- Merge the side-fill table.
- Copy or move the partition from step 2 to a new internal partition for the table being supplemented. (If the base and side-fill tables have different names, then this step will also require renaming the level of the path just above the data.)
- Purge intraday data.
- Repeat 1 through 4 for each date that needs to be supplemented.
- Run the metadata indexer process to rebuild indexes for the base table namespace.
The metadata index built in step 6 is a map of internal partitions and dates that accelerates finding the paths needed when querying a table that spans many dates and internal partitions.
This process can be automated in a script that loops through a range of dates and uses the import, merge, and purge builder classes to process steps 1, 2, and 4. Step 3 can be automated with Java or Python file system methods that perform the directory move and rename operations.
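For instance, step 3 for one date might look like the following sketch; the /db/Systems root is the typical default location for historical data, and the namespace, internal partition, date, and table names are illustrative and must match your actual layout:
import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.StandardCopyOption

// Move one date of merged side-fill data from the temporary namespace's internal partition
// into a new internal partition of the main namespace. If the side-fill table had a different
// name, the final path element would be renamed as part of this move.
def source = Paths.get("/db/Systems/temp_taq/Partitions/A0/2022-06-01/trades")
def dest   = Paths.get("/db/Systems/taq/Partitions/A0/2022-06-01/trades")

Files.createDirectories(dest.getParent())                      // ensure .../A0/2022-06-01 exists
Files.move(source, dest, StandardCopyOption.REPLACE_EXISTING)  // move (and, if needed, rename) the table directory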
The overall time to add data to a historical date via this process should scale roughly with the proportion of new data to existing data, relative to the time the original data took to process. For example, if the original import and merge took 10 minutes, then adding 10% more data should take roughly 1 minute.
This process must be run in a merge worker, in order to have appropriate access to write new data to intraday and historical storage areas.
Note
Deephaven organizes internal partitions for a namespace by placing them in a Partitions directory. A peer of this directory is WritablePartitions.
- The query processor looks in the Partitions directory tree for data when reading.
- The merge process looks for WritablePartitions when merging.
Typically, WritablePartitions subdirectories are linked to Partitions subdirectories so that all partitions are writable. When adding side-fill partitions to existing historical data, it is recommended not to link them as writable, so that only side-fill operations use those additional internal partitions.
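For reference, this short Groovy sketch, assuming it runs in a merge worker where db is available and using the taq namespace from the example script below, lists both directory trees so you can confirm which internal partitions are writable:
import com.fishlib.base.FileUtils
import com.illumon.iris.db.tables.databases.OnDiskDatabase
import com.illumon.iris.db.v2.locations.TableType
import com.illumon.iris.db.v2.locations.local.LocalTablePathManager

// Resolve the namespace's WritablePartitions root; its peer "Partitions" directory holds all readable partitions.
final LocalTablePathManager pathManager = db instanceof OnDiskDatabase ? new LocalTablePathManager(((OnDiskDatabase) db).getRootDirectory()) : LocalTablePathManager.getDefaultInstance()
final File writableRoot = pathManager.getWritablePartitionsRoot("taq", TableType.SYSTEM_PERMANENT)
final File readableRoot = new File(writableRoot.getParentFile(), "Partitions")

println("Writable internal partitions: " + FileUtils.missingSafeListSubDirectories(writableRoot)*.getName())
println("All internal partitions: " + FileUtils.missingSafeListSubDirectories(readableRoot)*.getName())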
Important
Each time data in a particular historical partition (date) needs to be supplemented, a new set of one or more internal partitions will be needed. For example, if a date in a namespace with internal partitions 0 to 9 is supplemented to add some symbols using a temporary namespace with an internal partition called 0_incr, and, later, more data needs to be added to the same date, that new data will need to be merged into internal partitions with names other than the already-used 0 to 9 and 0_incr. Alternatively, the supplemental partitions can be freed for reuse by re-merging the supplemented dates as outlined in the next section.
This example Groovy script can be run in a merge worker (console or persistent query) to:
- iterate through a range of dates,
- import supplemental batch data from CSV files,
- merge them to a temporary namespace,
- move the internal partition(s) to the permanent namespace,
- and purge the imported intraday data.
This script should also work, with minor changes, as a Java class. As written, it expects CSV files compressed with GZip, with all files in the /tmp directory, and with the applicable date as part of each file name in the form yyyy-MM-dd (for example, trades.2022-06-01.csv.gz).
Note
There is a use_parquet property, which defaults to false and must be set to true when using the script with a table that is merged in Parquet format.
// This script loads additional data from csv files to populate a range of existing historical dates with new information.
// One place this might be used is when a historical dataset contains information for a set of symbols and it becomes
// desirable to add more symbols to the historical set without remerging the existing data.
// This script expects to use a table_name that exists with the same schema in two namespaces (the target/main namespace
// and a temp namespace).
// The script will iterate through dates in the specified range, importing and merging in the temp namespace, and then
// moving merged data to the target namespace. After each date is processed, the intraday data in the temp namespace will
// be deleted. After all dates have been processed, the script will update the metadata index for the target table.
import com.illumon.iris.importers.util.CsvImport;
import com.illumon.iris.importers.ImportOutputMode;
import java.io.File;
import com.illumon.iris.db.v2.locations.local.LocalTablePathManager;
import com.illumon.iris.db.tables.databases.OnDiskDatabase;
import com.illumon.iris.db.v2.locations.TableType;
import java.util.List;
import java.util.ArrayList;
import java.util.EnumSet;
import com.fishlib.base.FileUtils;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import io.deephaven.configuration.IntradayControlImpl;
import com.illumon.iris.db.v2.locations.local.LocalMetadataIndexer;
import com.illumon.iris.db.v2.locations.TableLookupKey;
import com.illumon.iris.db.v2.configuration.DataRoutingService;
import com.illumon.iris.db.v2.locations.local.LocalTableDataService;
import com.illumon.iris.importers.util.MergeData;
import java.util.*;
// Set values here to indicate table name, namespaces to use (main target and temp), historical data format, source file details, and range of dates to populate
final String table_name = "trades";
final String main_namespace = "taq";
final String temp_namespace = "temp_taq";
final String start_date = "2022-06-01";
final String end_date = "2022-08-31";
final String source_directory = "/tmp";
final String file_ending = ".gz";
final boolean use_parquet = false;
DBDateTime date = convertDateTime(start_date + "T12:00:00 NY");
final DBDateTime end = convertDateTime(end_date + "T12:00:00 NY");
// Check that sourcePath is a valid path
final File sourcePath = new File(source_directory);
if (!sourcePath.exists()) {
    throw new RuntimeException("Source path: \"" + source_directory + "\" not found.");
}
// Get list of writable partitions for the temp namespace
final LocalTablePathManager locationManager = db instanceof OnDiskDatabase ? new LocalTablePathManager(((OnDiskDatabase) db).getRootDirectory()) : LocalTablePathManager.getDefaultInstance();
final File sourceMultidayRoot = locationManager.getWritablePartitionsRoot(temp_namespace, TableType.SYSTEM_PERMANENT);
final List<File> writablePartitions = new ArrayList<>();
writablePartitions.addAll(FileUtils.missingSafeListSubDirectories(sourceMultidayRoot));
// Verify that there is no overlap between writable partition names in the temp namespace and those in the target namespace
// Note: this only checks writable partitions for conflicts; it is still possible to overwrite non-writable partitions in the permanent namespace with this process.
final File targetMultidayRoot = locationManager.getWritablePartitionsRoot(main_namespace, TableType.SYSTEM_PERMANENT);
final List<String> existingPartitionNames = new ArrayList<>();
for (final File file : FileUtils.missingSafeListSubDirectories(targetMultidayRoot)) {
    existingPartitionNames.add(file.getName());
}
if (writablePartitions.size() == 0) {
    throw new IllegalStateException("No writable partitions found for namespace: " + temp_namespace);
}
for (final File file : writablePartitions) {
    if (existingPartitionNames.contains(file.getName())) {
        throw new IllegalStateException("Partition name collision between temp and permanent namespaces for partition: " + file.getName());
    }
}
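// The read-only Partitions root for the target namespace is a peer of its WritablePartitions root (see the Note above)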
final String targetPartitionsRoot = targetMultidayRoot.toString().replace("WritablePartitions","Partitions");
final String NO_SOURCE_FILES = "No source files found.";
// Iterate through dates, attempting to import, merge, move, and purge files found in the source directory with a yyyy-MM-dd formatted date in the file name
while (date <= end) {
    dateString = formatDateNy(date);
    globString = "*" + dateString + "*" + file_ending;
    int rows = 0;
    try {
        rows = new CsvImport.Builder(temp_namespace, table_name)
            .setSourceDirectory(source_directory)
            .setSourceGlob(globString)
            .setDestinationPartitions("localhost/" + dateString)
            .setOutputMode(ImportOutputMode.REPLACE)
            .build()
            .run();
        System.out.println("Imported " + rows + " rows for date: " + dateString);
    } catch (IllegalStateException ex) {
        if (NO_SOURCE_FILES.equals(ex.getMessage())) {
            System.out.println("No source files found for date: " + dateString);
        } else {
            throw(ex);
        }
    }
    if (rows > 0) {
        MergeData.builder(db, temp_namespace, table_name)
            .setPartitionColumnValue(dateString)
            .setForce(true)
            .setStorageFormat(use_parquet ? "Parquet" : "DeephavenV1")
            .build()
            .run();
        System.out.println("Merged data for date: " + dateString);
        for (final File sourcePart : writablePartitions) {
            // Move newly merged paths to the permanent namespace.
            // If internal partitions or the table name need to change during the move, that can be done here.
            final File destDatePath = Paths.get(targetPartitionsRoot, sourcePart.getName(), dateString).toFile();
            if (!(destDatePath.exists())) {
                if (!(destDatePath.mkdirs())) {
                    throw new RuntimeException("Failed to create destination partition directory(s): " + destDatePath.getPath());
                }
            }
            final Path source = Paths.get(sourcePart.toString(), dateString, table_name);
            final Path dest = Paths.get(targetPartitionsRoot, sourcePart.getName(), dateString, table_name);
            Files.move(source, dest, StandardCopyOption.REPLACE_EXISTING);
        }
        System.out.println("Moved " + writablePartitions.size() + " partitions for date: " + dateString);
        // Delete the intraday data
        IntradayControlImpl.truncateIntradayPartition(temp_namespace, table_name, dateString);
        IntradayControlImpl.deleteIntradayPartition(temp_namespace, table_name, dateString);
        System.out.println("Deleted intraday data for date: " + dateString);
    }
    date = DBTimeUtils.plus(date, DAY);
}
// Update metadata index for the table to store locations of new internal partitions
LocalMetadataIndexer.updateIndex(
    new LocalTableDataService("side-load", locationManager, EnumSet.of(TableType.SYSTEM_PERMANENT), false, DataRoutingService.NULL_PROPERTY_PROVIDER, false),
    new TableLookupKey.Immutable(main_namespace, table_name, TableType.SYSTEM_PERMANENT));
Re-merging supplemented historical tables
Using the processes detailed above, the structure of supplemented tables on disk will be an initial set of WritablePartitions plus one or more read-only partitions that were added during one or more supplementation runs. If and when it becomes desirable to reorganize the historical data back into the core set of WritablePartitions, this can be accomplished by re-merging the data in place and then deleting the read-only partitions' data for the re-merged date. More details about the steps involved in re-merging are covered in the Custom merge operations topic.
This script is similar to the example above in that it can be run in a merge worker (console or persistent query) and can also be adapted as a Java class with minor changes. This script also uses start and end dates to set a range of dates to be re-merged.
The script will re-merge one date at a time back into the original set of WritablePartitions for the namespace, and then delete the supplemental data that was written into other, non-writable partitions. Once all dates have been processed, the script will update the metadata index for the table.
Important
The merge step in the script is restartable, but if the script fails or is stopped after re-merging a date but before or during deletion of the non-writable partition data for that date, those non-writable partitions' data must be deleted manually. Restarting the script in this state could result in data duplication between the re-merged supplemental data and the original supplemental data.
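If that manual cleanup becomes necessary, it amounts to deleting the table/date directory under each supplemental (read-only) internal partition, as in this sketch; the /db/Systems root is the typical default, and the namespace, internal partition, date, and table names are illustrative:
// Remove one date's supplemental data from a read-only internal partition after a partial run.
// Double-check the path before deleting anything.
final File orphaned = new File("/db/Systems/taq/Partitions/A0/2022-06-01/trades")
if (orphaned.exists()) {
    org.apache.commons.io.FileUtils.deleteDirectory(orphaned)
}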
Note
There is a use_parquet property, which defaults to false and must be set to true when using the script with a table that is merged in Parquet format.
// This script re-merges each date in a range back into the writable partitions that exist for its namespace.
// For each date, it then deletes data from any non-writable partitions for the same table and date. After all dates
// have been processed, it reruns the metadata indexer to update location information for the table.
import com.illumon.iris.importers.MergeFromTable;
import com.illumon.util.progress.MinProcessStatus;
import com.illumon.util.progress.ProgressLogger;
import com.illumon.iris.db.tables.databases.Database.StorageFormat;
import com.illumon.iris.db.v2.SourceTableFactory;
import com.illumon.iris.db.v2.locations.local.LocalTablePathManager;
import com.illumon.iris.db.v2.locations.local.LocalMetadataIndexer;
import com.illumon.iris.db.tables.databases.OnDiskDatabase;
import com.illumon.iris.db.v2.locations.TableType;
import com.illumon.iris.db.v2.locations.TableLookupKey;
import com.illumon.iris.db.v2.configuration.DataRoutingService;
import com.illumon.iris.db.v2.locations.TableDataService;
import com.illumon.iris.db.tables.TableDefinition;
import com.fishlib.base.FileUtils;
import java.nio.file.Paths;
import com.illumon.iris.db.v2.locations.TableLocation;
import com.illumon.iris.db.v2.locations.local.LocalTableDataService;
import java.util.*;
// Set the namespace, table name, and column partition value (date) to re-merge
final String namespace = "taq";
final String table_name = "trades";
final String start_date = "2022-06-01";
final String end_date = "2022-08-31";
// Set merge arguments
// allowEmptyInput should be true if the range of dates includes dates for which there is no data
final boolean allowEmptyInput = false;
final int threadPoolSize = 4;
final int maxConcurrentColumns = 4;
final boolean lowHeapUsage = false;
final String sortColumnFormula = null;
final boolean use_parquet = false;
// Set internal properties
final boolean force = true;
final ProgressLogger progress = new ProgressLogger(new MinProcessStatus(), log);
final boolean lateCleanup = true;
final TableDefinition definition = db.getTableDefinition(namespace, table_name);
final String partitioningColumn = definition.getPartitioningColumns().get(0).getName();
final LocalTablePathManager locationManager = db instanceof OnDiskDatabase ? new LocalTablePathManager(((OnDiskDatabase) db).getRootDirectory()) : LocalTablePathManager.getDefaultInstance();
final TableDataService tableDataService = new LocalTableDataService("side-load-remerge", locationManager, EnumSet.of(TableType.SYSTEM_PERMANENT), false, DataRoutingService.NULL_PROPERTY_PROVIDER, false);
// Find partitions for this table
final File targetMultidayRoot = locationManager.getWritablePartitionsRoot(namespace, TableType.SYSTEM_PERMANENT);
final String partitionsRoot = Paths.get(targetMultidayRoot.getParent().toString(), "Partitions").toString();
final Set<String> existingWritablePartitionNames = new HashSet<>();
final Set<String> existingPartitionNames = new HashSet<>();
for (final File file : FileUtils.missingSafeListSubDirectories(targetMultidayRoot)) {
    existingWritablePartitionNames.add(file.getName());
}
for (final TableLocation<?> location : SourceTableFactory.INSTANCE.getLocations(tableDataService, definition, false, false)) {
    existingPartitionNames.add(location.getInternalPartition().toString());
}
DBDateTime date = convertDateTime(start_date + "T12:00:00 NY");
final DBDateTime end = convertDateTime(end_date + "T12:00:00 NY");
// Iterate through dates, attempting to re-merge in place, and remove non-writable partitions
while (date <= end) {
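    // Build the source table for this date from all internal partitions (writable and read-only), so the re-merge includes the supplemental rows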
    final String dateString = formatDateNy(date);
    final Table sourceTable = db.t(namespace, table_name).where(partitioningColumn + "=`" + dateString + "`");
    // Re-merge the data back into the writable partitions
    new MergeFromTable().merge(
        log, // automatically available in a console worker
        com.fishlib.util.process.ProcessEnvironment.getGlobalFatalErrorReporter(),
        namespace,
        table_name,
        dateString,
        threadPoolSize,
        maxConcurrentColumns,
        lowHeapUsage,
        force,
        allowEmptyInput,
        sortColumnFormula,
        db, // automatically available in a console worker
        progress,
        (use_parquet ? StorageFormat.Parquet : null), // storageFormat
        null, // parquetCodecName default
        null, // syncMode not needed
        lateCleanup,
        sourceTable);
    // Delete non-writable partition content for the table and date that was re-merged
    for (final String internalPartition : existingPartitionNames) {
        if (!(existingWritablePartitionNames.contains(internalPartition))) {
            final File toDelete = Paths.get(partitionsRoot, internalPartition, dateString, table_name).toFile();
            log.info().append("Deleting no-longer-needed read-only partition: \"").append(toDelete.toString()).append("\".").endl();
            org.apache.commons.io.FileUtils.deleteDirectory(toDelete);
        }
    }
    date = DBTimeUtils.plus(date, DAY);
}
// Update the metadata index for the table to remove now-empty internal partitions.
// Because the loop above processes a range of dates, the index only needs to be updated once, after all the merges are complete.
LocalMetadataIndexer.updateIndex(
    new LocalTableDataService("remerge", locationManager, EnumSet.of(TableType.SYSTEM_PERMANENT), false, DataRoutingService.NULL_PROPERTY_PROVIDER, false),
    new TableLookupKey.Immutable(namespace, table_name, TableType.SYSTEM_PERMANENT));