Ingest batch data programmatically

Deephaven offers specialized utilities for importing a number of common data formats, including CSV, XML and JSON. Traditional relational data sources can also be directly imported via the JDBC importer.

These importers share many common aspects, so we discuss them collectively here and then demonstrate how to import CSV data both through a schema-driven command-line import and from a query using the Builder classes. Although readCsv is available for simple imports of CSV data into in-memory tables in a Deephaven console, the importer classes described here should be used when you need something more fine-grained, are using a non-CSV source, or need or want any of the following:

  • control of the output data type of a column (e.g., a column such as an order ID that contains text values but could be misread as numeric by readCsv)
  • auto-mapping or custom mapping of source column name to target column name
  • the ability to handle invalid rows (such as extraneous header rows)
  • automatic processing of multiple files: given a directory name and a file prefix, the importer finds the matching files and processes them in order for you
  • the ability to import data from different sources and formats into a single table
  • the ability to use custom Java code to transform source data during import

The basic steps are as follows.

1. Generate the schema:

To import data into a system table using any import source, you must first create a table in Deephaven with a schema definition that either matches the source structure, or includes additional metadata to describe how to map columns from the source to the target table. Schemas can be created by hand or through a "schema inference" tool provided by Deephaven.

A simple schema file generated from a CSV data sample might look like the following:

<Table name="CsvNames" namespace="Import" storageType="NestedPartitionedOnDisk">
   <Partitions keyFormula="__PARTITION_AUTOBALANCE_SINGLE__"/>
   <Column name="Date" intradayType="none" dataType="String" columnType="Partitioning"/>
   <Column name="FirstName" dataType="String" columnType="Normal" />
   <Column name="LastName" dataType="String" columnType="Grouping" />
   <Column name="Gender" dataType="String" columnType="Normal" />
</Table>
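
For reference, a source CSV matching this schema would only need the three non-partitioning columns; the Date partition value is supplied at import time, as described below. The rows here are hypothetical sample data:

FirstName,LastName,Gender
Jane,Doe,F
John,Smith,M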

Note

columnType="Normal" is the default setting and can be omitted.

The storageType listed in the schema for CSV, JDBC, JSON and XML table imports must be NestedPartitionedOnDisk. In addition, the schema must include exactly one string column defined as the partitioning column. This column will be used to partition separate import events of data to the table. If an import that would replace an existing partition is started (i.e., there is already data in the table with this partitioning value), the importer will, by default, abort the import. However, optional import instructions can direct the importer to append to or replace existing data rather than aborting.
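
When importing programmatically with the Builder classes described later in this document, the output mode can be set explicitly. The following is a minimal Python sketch; the namespace and table match the sample schema above, while the file path and partition value are hypothetical, and "APPEND" is assumed to be accepted alongside the "REPLACE" mode shown later:

from deephaven import *

# Append to (rather than replace) any existing data in the 2018-09-26
# column partition of Import.CsvNames ("APPEND" is assumed here).
rows = CsvImport.Builder("Import", "CsvNames")\
    .setSourceFile("/data/csvnames.csv")\
    .setDestinationPartitions("localhost/2018-09-26")\
    .setOutputMode("APPEND")\
    .build()\
    .run()

print("Imported {} rows.".format(rows))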

2. Deploy schema

Once defined, the schema(s) must be deployed before they can be used.
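
For example, using the dhconfig tool shown in the quickstart below (the schema file path follows the layout the schema creator produces and is hypothetical here):

sudo -u irisadmin /usr/illumon/latest/bin/dhconfig schema import --file /etc/sysconfig/illumon.d/schema/Import/Import.CsvNames.schema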

3. Import data into a partition

Because the schema example noted above has no additional metadata to direct import activities, it is expecting a source with "FirstName", "LastName", and "Gender" columns. Matching of column names is case-sensitive and whitespace sensitive. The values for the Date column (the partitioning column in this case) will be provided as one of the arguments to the importer if single-partition import is run, and not from data from the source. For multi-partition imports, data from the source can be used to indicate how blocks of data should be partitioned during import.
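
For instance, in a single-partition command-line import (shown in the CSV quickstart below), the partitioning value is supplied with the -dp argument rather than read from the source; the namespace and table match the sample schema above, and the file path is hypothetical:

sudo -u dbmerge /usr/illumon/latest/bin/iris_exec csv_import -- -ns Import -tn CsvNames -sf /data/csvnames.csv -dp localhost/2018-09-26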

The following is an example of a schema file that was created using the schema creator to inspect an XML file. There are two main sections to this schema: the ImportSource block near the top, and the Column details toward the bottom. The Column details define the properties of each column in the table, while the ImportSource section provides instructions related to importing data from a particular source format; in this case, an XML file.

<Table name="OrderInfo" namespace="Sample" storageType="NestedPartitionedOnDisk">
  <ImportSource name="IrisXML" type="XML">
     <ImportColumn name="source_Date" sourceName="Date" sourceType="String" formula="source_Date.isEmpty() ? null :DBTimeUtils.convertDateTime(source_Date.replace(&quot; &quot;,&quot;T&quot;).replace(&quot;Z&quot;,&quot; UTC&quot;))" />
     <ImportColumn name="Max_value" sourceName="Max-value" />
     <ImportColumn name="Product_code" sourceName="Product-code" />
     <ImportColumn name="BackOrder" default="null" />
     <ImportColumn name="Macro_value" sourceName="Macro-value" />
   </ImportSource>
  <!-- Directives for overnight merge operations. Everything in one storage partition daily, chosen round-robin. -->
  <Partitions keyFormula="__PARTITION_AUTOBALANCE_SINGLE__" multiday="0,1,2,3,4,5" />
  <Column name="Date" dataType="java.lang.String" columnType="Partitioning" />
  <Column name="source_Date" dataType="com.illumon.iris.db.tables.utils.DBDateTime" columnType="Normal" />
  <Column name="Max_value" dataType="double" columnType="Normal" />
  <Column name="Product_code" dataType="java.lang.String" columnType="Grouping" />
  <Column name="Quantity" dataType="long" columnType="Normal" />
  <Column name="BackOrder" dataType="long" columnType="Normal" />
  <Column name="Warranty" dataType="char" columnType="Normal" />
  <Column name="Comments" dataType="java.lang.String" columnType="Normal" />
  <Column name="Macro_value" dataType="java.lang.String" columnType="Normal" />
</Table>

The schema generators "legalize" column names when reading details of a data set. If a source column name is valid for a column name in Deephaven, and is not in conflict with the partitioning column name, it will be used directly as the column name in the schema. If there is some problem or conflict, the schema generator will modify the name to make it valid, and will also add mapping instructions for the importer to use later. This can be seen in several columns of the preceding sample schema. For example, the column name "Max-value" in the source file is not valid for a column name (hyphens are not allowed). Therefore, the schema generator renamed Max-value to Max_value, and added an ImportColumn entry with sourceName="Max-value" to map Max-value from the source file to Max_value in the Deephaven table.

Besides mapping different source and target names, ImportColumn entries are used by the schema creators to add default values for sparse primitive columns (e.g., numeric columns that are sometimes empty) and to add formulas needed to parse or convert source data to Deephaven types.

The "BackOrder" column is an example where the source data was sometimes empty, so the schema creator added a default of "null". Without the default, import would fail when the importer attempted to parse an empty value into a long.

The "source_Date" column provides an example of two types of ImportColumn operations: it is being renamed, from "Date" in the source file to "source_Date", and it has a formula to reformat its string representation of date/time data into a form that can be passed to convertDateTime to convert it to Deephaven's DBDateTime type. Note that the type of the "source_Date column" is DBDateTime, to match the output of convertDateTime.

CSV Quickstart

Here is an example of how to generate a schema from a CSV data file, deploy it, and import a file from the command line. These commands assume a typical Deephaven installation and a sample file located at /data/sample.csv.

  1. Generate a schema from a sample CSV data file:
iris_exec csv_schema_creator -- -ns CSVExampleNamespace -tn CSVExampleTableName -sf /data/sample.csv

This creates CSVExampleNamespace.CSVExampleTableName.schema in a folder named CSVExampleNamespace.

  2. Deploy the schema:
sudo -u irisadmin /usr/illumon/latest/bin/dhconfig schema import --file /etc/sysconfig/illumon.d/schema/CSVExampleNamespace/CSVExampleNamespace.CSVExampleTableName.schema

If the schema is in a different location, update the directory accordingly.

  3. Import a single data file into the specified Intraday partition:
sudo -u dbmerge /usr/illumon/latest/bin/iris_exec csv_import -- -ns CSVExampleNamespace -tn CSVExampleTableName -sf /data/sample.csv -dp localhost/2018-09-26

This example will generate a table schema from sample.csv with the namespace "CSVExampleNamespace" and table name "CSVExampleTableName". The default behavior creates a directory for the namespace in the current directory and places the output schema file inside this directory. After inspection of the generated schema, it may be deployed. You might want to move this schema file to a standard location for reference. Once deployed, CSV files matching the structure of sample.csv can be imported.

Note

See also: the full documentation on Importing CSV Files for further details.

Other Available Data Types

In addition to CSV, support for JDBC, JSON, and XML sources is available out of the box; it is also possible to create your own custom ingester.

Using the Builder Classes

The Builder classes can be used from custom Python or Java applications to batch process imports.

Compared to other methods, this permits more elaborate logic with respect to existing data. These scripts may be executed as a persistent query or from the command line using the iris_exec run_local_script tool. All imports should be performed as the dbmerge user (or from a persistent query running on a merge server).

Example CSV Import Query

The following script imports a single CSV file to a specified partition:

Groovy

import com.illumon.iris.importers.util.CsvImport
import com.illumon.iris.importers.ImportOutputMode

rows = new CsvImport.Builder("Test","Sample")
    .setSourceFile("/db/TempFiles/dbquery/staging/data1.csv")
    .setDestinationPartitions("localhost/2018-04-01")
    .setOutputMode(ImportOutputMode.REPLACE)
    .build()
    .run()

println "Imported " + rows + " rows."
from deephaven import *

rows = CsvImport.Builder("Test", "Sample")\
    .setSourceFile("/db/TempFiles/dbquery/staging/data1.csv")\
    .setDestinationPartitions("localhost/2018-04-01")\
    .setOutputMode("REPLACE")\
    .build()\
    .run()

print("Imported {} rows.".format(rows))

Example CSV Import and Merge Query

This example uses the CSV and Merge builder classes to iterate through the matching files in a directory and import and merge them. Note the namespace, tableName, and file values are hypothetical. The script will extract a date from the file name to use as the column partition value when importing.

Python

import glob
from deephaven import *

namespace = "Demo"
tableName = "StockTrades"
sourceDir = "/s3-deephaven/trades/"

# file pattern to match
sourceGlob = "*.csv"

# before and after portions of file name to use when isolating date string
before = "TRADES_"
after = ".pcap"

# Deephaven classes to use to import and merge the CSV data
CsvImport = jpy.get_type('com.illumon.iris.importers.util.CsvImport')
MergeDataBuilder = jpy.get_type('com.illumon.iris.importers.util.MergeData$Builder')

# A function to extract yyyy-MM-dd Deephaven partition value
# from the file name. In this case, the expectation is that
# the source files will have yyyyMMdd in the file name.
def partition(before, after, filename):
 dateStart = 0 if before == "" else filename.find(before)
 if dateStart < 0: return ""
 filename = filename[dateStart + len(before):]
 dateEnd = len(filename) if after == "" else filename.find(after)
 if dateEnd < 0: return ""
 date = filename[:dateEnd]
 if len(date) != 8: return ""
 return date[0:4]+"-"+date[4:6]+"-"+date[6:]

# Iterate through the source files found by glob, and import and
# merge each of them.
for sourceFile in glob.glob(sourceDir + sourceGlob):
 partValue = partition(before, after, sourceFile)
 print("Importing from " + sourceFile)

 rows = CsvImport.builder(namespace, tableName)\
     .setSourceFile(sourceFile)\
     .setDestinationPartitions("localhost/"+partValue)\
     .setOutputMode("REPLACE")\
     .build()\
     .run()

 print("Imported {} rows.".format(rows))

 print("Merging for " + partValue)

 if rows > 0:
   MergeDataBuilder(db,namespace,tableName) \
     .setPartitionColumnValue(partValue) \
     .setForce(True) \
     .build() \
     .run()

 # This portion of the script is optional. Logic can be added
 # here to check that the row counts of intraday and historical
 # match, and to delete the intraday partition and/or source
 # file from disk.
 filter = "Date = `" + partValue + "`"
 iRows=db.i(namespace,tableName,False).where(filter).size()
 hRows=db.t(namespace,tableName).where(filter).size()
 #if iRows == hRows:
   #delete intraday partition and source CSV here
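
The commented-out section above could be completed along these lines. This is a minimal sketch that assumes removing the staged source file is the desired cleanup (import os would be added at the top of the script); deleting the intraday partition itself is left to site-specific tooling:

 if iRows == hRows:
   os.remove(sourceFile)  # remove the staged CSV once intraday and historical counts match
   # deleting the intraday partition itself would use site-specific tooling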