Merging data
Both batch and streaming data are generally put in intraday tables when imported. Once imported, merge functionality is used on a day-to-day basis to move data from the intraday database to the historical database. See the Deephaven Data Lifecycle for further details. Merge functionality is provided by several scripts, classes, and the "Data Merge" persistent query type.
Merging is normally performed for persistent data on a nightly basis. Similar to imports, merges may be performed by Persistent Query jobs ("Data Merge Job"), from the command line, or scripted for maximum flexibility.
Merging is closely related to Validation. After each merge operation, it is useful to verify the data quality and, optionally, delete the intraday source data.
Data Merge Query
Note
The Merge option is only available in Deephaven Classic.
When the persistent query type "Data Merge" is selected, the Persistent Query Configuration Editor window shows the following options:
Note
Data Merges for multi-partition imports cannot be run from a standard Merge persistent query at present. This must be done from the command line, from a bulk-copied Merge query, or from a custom script in a Batch Query - Import Server persistent query.
- To proceed with creating a merge query, you will need to select a DB Server and enter the desired value for Memory (Heap) Usage (GB).
- Options available in the Show Advanced Options section of the panel are typically not used when importing or merging data. To learn more about this section, please refer to the Persistent Query Configuration Viewer/Editor.
- The Access Control tab presents a panel with the same options as all other configuration types, and gives the query owner the ability to authorize Admin and Viewer Groups for this query. For more information, please refer to Access Control.
- Clicking the Scheduling tab presents a panel with the same scheduling options as all other configuration types. For more information, please refer to Scheduling.
- Clicking the Merge Settings tab presents a panel with the options pertaining to merging data to a table already in Deephaven:
Merge settings
- Namespace: This is the namespace for the data being merged.
- Table: This is the table for the data being merged.
- Partition Value Formula: This is the formula needed to partition the data being merged. If a specific partition value is used, it will need to be surrounded by quotes. In most cases, the previous day's data will be merged. For example:
- com.illumon.util.calendar.Calendars.calendar("USNYSE").previousDay(1) merges the previous day's data based on the USNYSE calendar.
- "2017-01-01" merges the data for the date 2017-01-01 (the quotes are required).
- Table Data Service Configuration: Specifies how input data to merge should be found and read. The drop-down menu is populated from the Data Routing Configuration YAML file and contains all of the items from the tableDataService section and the dataImportServer section, unless at least one has the "merge" tag. If any merge tag is present, then only those items with that tag are included.
- Sort Column Formula: An optional formula to sort on after applying column groupings. The formula can be a simple column name (e.g., Timestamp) or a formula that could be passed to updateView() (e.g., to sort on Timestamp, binned by second: upperBin(Timestamp, 1000000000L)); see the sketch following this list. If no formula is supplied, data will be in source order except where reordered by grouping.
- Format: Specifies the format in which the data should be written.
- Default: use the default merge format from the schema. If none is defined, use Deephaven.
- Deephaven: write the data in Deephaven format.
- Parquet: write the data in Parquet format.
- Codec: For a Parquet merge, this specifies the compression codec to be used.
- DEFAULT: use the default codec (either from the schema, or if none is defined there, SNAPPY).
- <Other values>: use the chosen codec. See Parquet Codecs for further details.
- Low Heap Mode: Whether to run in low heap usage mode, which makes trade-offs to minimize the RAM required for the merge JVM's heap, rather than maximizing throughput. The two modes are referred to as "low heap" and "maximum throughput" throughout this document.
- Force: Whether to allow overwrite of existing merged data.
- Thread Pool Size: The number of concurrent threads to use when computing output order (i.e., grouping and sorting) or transferring data from input partitions to output partitions. More threads are not always better, especially if there is significant reordering of input data which may degrade read cache efficiency.
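As a conceptual illustration of the Sort Column Formula (a sketch only, not the merge implementation; someTable is a placeholder for a table with a Timestamp column):

// The sort formula must be an expression that updateView() could evaluate.
// For example, binning Timestamp to one-second buckets:
checked = someTable.updateView("SecondBin = upperBin(Timestamp, 1000000000L)")
// During the merge, rows within each group are ordered by the value this formula produces.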
For batch data, a Data Merge query will typically have a dependency on an import query configured in the Scheduling tab. For streamed data, a merge query may simply run at a specified time.
Data Merge queries can also be created from an import query. After creating an import query with the Schema Editor, you will be prompted to create the corresponding merge. If you have an import query without a corresponding merge, an option to Create Merge Query will be available in the Query Config panel's context (right-click) menu.
Parquet Codecs
Parquet allows the use of various compression codecs. Supported codecs include UNCOMPRESSED and SNAPPY.
Merge from Script
The most flexible method for merging data into Deephaven is via Groovy/Python scripting. Data Merge scripts may be deployed via the command line or through a Batch Query (RunAndDone) persistent query. Merge tasks are most easily executed from a script by using a "builder" class. The underlying merge logic is identical to that accessible from the command line tools and persistent queries.
Command Line Execution
To run a script from the command line, use the iris_exec program as follows, where ./script.groovy is a local script file to execute on the server:
iris_exec run_local_script -- -s ./script.groovy -h <merge server host> -p <merge server port>
See Running Local Scripts for more details on options.
Caution
Merging data requires the appropriate write permissions. This means these scripts should be run on a "merge server", not a "query server". Since the latter is the default in the local script runner, you must specify an alternate host and/or port to reference a merge server. See the -queryHost and -queryPort options to run_local_script.
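For example (the host name and port are illustrative), the script can be directed at a merge server with:
iris_exec run_local_script -- -s ./script.groovy -queryHost merge-server.example.com -queryPort 30002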
Example
The following example shows how to execute a merge from a Groovy or Python script.
Note
There are many options not illustrated here. Please refer to Merge API Reference for all the variations available.
Groovy:
import com.illumon.iris.importers.util.MergeData
import com.illumon.iris.importers.ImportOutputMode

new MergeData.Builder(db, "Test", "MyTable")
    .setPartitionColumnValue(partition)
    .build()
    .run()

println "Done merging!"

Python:
from deephaven import *

MergeData.builder(db, "Test", "MyTable")\
    .setPartitionColumnValue(partition)\
    .build()\
    .run()

print("Done merging!")
This script assumes the partition is set via a command-line argument. It might be executed from the command line for the 2018-05-01 partition as follows (the port is specified to connect to the appropriate Deephaven merge server):
iris_exec run_local_script -- -s ~/myscripts/merge_single.groovy -p 30002 partition "2018-05-01"
Merge API Reference
The Merge API is fairly simple, and analogous to the Import API. There is a single MergeData class with a builder method that returns a MergeDataBuilder object. MergeDataBuilder will produce a MergeData object when build() is called. Note that this MergeData class is in the com.illumon.iris.importers.util package (there is another MergeData class in the com.illumon.iris.importers package that is not intended for use from a script). The merge is then executed by calling the run() method on the MergeData object used to build the merge. The general pattern for performing a merge is:
MergeData.Builder(db, <namespace>, <table>)
.set<param>(<param value>)
...
.build()
.run()
Merge options
Option Setter | Type | Req? | Default | Description |
---|---|---|---|---|
setPartitionColumnValue | String | No* | N/A | A literal string used to select the column partition to merge. Often a date; e.g., "2018-05-01". |
setPartitionColumnFormula | String | No* | N/A | An expression that will be evaluated to specify the partition to merge; e.g., currentDateNy(). |
setThreadPoolSize | int | No | 4 | The maximum number of parallel threads to use during the merge process. |
setMaximumConcurrentColumns | int | No | unlimited | The maximum number of input columns to process concurrently during a maximum throughput mode merge. |
setLowHeapUsage | boolean | No | false | Whether to prioritize heap conservation over throughput. |
setForce | boolean | No | false | Whether to force merge when destination(s) already have data. |
setAllowEmptyInput | boolean | No | true | Whether to allow merge to proceed if the input data is empty. |
setSortColumnFormula | String | No | N/A | Formula to apply for sorting, post-grouping. The formula can be a simple column name (e.g., Timestamp) or a formula that could be passed to updateView() (e.g., to sort on Timestamp, binned by second: upperBin(Timestamp, 1000000000L)). |
setStorageFormat | String | No | N/A | Sets the merge format. Refer to the defaultMergeFormat in the schemas for valid values. |
setCodecName | String | No | N/A | For Parquet, sets the compression codec. This can be a valid Parquet codec, such as UNCOMPRESSED. If this isn't used, then the default codec from the schema is used, or SNAPPY if one isn't defined in the schema. |
* Either a partition column value or formula must be specified (not both).
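As an illustration of combining several of these setters (the namespace, table, and option values below are made up for the example), a Groovy merge that uses a partition formula, a larger thread pool, a sort column, and forced overwrite might look like:

import com.illumon.iris.importers.util.MergeData

// Merge the previous USNYSE business day's data for a hypothetical Market.Trades table.
new MergeData.Builder(db, "Market", "Trades")
    .setPartitionColumnFormula('com.illumon.util.calendar.Calendars.calendar("USNYSE").previousDay(1)')
    .setThreadPoolSize(8)              // more parallel threads for the merge
    .setSortColumnFormula("Timestamp") // order rows within groups by Timestamp
    .setForce(true)                    // overwrite any existing merged data
    .build()
    .run()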
Data Merging Classes
Caution
It is recommended that merges be executed by Persistent Query jobs or by scripting using the Merge builder, not by directly calling these classes. This information is provided primarily for administrators who want an in-depth understanding of the merge processing, or for data management scenarios which require more flexibility than what is exposed by the UI and API methods.
The MergeIntradayData class is used to copy and reorganize data from intraday to historical storage.
This class is primarily used by merge queries created in the Query Configuration Editor or by the Schema Editor in the Deephaven UI. However, it can also be used by the XML-driven import and merge scripts. Merge queries are the preferred method to use the MergeIntradayData class, but it can also be called directly, used from custom scripts, or called from an application. This section details its functionality and arguments.
Intraday data is written into column files in the order in which it arrives. As such, there is no grouping of data, and the only sorting is by arrival time of the events. Since intraday data is typically used for data that is still receiving updates, the main priorities are around low latency in appending to the table and delivering those updates to running queries. Historical data does not receive updates, so the priority there is around organizing data for more efficient querying.
Functionality of the MergeIntradayData class
The MergeIntradayData class reads intraday data for a specified partition (usually a date) and writes it into historical storage. During this process, the class can group the data based on grouping columns specified in the schema, sort data based on an argument passed to the class, and distribute data across one or more writable partitions.
Note
These writable partitions are different from the column partition indicated by the partitioning column in the schema. Writable partitions are storage partitions set up to allow sharding of historical data for faster querying and potentially to provide for larger total storage capacity.
When sharding data, the MergeIntradayData class uses the partitioning keyFormula defined in the table's schema. This formula describes how data should be distributed when there are multiple writable partitions configured for historical storage. For more details, see Schemas.
In addition to the MergeIntradayData class, there is also a RemergeData class. It provides similar functionality, but merges data from existing historical partitions for one namespace and table into new historical partitions for a different namespace and/or table.
During merge processing, the source data is read, and a Deephaven byExternal method is used to create a new sub-table of results for each partition based on the target table's partitioning formula. Then, for each partition sub-table, groups are aggregated and ordered from largest to smallest (number of rows in the group). Optional sorting is applied to rows within the groups, and the grouped data is then written out to disk. Note that columns used for grouping cannot be used for sorting.
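To illustrate the idea (a conceptual sketch only, not the merge code itself; someTable, the key formula, and the Sym column are made up), byExternal splits a table into one sub-table per key value, much as the merge buckets rows per destination partition:

// Compute a destination key per row, then split into sub-tables by that key.
// In the real merge, the key comes from the target table's partitioning keyFormula.
keyed = someTable.updateView("WritablePartition = Math.abs(Sym.hashCode()) % 4")
subTables = keyed.byExternal("WritablePartition")   // one sub-table per distinct key value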
Arguments and Property Settings
MergeIntradayData
The MergeIntradayData class (com.illumon.iris.importers.MergeIntradayData) main method takes the following arguments:
- Required (first three arguments, in this order):
<namespace> <tableName> <partitioning column value>
- Optional (by name, after the three required arguments):
sortColumn=<column name or formula to sort by>
tableDef=<absolute path to table definition file>
lowHeapUsageMode=<true or false>
force=<true or false>
Parameter | Description |
---|---|
namespace | (Required) Namespace in which to find the table whose data is to be merged. |
tableName | (Required) Name of the table whose data is to be merged. |
partitioning column value | (Required) This is typically a date string but could be some other unique string value. It indicates what column partition will be merged by this merge operation. |
sortColumn | (Optional) A formula by which to sort rows within a group. If not provided, rows within a group will be sorted based on the order in which they were written into the intraday partition. This column cannot be one of the grouping columns. |
tableDef | (Optional) If a different table definition is to be used for the merge operation, or the table definition is not accessible within the context in which the merge operation is being run, this value can provide a path to the definition file. |
lowHeapUsage | (Optional) Reduces memory consumption of the merge process at the expense of throughput. Restricts most activities to single-threaded, one-by-one, and limits expansion of indexes in memory. |
force | (Optional) By default, a merge will fail if any historical partition already exists for the data to be merged. Force will cause merge to overwrite the existing destination(s). |
mergeFormat | Sets the merge format. Refer to the defaultMergeFormat in the schemas for valid values. |
parquetCodec | For Parquet, sets the compression codec. This can be a valid Parquet codec, such as UNCOMPRESSED. If this isn't used, then the default codec from the schema is used, or SNAPPY if one isn't defined in the schema. |
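For example, the argument list for merging the 2018-05-01 partition of a hypothetical MarketData.EODTrades table, sorted by Timestamp (all names and values illustrative), would be:

MarketData EODTrades 2018-05-01 sortColumn=Timestamp lowHeapUsageMode=false force=false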
RemergeData
The RemergeData class (com.illumon.iris.importers.RemergeData) main method takes the following arguments:
- Required (first five arguments, in this order):
<sourceNamespace> <sourceTableName> <namespace> <tableName> <partitioning column value>
- Optional (by name, after the five required arguments):
sortColumn=<column name or formula to sort by>
tableDef=<absolute path to table definition file>
lowHeapUsageMode=<true or false>
force=<true or false>
Parameter | Description |
---|---|
sourceNamespace | (Required) Namespace in which to find the table whose data is to be re-merged. |
sourceTableName | (Required) Name of the table whose data is to be re-merged. |
namespace | (Required) Namespace in which to find the table into which to re-merge the data. |
tableName | (Required) Name of the table into which to re-merge the data. |
partitioning column value | (Required) This is typically a date string but could be some other unique string value. It indicates what column partition will be merged by this merge operation. |
sortColumn | (Optional) A formula by which to sort rows within a group. If not provided, rows within a group will be sorted based on the order in which they were written into the intraday partition. This column cannot be one of the grouping columns. |
tableDef | (Optional) If a different table definition is to be used for the merge operation, or the table definition is not accessible within the context in which the merge operation is being run, this value can provide a path to the definition file. |
lowHeapUsage | (Optional) Reduces memory consumption of the merge process at the expense of throughput. Restricts most activities to single-threaded, one-by-one, and limits expansion of indexes in memory. |
force | (Optional) By default, a merge will fail if any historical partition already exists for the data to be merged. Force will cause merge to overwrite the existing destination(s). |
mergeFormat | Sets the merge format. Refer to the defaultMergeFormat in the schemas for valid values. |
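For example, re-merging the 2018-05-01 partition of a hypothetical MarketData.EODTrades table into a new MarketDataAdj.EODTrades table (all names illustrative) would use arguments like:

MarketData EODTrades MarketDataAdj EODTrades 2018-05-01 force=true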
- In addition to these arguments, both classes will use the property iris.concurrentWriteThreads to control parallelization of processing, executing steps that allow for parallel processing via a thread pool whose maximum capacity (nConcurrentThreads) is defined by the value of that property. Additionally, when using maximum throughput mode, merge.maximumConcurrentColumns limits the number of input columns that are processed simultaneously. The default value of Integer.MAX_VALUE processes all columns simultaneously (to the limit of nConcurrentThreads).
- The initial execution of byExternal, to partition the results, is single-threaded. Subsequent steps may be parallelized depending on nConcurrentThreads, the value of lowHeapUsageMode, and the number of destination partitions.
- For each destination partition, the output ordering must be determined by grouping and sorting the input data that will be written to that partition. Following this, the data must be written according to that ordering.
- Minimum heap usage for the merge process is achieved with iris.concurrentWriteThreads = 1 and lowHeapUsageMode = true.
- The other extreme of tuning is lowHeapUsageMode = false and concurrentWriteThreads set high enough that all output partitions and column writes can be processed in parallel. This "maximum throughput" configuration can easily consume large amounts of memory and dominate I/O channels. Configurations that move toward maximum throughput should be evaluated carefully.
In lowHeapUsageMode, only one destination partition is processed at a time. For each partition, the ordering is determined in a single thread, and then the thread pool is used to execute writing jobs for each column of the output table.
Otherwise (if lowHeapUsageMode is false), all destination partitions are processed in parallel, to the limit imposed by nConcurrentThreads. The ordering jobs for the destination partitions are executed via the thread pool, as are the writing jobs for each output column of each destination partition.
The first columns to be written are the grouping columns. There is a single job submitted for each destination, which writes all of the grouping columns in turn. The remaining columns are written with one job per destination and column. For example, if there are two output partitions O and P writing columns A and B, then four jobs are submitted as (O, A), (P, A), (O, B), (P, B). This makes it more likely that the output jobs read all input values for column A from cache when processing the second output destination.
The output partitions may have different writing speeds, so jobs may read from many input columns at once. To limit the number of concurrently read input columns, set the property merge.maximumConcurrentColumns. This reduces the possible parallelism of the merge job, but may improve the cache behavior of the output jobs by preventing subsequent output columns from evicting data from columns that another output job is still working on.
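For example, a merge worker tuned toward throughput while limiting the concurrently read input columns might set the following properties (the values are illustrative):

iris.concurrentWriteThreads=8
merge.maximumConcurrentColumns=4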
Custom Merge operations
The merge method of the MergeData class is used by MergeIntradayData to perform the main processing of data merge operations. There are also overloads of this method which expose additional arguments that can be used for custom data merges.
Two common custom data merge scenarios are:
- Merging data from a memory table.
- Remerging data in place to correct or recalculate existing merged data.
Like other data merge steps, these operations must be executed from workers running on the merge server, to ensure that the worker will have sufficient access to write data to historical data stores.
Note
Merge operations that use a custom sourceTable must ensure that all data values are available when the merge method reads them. Therefore, formation of sourceTables should use update rather than view or updateView, and it may be necessary to force all data to be calculated and preloaded into memory by adding .select() to the end of the sourceTable query.
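As a sketch of that guidance (the namespace, table, column names, and date are illustrative), a custom sourceTable might be formed like this in Groovy:

// Use update (not view/updateView) so values are materialized,
// then select() to force everything into memory before the merge reads it.
sourceTable = db.i("Market", "Trades")
    .where("Date=`2018-05-01`")
    .update("AdjPrice = Price * SplitFactor")
    .select()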
Merging from a Memory Table
Merging from a memory table allows Deephaven to be used for data transformations (combining data from multiple sources, parsing and reformatting, etc.) and then writing the results directly to historical storage. A key consideration here, though, is that the worker being used for the merge operation must have sufficient heap allocated to handle both the data merge process and storage of the entire source table to be merged.
This Groovy example uses the MergeFromTable class to provide the merge method overload which exposes all the possible arguments. Many of them are not needed for the example, so they are empty, null, or use no-op implementations. The example reads from a CSV file on the merge server using readCsv and merges it directly to historical storage using the default (Deephaven) historical data format. Prerequisites for this example were creating and deploying the Data.Values2 schema, and setting up historical storage for the Data namespace under /db/Systems.
import com.illumon.iris.importers.MergeFromTable
import com.illumon.util.progress.MinProcessStatus
import com.illumon.util.progress.ProgressLogger
String namespace = "Data"
String tableName = "Values2"
String date = currentDateNy()
boolean force = false
boolean allowEmptyInput = false
int threadPoolSize = 4
int maxConcurrentColumns = 4
boolean lowHeapUsage = false
ProgressLogger progress = new ProgressLogger(new MinProcessStatus(), log)
boolean lateCleanup = false
String sortColumnFormula = null
// In the case of this file, there were two numeric columns (Farmer_ID and I)
// which were discovered as long, but are inferred by readCsv as int.
// These were manually switched to int before deploying the schema, since
// merge requires exact matches between sourceTable types and destination
// columns being written.
String fileName = "Agricultural_Commoditites_Grown_By_Farmer.csv"
String path = "/tmp/" + fileName
sourceTable = readCsv(path)
new MergeFromTable().merge(
log, // automatically available in a console worker
com.fishlib.util.process.ProcessEnvironment.getGlobalFatalErrorReporter(),
namespace,
tableName,
date,
threadPoolSize,
maxConcurrentColumns,
lowHeapUsage,
force,
allowEmptyInput,
sortColumnFormula,
db, // automatically available in a console worker
progress,
null, // storageFormat not needed
null, // parquetCodecName not needed
null, // syncMode not needed
lateCleanup,
sourceTable)
To use the Parquet data format instead, add import com.illumon.iris.db.tables.databases.Database.StorageFormat and pass StorageFormat.Parquet instead of null for the storageFormat argument (the next one after progress).
Re-merging Data in Place
In the previous example, the force and lateCleanup arguments are set to false:
- force indicates whether existing data for a target partition should be replaced.
- lateCleanup determines whether a forced merge waits until new column files are written before deleting the previous files.
In cases where existing historical data needs to be re-merged (to correct some data, for instance), a script like the one above can be used, with the source table formed using db.t() to read the existing historical data and table operations applied to implement the needed correction(s).
When force is true, the merge operation will delete the contents of the destination directories before writing the new column files. Regardless of whether force is true or false, the new column files are initially written to hidden temporary directories, and moved to the target directories after they have all been written.
When calling merge for an in-place re-merge, force and lateCleanup must be set to true. As mentioned, force is needed because there is already data in the target directories which must be replaced; in addition, lateCleanup is needed to change the behavior of force. When lateCleanup is true, a forced merge will wait until the new column files have been written before deleting the previous column files. This allows the previous column files to be used when constructing the sourceTable.
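Putting that together, a minimal re-merge-in-place sketch (Groovy) follows. It reuses the argument order from the memory-table example above; the namespace, table, partitioning column name (Date), date, and the correction formula are illustrative assumptions:

import com.illumon.iris.importers.MergeFromTable
import com.illumon.util.progress.MinProcessStatus
import com.illumon.util.progress.ProgressLogger

String namespace = "Data"
String tableName = "Values2"
String date = "2018-05-01"

// Read the existing historical data for the partition, apply the correction,
// and force everything into memory so the old files can be read before they are replaced.
sourceTable = db.t(namespace, tableName)
    .where("Date=`" + date + "`")
    .update("Value = Value * 100")   // illustrative correction; use update, not updateView
    .select()

ProgressLogger progress = new ProgressLogger(new MinProcessStatus(), log)

new MergeFromTable().merge(
    log,
    com.fishlib.util.process.ProcessEnvironment.getGlobalFatalErrorReporter(),
    namespace,
    tableName,
    date,
    4,        // threadPoolSize
    4,        // maxConcurrentColumns
    false,    // lowHeapUsage
    true,     // force: the destination partition already has data
    false,    // allowEmptyInput
    null,     // sortColumnFormula
    db,
    progress,
    null,     // storageFormat (schema default)
    null,     // parquetCodecName
    null,     // syncMode
    true,     // lateCleanup: keep the old files until the new ones are written
    sourceTable)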
Merge Heap Calculations and Merge Throughput Optimizations
For merge optimization, the following considerations matter:
- Total available heap.
- Data buffer cache.
- Number of concurrent threads.
- Low heap usage mode (or, alternatively, maximum throughput mode).
- Number of output partitions, and their relative performance.
A good rule of thumb for determining the best value for your merge heap size is to double the value of the data buffer pool size:
buffer pool size = max_over_all_columns(column file data size) * nWritingThreads / nOutputPartitions / 0.85
heap size = 2 * buffer pool size
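For example (illustrative numbers), if the largest column file is 2 GB, there are 8 writing threads, and 4 output partitions:
buffer pool size = 2 GB * 8 / 4 / 0.85 ≈ 4.7 GB
heap size = 2 * 4.7 GB ≈ 9.4 GB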
You will need to give your process enough heap to accommodate the requisite data buffer pool. (See Data Buffer Pool Configuration.) The data buffer pool needs to be able to fit the number of columns you will write concurrently.
Note
See also: Merge Optimization