Custom Importer
If none of the existing import tools supports your input data, you can create a custom importer.
ExampleImporter.java extends BaseImporter.java in a deliberately basic way to illustrate how little code a new importer needs. BaseImporter handles most of the setup, so you can focus on moving the data. ExampleImporter.java is extensively commented to explain what the class does and which parts need to be overridden or copied.
You will need to create your own class extending BaseImporter. Then:
- Create an Arguments class extending StandardImporterArguments. This class needs to add arguments for the information your importer needs to locate and process the data to be imported.
- Copy main, replacing occurrences of ExampleImporter.Arguments with your Arguments class, and ExampleImporter with your importer class.
- Implement processData in your importer class. This method must set all the column values using the TableWriter instance and call TableWriter.writeRow for each row of output data.
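The overall shape those steps produce can be sketched with small stand-in classes, so that it compiles without Deephaven on the classpath. InMemoryTableWriter, SketchBaseImporter, and MyImporter are hypothetical names: they only mimic the roles of TableWriter, BaseImporter, and a custom importer, and none of their methods are Deephaven APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TableWriter. The real class hands out typed RowSetters
// via getSetter(...); this sketch just records column values in a map.
class InMemoryTableWriter {
    private final Map<String, Object> currentRow = new LinkedHashMap<>();
    final List<Map<String, Object>> rows = new ArrayList<>();

    void set(String column, Object value) {
        currentRow.put(column, value);
    }

    // Plays the role of TableWriter.writeRow(): commit the populated row.
    void writeRow() {
        rows.add(new LinkedHashMap<>(currentRow));
        currentRow.clear();
    }
}

// Hypothetical stand-in for BaseImporter: set up, let the subclass process data, finish.
abstract class SketchBaseImporter {
    private InMemoryTableWriter tableWriter;

    protected InMemoryTableWriter getTableWriter() {
        return tableWriter;
    }

    // The one method a custom importer must implement.
    protected abstract void processData();

    void doImport() {
        tableWriter = new InMemoryTableWriter(); // roughly what setUpImport() prepares
        processData();
        // finishImport() would close the writer here
    }
}

// Hypothetical custom importer: set every column, then write the row.
class MyImporter extends SketchBaseImporter {
    private final int numRows;

    MyImporter(int numRows) {
        this.numRows = numRows;
    }

    @Override
    protected void processData() {
        InMemoryTableWriter writer = getTableWriter();
        for (int i = 0; i < numRows; i++) {
            writer.set("StringCol", "string:" + i);
            writer.set("IntegerCol", i);
            writer.writeRow();
        }
    }

    public static void main(String[] args) {
        MyImporter importer = new MyImporter(3);
        importer.doImport();
        importer.getTableWriter().rows.forEach(System.out::println);
    }
}
```

The key design point carried over from BaseImporter is the template method: doImport fixes the order of setup, processing, and cleanup, and the subclass supplies only processData.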
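ExampleImporter includes implementations at two extremes: processDataSimple is completely hard-coded for one known schema, while processDataDynamic is completely dynamic and works with any table that has an existing schema.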
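The dynamic approach rests on one idea: inspect each column's type once, before the row loop, and pick a typed copy operation for it, so that primitive values are not boxed on every row. A minimal, self-contained sketch of that idea follows. The Sink and IndexSource types are hypothetical stand-ins for RowSetter and RowGetter, and the column map stands in for a TableDefinition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DynamicCopySketch {
    // Hypothetical stand-in for RowSetter, with typed and generic setters.
    interface Sink {
        void setInt(int value);
        void setDouble(double value);
        void set(Object value);
    }

    // Hypothetical stand-in for RowGetter: values are derived from the current row index.
    static final class IndexSource {
        int index;
        int getInt() { return index; }
        double getDouble() { return index + 0.5; }
        Object get() { return "row " + index; }
    }

    // Decide once per column which typed copy to use, so the row loop never boxes primitives.
    static Runnable copier(Class<?> type, IndexSource source, Sink sink) {
        if (type == int.class || type == Integer.class) {
            return () -> sink.setInt(source.getInt());
        } else if (type == double.class || type == Double.class) {
            return () -> sink.setDouble(source.getDouble());
        }
        return () -> sink.set(source.get());
    }

    // Build the per-column copiers from a column-name -> type map, then run them for each row.
    static List<String> run(Map<String, Class<?>> columns, int numRows) {
        List<String> written = new ArrayList<>();
        IndexSource source = new IndexSource();
        List<Runnable> copiers = new ArrayList<>();
        for (Map.Entry<String, Class<?>> column : columns.entrySet()) {
            String name = column.getKey();
            Sink sink = new Sink() {
                @Override public void setInt(int value) { written.add(name + "=int:" + value); }
                @Override public void setDouble(double value) { written.add(name + "=double:" + value); }
                @Override public void set(Object value) { written.add(name + "=obj:" + value); }
            };
            copiers.add(copier(column.getValue(), source, sink));
        }
        for (int i = 0; i < numRows; i++) {
            source.index = i;
            copiers.forEach(Runnable::run); // a real importer would call writeRow() next
        }
        return written;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> columns = new LinkedHashMap<>();
        columns.put("IntegerCol", Integer.class);
        columns.put("DoubleCol", double.class);
        columns.put("StringCol", String.class);
        run(columns, 2).forEach(System.out::println);
    }
}
```

This mirrors getTypeSpecificGetSet in the listing below, reduced to two primitive types plus a generic fallback.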
Prerequisites
The namespace and table name must exist in the System namespace, and the table definition must exist. This will be true if a schema has been deployed for the table.
Executing the program
This importer should be run on a merge server host with local access to intraday data.
All Deephaven processes expect certain properties. These are usually provided by scripts or service configuration; here, you need to provide them yourself when you run the import program.
Classpath
Your importer's classpath should include the installed Deephaven files, the default override locations, and the path to your own Java code:
- /etc/sysconfig/illumon.d/hotfixes
- /etc/sysconfig/illumon.d/override
- /etc/sysconfig/illumon.d/resources
- /etc/sysconfig/illumon.d/java_lib/*
- /usr/illumon/latest/etc
- /usr/illumon/latest/java_lib/*
process.name
Set this to a name that identifies your importer. Only one instance of each process.name can run at any given time. Pass this as a JVM argument:
-Dprocess.name=example_importer
Configuration.rootFile
The root property file specifying configuration properties for your process. Pass this as a JVM argument:
-DConfiguration.rootFile=iris-common.prop
devroot
This can be /usr/illumon/latest, or the resolved target of that link.
-Ddevroot=/usr/illumon/latest
workspace
This determines where log files are written. Following the same convention as the other import processes should be fine.
-Dworkspace=/db/TempFiles/dbmerge/example_importer
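If a launch fails because one of these properties is missing, a quick check of the -D values can save time. The class below is a hypothetical helper, not part of Deephaven; it only reads the four system properties described above with System.getProperty and reports any that are unset or blank.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check; Deephaven's own startup does its own validation.
class LaunchPropertyCheck {
    // JVM properties named in the launch instructions.
    static final String[] REQUIRED = {
        "process.name", "Configuration.rootFile", "devroot", "workspace"
    };

    // Return the names of required -D properties that are missing or blank.
    static List<String> missingProperties() {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = System.getProperty(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingProperties();
        if (!missing.isEmpty()) {
            System.err.println("Missing JVM properties: " + missing);
            System.exit(1);
        }
        System.out.println("process.name=" + System.getProperty("process.name"));
    }
}
```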
Class to execute
Replace com.illumon.iris.importers.ExampleImporter with your class.
Program Arguments
Standard importer arguments, plus any new arguments you add.
Standard arguments are:
- -dd or --destinationDirectory <path>
- -dp or --destinationPartition <internal partition name / partitioning value>
- -ns or --namespace <namespace>
- -tn or --tableName <name>
- -om or --outputMode <import behavior>
Note
See: Tables & Schemas for more information.
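The -dp value combines two parts separated by a slash, for example hostname/2018-03-07. The sketch below splits such a value to show its expected shape. PartitionString is a hypothetical class, and Deephaven's own parsing and validation rules may differ; this sketch assumes exactly one slash with text on both sides.

```java
// Hypothetical helper showing the shape of a -dp value: "<internal partition>/<partitioning value>".
final class PartitionString {
    final String internalPartition;
    final String columnPartition;

    private PartitionString(String internalPartition, String columnPartition) {
        this.internalPartition = internalPartition;
        this.columnPartition = columnPartition;
    }

    // Assumes exactly one '/', with non-empty text on both sides.
    static PartitionString parse(String value) {
        int slash = value.indexOf('/');
        boolean valid = slash > 0
                && slash < value.length() - 1
                && value.indexOf('/', slash + 1) < 0;
        if (!valid) {
            throw new IllegalArgumentException(
                    "Expected <internal partition>/<partitioning value>, got: " + value);
        }
        return new PartitionString(value.substring(0, slash), value.substring(slash + 1));
    }

    public static void main(String[] args) {
        PartitionString p = parse("hostname/2018-03-07");
        System.out.println(p.internalPartition + " | " + p.columnPartition);
    }
}
```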
Putting these together gives a command line like the following:
export EXAMPLECLASSPATH=/etc/sysconfig/illumon.d/hotfixes:/etc/sysconfig/illumon.d/override:/etc/sysconfig/illumon.d/resources:/etc/sysconfig/illumon.d/java_lib/*:/usr/illumon/latest/etc:/usr/illumon/latest/java_lib/*
sudo java -cp "$EXAMPLECLASSPATH" -Dprocess.name=example_importer -DConfiguration.rootFile=iris-common.prop -Ddevroot=/usr/illumon/latest -Dworkspace=/db/TempFiles/dbmerge/example_importer com.illumon.iris.importers.ExampleImporter -dp hostname/2018-03-07 -om REPLACE -ns ExampleNamespace -tn ExampleTable [custom arguments]
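The same launch can also be scripted from Java with the standard ProcessBuilder API, for example from a test harness. This is a sketch under assumptions: ImporterLauncher is a hypothetical name, the paths and property values are copied from the command above, the -nd and -nr values are ExampleImporter's custom arguments, java must be on the PATH, and it runs without sudo, so the current user needs access to the intraday data. Because ProcessBuilder does not go through a shell, the java_lib/* entries reach the JVM unexpanded, and the java launcher expands those classpath wildcards itself.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical launcher that assembles the documented command line and runs it.
class ImporterLauncher {
    // Classpath entries from the Classpath section; ':' is the Linux path separator.
    static final String CLASSPATH = String.join(":",
            "/etc/sysconfig/illumon.d/hotfixes",
            "/etc/sysconfig/illumon.d/override",
            "/etc/sysconfig/illumon.d/resources",
            "/etc/sysconfig/illumon.d/java_lib/*",
            "/usr/illumon/latest/etc",
            "/usr/illumon/latest/java_lib/*");

    // JVM options first, then the class to execute, then its program arguments.
    static List<String> buildCommand(String mainClass, List<String> programArgs) {
        List<String> command = new ArrayList<>(Arrays.asList(
                "java", "-cp", CLASSPATH,
                "-Dprocess.name=example_importer",
                "-DConfiguration.rootFile=iris-common.prop",
                "-Ddevroot=/usr/illumon/latest",
                "-Dworkspace=/db/TempFiles/dbmerge/example_importer",
                mainClass));
        command.addAll(programArgs);
        return command;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        List<String> command = buildCommand("com.illumon.iris.importers.ExampleImporter",
                Arrays.asList("-dp", "hostname/2018-03-07", "-om", "REPLACE",
                        "-ns", "ExampleNamespace", "-tn", "ExampleTable",
                        "-nd", "0", "-nr", "10"));
        // No shell is involved, so each argument is passed exactly as written.
        Process process = new ProcessBuilder(command).inheritIO().start();
        System.exit(process.waitFor());
    }
}
```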
Importing as Query
ExampleImporter and ExampleImporter.Arguments include alternate constructors that take plain values instead of a command line, and ExampleImporter also provides a static importData convenience method. Importers that provide similar entry points can be called from a Deephaven console or persistent query:
import com.illumon.iris.importers.ExampleImporter
// Call the static convenience method (log, namespace, table name, partition, output mode, startIndex, numRows)
ExampleImporter.importData(log, "ExampleNamespace", "ExampleTable", "hostname/2018-01-01", "REPLACE", 12, 23)
// Create an object and invoke doImport()
e = new com.illumon.iris.importers.ExampleImporter(log, "ExampleNamespace", "ExampleTable", "hostname/2018-03-08", "REPLACE", 12, 3)
e.doImport()
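Both calls above work because every entry point ends up constructing the same Arguments object. The self-contained sketch below shows that pattern in isolation: a command-line path, a plain-values constructor, and a static convenience method that all converge on one primary constructor. QueryFriendlyImporter and its tiny option parser are hypothetical; ExampleImporter itself parses options with Apache Commons CLI through StandardImporterArguments.

```java
// Hypothetical importer illustrating the two construction paths used by ExampleImporter.
class QueryFriendlyImporter {
    static final class Args {
        final String namespace;
        final String tableName;
        final int numRows;

        Args(String namespace, String tableName, int numRows) {
            this.namespace = namespace;
            this.tableName = tableName;
            this.numRows = numRows;
        }

        // Command-line path: expects "-ns <namespace> -tn <table> -nr <rows>" in any order.
        static Args parse(String... argv) {
            String ns = null, tn = null;
            int nr = -1;
            for (int i = 0; i + 1 < argv.length; i += 2) {
                switch (argv[i]) {
                    case "-ns": ns = argv[i + 1]; break;
                    case "-tn": tn = argv[i + 1]; break;
                    case "-nr": nr = Integer.parseInt(argv[i + 1]); break;
                    default: throw new IllegalArgumentException("Unknown option " + argv[i]);
                }
            }
            if (ns == null || tn == null || nr < 0) {
                throw new IllegalArgumentException("-ns, -tn and -nr are required");
            }
            return new Args(ns, tn, nr);
        }
    }

    private final Args args;

    // Primary constructor: every entry point ends up here.
    QueryFriendlyImporter(Args args) {
        this.args = args;
    }

    // Query path: plain values, no command line.
    QueryFriendlyImporter(String namespace, String tableName, int numRows) {
        this(new Args(namespace, tableName, numRows));
    }

    // Stand-in for doImport(): reports what would be imported.
    String doImport() {
        return args.numRows + " rows -> " + args.namespace + "." + args.tableName;
    }

    // One-call convenience method for consoles and persistent queries.
    static String importData(String namespace, String tableName, int numRows) {
        return new QueryFriendlyImporter(namespace, tableName, numRows).doImport();
    }

    public static void main(String[] argv) {
        System.out.println(new QueryFriendlyImporter(Args.parse(argv)).doImport());
    }
}
```

Keeping all validation and defaults in the Arguments class means the command-line and query paths cannot drift apart.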
ExampleImporter.java
package com.illumon.iris.importers;
import com.fishlib.base.log.LogOutput;
import com.fishlib.configuration.Configuration;
import com.fishlib.io.logger.Log4jAdapter;
import com.fishlib.io.logger.Logger;
import com.fishlib.io.logger.ProcessStreamLoggerImpl;
import com.fishlib.util.PidFileUtil;
import com.fishlib.util.process.ProcessEnvironment;
import com.illumon.dataobjects.ColumnDefinition;
import com.illumon.iris.binarystore.*;
import com.illumon.iris.db.tables.TableDefinition;
import com.illumon.iris.db.tables.dataimport.DbNamespace;
import com.illumon.iris.db.tables.utils.DBDateTime;
import com.illumon.iris.utils.SystemLoggerTimeSource;
import org.apache.commons.cli.*;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
/**
* <p> This file is an example importer that illustrates the extensions to
* {@link BaseImporter} needed to create a custom importer.
*
* <p> There are two examples. Both manufacture data based on a row index.
*
* <p> {@link #processDataSimple()} will write data to a table defined by the following schema:
* <pre>{@code
* <Table name="DemoTable" namespace="ExampleNamespace" rethrowLoggerExceptionsAsIOExceptions="false" storageType="NestedPartitionedOnDisk">
* <Partitions keyFormula="${autobalance_by_first_grouping_column}" />
*
* <Column name="StringCol" dataType="String" />
* <Column name="DateTimeCol" dataType="DateTime" />
* <Column name="IntegerCol" dataType="Integer" />
* <Column name="LongCol" dataType="Long" />
* <Column name="DoubleCol" dataType="Double" />
* <Column name="FloatCol" dataType="Float" />
* <Column name="BooleanCol" dataType="Boolean" />
* <Column name="CharCol" dataType="Char" />
* <Column name="ByteCol" dataType="Byte" />
* <Column name="ShortCol" dataType="Short" />
* <Column name="Date" dataType="String" columnType="Partitioning" />
* </Table>
* }</pre>
*
* <p> The getters and setters are hard-coded and stored in variables. This method
* is simple, but tedious.
*
* <p> A valid invocation will include options similar to the following
* (note that the namespace and table name parameters match the example schema above):
* <p><code> -dp localhost/2018-02-28 -ns ExampleNamespace -tn DemoTable -om REPLACE -nd 0 -st 100 -nr 123 </code>
*
* <p> {@link #processDataDynamic()} will write data to any table with an existing
* schema.
* The code to get the data is a calculation based on the index and data
* type. There are several layers of code to move calculations out of the row
* processing loop, and to ensure that unnecessary inefficiency (e.g. boxing of
* primitives) is avoided.
* <p> A valid invocation will include options similar to this:
* <p><code> -dp localhost/2018-02-28 -ns AnyNamespace -tn AnyTable -om REPLACE -nd 0 -st 100 -nr 123 </code>
*/
public class ExampleImporter extends BaseImporter<ExampleImporter.Arguments> {
/**
* Define this inner class if you need to add arguments to StandardImporterArguments.
* Define the new options in {@link #addOptions} and handle them in the constructor.
*/
static class Arguments extends StandardImporterArguments {
// constants for the custom argument strings
private static final String WELCOME_MESSAGE_OPTION = "wm";
private static final String START_OPTION = "st";
private static final String NUM_ROWS_OPTION = "nr";
private static final String SIMPLE_OPTION = "simple";
// hold onto parsed values
private final int startIndex;
private final int numRows;
private final String message;
private final boolean simpleExample;
/**
* Construct Arguments from a command line.
*
* @param commandLine the command line, as passed from {@link ExampleImporter#main}.
* @throws ParseException if the command line or parameters are not valid
*/
Arguments(@NotNull CommandLine commandLine) throws ParseException {
super(commandLine);
// validate and handle custom options here
// Example usage of an optional String argument
if (commandLine.hasOption(WELCOME_MESSAGE_OPTION)) {
message = commandLine.getOptionValue(WELCOME_MESSAGE_OPTION);
} else {
message = null;
}
try {
// optional Number argument
if (commandLine.hasOption(START_OPTION)) {
startIndex = ((Number)commandLine.getParsedOptionValue(START_OPTION)).intValue();
} else {
startIndex = 0;
}
// required Number argument
numRows = ((Number)commandLine.getParsedOptionValue(NUM_ROWS_OPTION)).intValue();
} catch (ParseException pe) {
// The option could not be parsed into the declared type.
// You don't really want to catch and immediately rethrow - handle the exception here, or remove the catch.
throw pe;
}
simpleExample = commandLine.hasOption(SIMPLE_OPTION);
}
/**
* Alternate constructor for usage that doesn't start at main (with a command line).
*
* @param namespace namespace of the destination table
* @param tableName name of the destination table
* @param partitionString internal and column partition values ("internalpartition/columnpartition")
* @param outputMode a valid {@link ImportOutputMode} value
* @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
* @param numRows see the -nr command line option: "Generate this many rows"
*/
Arguments(// StandardImporterArguments values
@NotNull final String namespace,
@NotNull final String tableName,
@NotNull final String partitionString,
@NotNull final String outputMode,
// ExampleImporter.Arguments values
final int startIndex,
final int numRows) {
super(DbNamespace.valueOf(namespace), tableName, partitionString, null, null, ImportOutputMode.valueOf(outputMode));
this.startIndex = startIndex;
this.numRows = numRows;
this.simpleExample = false;
this.message = null;
}
/**
* Alter the standard arguments passed in by adding options specific to this importer.
* See {@link Options}, {@link Option} and {@link Option.Builder}.
*
* @param options addOptions should add new options to this set of Options.
* @return an Options with custom modifications to the input options
*/
private static Options addOptions(@NotNull final Options options) {
// the following option produces this usage string:
// -wm,--welcomeMessage <message> Example string argument
Option wmOption =
Option.builder(WELCOME_MESSAGE_OPTION) // short name of option, e.g. -wm vs --welcomeMessage
.longOpt("welcomeMessage") // long name of option, e.g. --welcomeMessage vs -wm
.desc("Example string argument") // option description, used in usage message
.hasArg() // include this if the option has an argument
.argName("message") // name of the argument value
.type(String.class) // argument type, used if getParsedOptionValue is used
.required(false) // makes this option optional
.build();
// An OptionGroup indicates mutually exclusive options
final OptionGroup exclusiveGroup = new OptionGroup();
exclusiveGroup.setRequired(true);
exclusiveGroup.addOption(
// you can declare options inline
Option.builder("nd")
.longOpt("numDogs")
.desc("Number of dogs (excludes cats)")
.hasArg()
.argName("number of dogs")
.type(Number.class)
.build())
// you can chain the calls together
.addOption(Option.builder("nc").longOpt("numCats").desc("Number of cats (excludes dogs)").hasArg().argName("number of cats").type(Number.class).build());
// Add Options and OptionGroups the same way
return options
.addOption(wmOption)
.addOptionGroup(exclusiveGroup)
.addOption(Option.builder(SIMPLE_OPTION).required(false).longOpt("simple").desc("Use SimpleExampleDataSource").build())
.addOption(Option.builder(NUM_ROWS_OPTION).required().longOpt("numRows").desc("Generate this many rows").hasArg().argName("numRows").type(Number.class).build())
.addOption(Option.builder(START_OPTION).required(false).longOpt("startIndex").desc("Generate rows starting at startIndex (default is 0)").hasArg().argName("startIndex").type(Number.class).build());
}
/**
* Sub-classes should define their own version of appendArguments, which
* should almost always invoke the superclass implementation first.
* The superclass, {@link StandardImporterArguments}, includes
* namespace, tableName, destinationDirectory, and outputMode.
*
* @param logOutput Append argument names and values to this LogOutput
* @return The logOutput for call chaining
*/
public LogOutput appendArguments(@NotNull final LogOutput logOutput) {
super.appendArguments(logOutput);
// For example,
//return logOutput
// .append(",destinationDirectory=").append(destinationDirectory.toString())
// .append(",outputMode=").append(outputMode);
return logOutput;
}
/**
* Accessor for customized option startIndex.
*
* @return The specified or default start index.
*/
int getStartIndexArgument() {
return startIndex;
}
/**
* Accessor for customized option numRows.
*
* @return The specified number of rows.
*/
int getNumRowsArgument() {
return numRows;
}
/**
* Accessor for customized string argument.
*
* @return The welcome message.
*/
String getMessage() {
return message;
}
/**
* You can certainly add methods to find out whether optional arguments were specified.
*
* @return true if a message was given on the command line
*/
boolean hasMessageArgument() {
return message != null;
}
}
/**
* <p>Example data source. This may implement {@link TableReader}, but does not have to.
* This class needs to read your data source and supply the rows to ExampleImporter.processData.
* You will likely need to add custom arguments to identify the data source.
*
* <p>This implementation produces manufactured data based on an index in a
* range given in the constructor.
*/
private static class SimpleExampleDataSource {
private final int start;
private final int numRows;
private int index;
SimpleExampleDataSource(int start, int numRows) {
this.start = start;
this.numRows = numRows > 0 ? numRows : 0;
index = start-1;
}
/**
* Read or otherwise prepare the next row of data.
* This implementation manufactures the data.
*
* @return true if another row is available, false if there is no more data.
*/
boolean readRow() {
if (index < start + numRows - 1) {
index = index +1;
return true;
}
return false;
}
// This simple implementation hard-codes accessors for the known columns.
String getStringColValue() {
return "string:" + index;
}
DBDateTime getDateTimeColValue() {
return DBDateTime.now();
}
long getLongColValue() {
return index;
}
int getIntegerColValue() {
return index;
}
double getDoubleColValue() {
return 0.123d + index;
}
float getFloatColValue() {
return 0.123f + index;
}
boolean getBooleanColValue() {
return index % 3 == 0;
}
char getCharColValue() {
return (char)index;
}
byte getByteColValue() {
return (byte)(index%128);
}
short getShortColValue() {
return (short)index;
}
}
/**
* <p> Example data source. This may implement {@link TableReader}, but does not have to.
* This class needs to read your data source and supply the rows to ExampleImporter.processData.
* You will likely need to add custom arguments to identify the data source.
*
* <p> This implementation dynamically discovers the columns in the table
* definition, and produces manufactured data based on an index in a
* range given in the constructor.
*/
private static class DynamicExampleDataSource {
private final int start;
private final int numRows;
private int index;
/** Compute the getter methods once and store them. */
final Map<String, RowGetter> rowGettersByName = new HashMap<>();
DynamicExampleDataSource(int start, int numRows, TableDefinition tableDefinition) {
this.start = start;
this.numRows = numRows > 0 ? numRows : 0;
index = start-1;
init(tableDefinition);
}
/**
* Read or otherwise prepare the next row of data.
* This implementation manufactures the data.
*
* @return true if another row is available, false if there is no more data.
*/
boolean readRow() {
if (index < start + numRows - 1) {
index = index +1;
return true;
}
return false;
}
/**
* Get the getter for a named column.
* The getter is a pre-calculated implementation of RowGetter.
* This variant cannot return a typed RowGetter.
*
* @param name the column to get the getter for.
* @return the getter
* @throws IllegalArgumentException when the getter does not exist
*/
RowGetter getGetter(String name) {
RowGetter rowGetter = rowGettersByName.get(name);
if (rowGetter == null) {
throw new IllegalArgumentException("No getter for column " + name);
}
return rowGetter;
}
/**
* Get the typed getter for a named column.
* The getter is a pre-calculated implementation of RowGetter.
*
* @param name the column to get the getter for.
* @return the getter
* @throws IllegalArgumentException when the getter does not exist
* @throws ClassCastException when the type is inappropriate
*/
public <T> RowGetter<T> getGetter(String name, Class<T> tClass) {
RowGetter getter = getGetter(name);
if (tClass.isAssignableFrom(getter.getType())) {
//noinspection unchecked
return (RowGetter<T>) getter;
} else if (tClass.isAssignableFrom(com.illumon.util.type.TypeUtils.getBoxedType(getter.getType()))) {
//noinspection unchecked
return (RowGetter<T>)getter;
} else {
throw new ClassCastException("Getter for column " + name + " is not assignable from " + tClass + ", type is " + getter.getType());
}
}
/**
* Pre-compute the getters for all the columns. This implementation
* manufactures the data based on an index and the column type.
*
* @param tableDefinition the table definition to inspect
*/
private void init(TableDefinition tableDefinition) {
for (ColumnDefinition colDef : tableDefinition.getColumns()) {
String colName = colDef.getName();
if (DBDateTime.class.equals(colDef.getDataType())) {
rowGettersByName.put(colName,
new AbstractRowGetter<DBDateTime>(DBDateTime.class) {
@Override
public DBDateTime get() {
return DBDateTime.now();
}
});
continue;
}
try {
switch (SupportedType.getType(colDef.getDataType())) {
case Boolean: {
rowGettersByName.put(colName,
new AbstractRowGetter<Boolean>(Boolean.class) {
@Override
public Boolean get() {
return getBoolean();
}
@Override
public Boolean getBoolean() {
return index % 3 == 0;
}
});
break;
}
case Byte: {
rowGettersByName.put(colName,
new AbstractRowGetter<Byte>(Byte.class) {
@Override
public Byte get() {
return getByte();
}
@Override
public byte getByte() {
return (byte)(index % 128);
}
});
break;
}
case Char: {
rowGettersByName.put(colName,
new AbstractRowGetter<Character>(Character.class) {
@Override
public Character get() {
return getChar();
}
@Override
public char getChar() {
return (char)(index % 128);
}
});
break;
}
case Double: {
rowGettersByName.put(colName,
new AbstractRowGetter<Double>(Double.class) {
@Override
public Double get() {
return getDouble();
}
@Override
public double getDouble() {
return index + 0.123d;
}
});
break;
}
case Float: {
rowGettersByName.put(colName,
new AbstractRowGetter<Float>(Float.class) {
@Override
public Float get() {
return getFloat();
}
@Override
public float getFloat() {
return index + 0.456f;
}
});
break;
}
case Int: {
rowGettersByName.put(colName,
new AbstractRowGetter<Integer>(Integer.class) {
@Override
public Integer get() {
return getInt();
}
@Override
public int getInt() {
return index;
}
});
break;
}
case Long: {
rowGettersByName.put(colName,
new AbstractRowGetter<Long>(Long.class) {
@Override
public Long get() {
return getLong();
}
@Override
public long getLong() {
return index;
}
});
break;
}
case Short: {
rowGettersByName.put(colName,
new AbstractRowGetter<Short>(Short.class) {
@Override
public Short get() {
return getShort();
}
@Override
public short getShort() {
return (short)index;
}
});
break;
}
case String:
case EnhancedString:
case Enum: {
rowGettersByName.put(colName,
new AbstractRowGetter<String>(String.class) {
@Override
public String get() {
return "String: " + index;
}
});
break;
}
case Blob:
case UnknownType:
default:
{
rowGettersByName.put(colName,
new AbstractRowGetter<Object>(Object.class) {
@Override
public Object get() {
return null;
}
});
break;
}
}
} catch (UnsupportedOperationException e) {
// for the types not convertible in SupportedType
rowGettersByName.put(colName,
new AbstractRowGetter<Object>(Object.class) {
@Override
public Object get() {
return null;
}
});
}
}
}
}
/**
* Construct the ExampleImporter.
* <p>An importer that is only ever constructed from its own main() could make this constructor private.
* This one is public so that queries can also construct it directly.
*
* @param log use this Logger for feedback
* @param arguments importer arguments defining the import run
*/
public ExampleImporter(@NotNull final Logger log, @NotNull final ExampleImporter.Arguments arguments) {
super(log, arguments);
}
/**
* Alternate constructor for usage without a command line.
*
* @param log Logger for monitoring and debugging messages
* @param namespace namespace of the destination table
* @param tableName name of the destination table
* @param partitionString internal and column partition values ("internalpartition/columnpartition")
* @param outputMode a valid {@link ImportOutputMode} value
* @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
* @param numRows see the -nr command line option: "Generate this many rows"
*/
public ExampleImporter(@NotNull final Logger log,
@NotNull final String namespace,
@NotNull final String tableName,
@NotNull final String partitionString,
@NotNull final String outputMode,
final int startIndex,
final int numRows) {
this(log, new Arguments(namespace, tableName, partitionString, outputMode, startIndex, numRows));
}
/**
* This will be called by the base importer superclass.
*
* Override this method to open and process your data source. For each row of the input,
* call tableWriter.getSetter(...).set(value) for each column, and then tableWriter.writeRow().
*/
@Override
protected void processData() {
if (getImporterArguments().simpleExample) {
processDataSimple();
} else {
processDataDynamic();
}
}
/**
* Pretend to read rows from a data source, actually manufacture data for a
* known static schema.
*/
private void processDataSimple() {
final SimpleExampleDataSource dataSource = new SimpleExampleDataSource(getImporterArguments().getStartIndexArgument(), getImporterArguments().getNumRowsArgument());
TableWriter tableWriter = getTableWriter();
// (Optional) do the setter lookups outside the loop, for efficiency. These could be stored in an array or other structure.
final RowSetter<String> setterStringCol = tableWriter.getSetter("StringCol", String.class);
final RowSetter<DBDateTime> setterDateTimeCol = tableWriter.getSetter("DateTimeCol", DBDateTime.class);
final RowSetter<Integer> setterIntegerCol = tableWriter.getSetter("IntegerCol", Integer.class);
final RowSetter<Long> setterLongCol = tableWriter.getSetter("LongCol", Long.class);
final RowSetter<Double> setterDoubleCol = tableWriter.getSetter("DoubleCol", Double.class);
final RowSetter<Float> setterFloatCol = tableWriter.getSetter("FloatCol", Float.class);
final RowSetter<Boolean> setterBooleanCol = tableWriter.getSetter("BooleanCol", Boolean.class);
final RowSetter<Character> setterCharCol = tableWriter.getSetter("CharCol", Character.class);
final RowSetter<Byte> setterByteCol = tableWriter.getSetter("ByteCol", Byte.class);
final RowSetter<Short> setterShortCol = tableWriter.getSetter("ShortCol", Short.class);
// Date is the partitioning column, so we cannot set it.
// For each row provided by your data source, set all the values in the writer.
while (dataSource.readRow()) {
setterStringCol.set(dataSource.getStringColValue());
setterDateTimeCol.set(dataSource.getDateTimeColValue());
setterIntegerCol.setInt(dataSource.getIntegerColValue());
setterLongCol.setLong(dataSource.getLongColValue());
setterDoubleCol.setDouble(dataSource.getDoubleColValue());
setterFloatCol.setFloat(dataSource.getFloatColValue());
setterBooleanCol.setBoolean(dataSource.getBooleanColValue());
setterCharCol.setChar(dataSource.getCharColValue());
setterByteCol.setByte(dataSource.getByteColValue());
setterShortCol.setShort(dataSource.getShortColValue());
// write each populated row
try {
tableWriter.writeRow();
} catch (IOException ioe) {
throw new UncheckedIOException("Failed to write a row!", ioe);
}
}
}
/**
* Pretend to read rows from a data source, actually manufacture data for
* the columns discovered in the named namespace/table.
*/
private void processDataDynamic() {
final int start = getImporterArguments().getStartIndexArgument();
final int numRows = getImporterArguments().getNumRowsArgument();
final TableDefinition tableDefinition = getTableDefinition().getWritable();
// this example data source knows how to manufacture data for the columns it discovers
final DynamicExampleDataSource dataSource = new DynamicExampleDataSource(start, numRows, tableDefinition);
final TableWriter tableWriter = getTableWriter();
// calculate the type-sensitive get-then-set lambda for all columns, outside the loop
List<Runnable> getAndSetters = new ArrayList<>();
for (ColumnDefinition columnDefinition : tableDefinition.getColumns()) {
String colName = columnDefinition.getName();
getAndSetters.add(getTypeSpecificGetSet(tableWriter.getSetter(colName), dataSource.getGetter(colName), columnDefinition.getDataType()));
}
// It would be simpler to call tableWriter.getSetter(colName).set(dataSource.getGetter(colName)), but using the
// generic get() causes primitive types to be boxed.
while (dataSource.readRow()) {
// For each row provided by your data source, set all the values in the writer.
getAndSetters.forEach(Runnable::run);
// write each populated row
try {
tableWriter.writeRow();
} catch (IOException ioe) {
throw new UncheckedIOException("Failed to write a row!", ioe);
}
}
}
/**
* We want to use the typed versions of get and set to avoid boxing.
* Calculate and return a runnable that uses the typed get and set methods
* to move the source input to the writer output.
*
* @param setter a RowSetter for a column
* @param getter a RowGetter for a column
* @param type the class of the column data
* @return a Runnable that gets and sets a column value
*/
private static Runnable getTypeSpecificGetSet(final RowSetter setter, final RowGetter getter, final Class<?> type) {
if (type == char.class || type == Character.class) {
return () -> setter.setChar(getter.getChar());
} else if (type == byte.class || type == Byte.class) {
return () -> setter.setByte(getter.getByte());
} else if (type == double.class || type == Double.class) {
return () -> setter.setDouble(getter.getDouble());
} else if (type == float.class || type == Float.class) {
return () -> setter.setFloat(getter.getFloat());
} else if (type == int.class || type == Integer.class) {
return () -> setter.setInt(getter.getInt());
} else if (type == long.class || type == Long.class) {
return () -> setter.setLong(getter.getLong());
} else if (type == short.class || type == Short.class) {
return () -> setter.setShort(getter.getShort());
} else {
//noinspection unchecked
return () -> setter.set(getter.get());
}
}
/**
* Static convenience method that constructs and runs an importer, so a query needs only one call.
*
* @param log Logger for monitoring and debugging messages
* @param namespace namespace of the destination table
* @param tableName name of the destination table
* @param partitionString internal and column partition values ("internalpartition/columnpartition")
* @param outputMode a valid {@link ImportOutputMode} value
* @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
* @param numRows see the -nr command line option: "Generate this many rows"
*/
@SuppressWarnings("unused")
public static void importData(@NotNull final Logger log,
@NotNull final String namespace,
@NotNull final String tableName,
@NotNull final String partitionString,
@NotNull final String outputMode,
final int startIndex,
final int numRows) {
ExampleImporter importer = new ExampleImporter(log, namespace, tableName, partitionString, outputMode, startIndex, numRows);
importer.doImport();
}
/**
* Example importer.
*
* @param args command line arguments to the process
*/
public static void main(final String... args) {
// get the Configuration singleton instance and save it for convenience
final Configuration configuration = Configuration.getInstance();
// Make sure only one instance of this process runs at any time.
// The process name is taken from the "process.name" property set in a property file or
// via jvm args such as -Dprocess.name=ExampleImporter_1
PidFileUtil.checkAndCreatePidFileForThisProcess(configuration);
// Create an Iris logger for this process
final Logger log = ProcessStreamLoggerImpl.makeLogger(new SystemLoggerTimeSource(), TimeZone.getDefault());
// Redirect any log4j logging to the Iris logger
Log4jAdapter.sendLog4jToLogger(log);
// if no arguments are given, print a friendly usage message (to the log file)
if (args.length == 0) {
log.info().append(StandardImporterArguments.getHelpString(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::addOptions)).endl();
System.exit(-1);
}
try {
// All Iris processes expect a ProcessEnvironment.
ProcessEnvironment.basicServerInitialization(configuration, ExampleImporter.class.getSimpleName(), log);
// parse the arguments
ExampleImporter.Arguments importerArgs = StandardImporterArguments.parseCommandLine(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::new, args, ExampleImporter.Arguments::addOptions);
try {
// Create an importer with the specified arguments, and import the data.
ExampleImporter importer = new ExampleImporter(log, importerArgs);
importer.doImport();
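} catch (RuntimeException e) {
log.fatal().append("Import failed with exception: ").append(e).endl();
// exit with a non-zero status, as the other failure paths do, so calling scripts can detect the failure
System.exit(-1);
}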
} catch (Exception e) {
log.fatal().append("Failed to parse command line arguments: ").append(e).endl();
// add the usage statement
log.info().append(StandardImporterArguments.getHelpString(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::addOptions)).endl();
System.exit(-1);
}
}
}
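ExampleImporter manufactures its rows. To import real data, the data source would parse input instead, while keeping the same readRow() and per-column getter shape that processDataSimple consumes. The sketch below is a hypothetical CsvExampleDataSource for lines of the form name,count,price. It uses only java.io, splits naively on commas without quote handling, and inside processData its values would feed the column setters, followed by tableWriter.writeRow().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hypothetical data source with the same readRow()/getter shape as SimpleExampleDataSource.
class CsvExampleDataSource implements AutoCloseable {
    private final BufferedReader reader;
    private String[] fields;

    CsvExampleDataSource(Reader source) {
        this.reader = new BufferedReader(source);
    }

    // Advance to the next non-blank line; return false at end of input.
    boolean readRow() {
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.trim().isEmpty()) {
                    fields = line.split(",", -1);
                    if (fields.length != 3) {
                        throw new IllegalArgumentException("Expected 3 fields: " + line);
                    }
                    return true;
                }
            }
            fields = null;
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Typed accessors for the current row, one per destination column.
    String getNameValue() { return fields[0]; }
    int getCountValue() { return Integer.parseInt(fields[1].trim()); }
    double getPriceValue() { return Double.parseDouble(fields[2].trim()); }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    public static void main(String[] args) throws IOException {
        try (CsvExampleDataSource src = new CsvExampleDataSource(new StringReader("apple,3,0.5\npear,7,1.25\n"))) {
            while (src.readRow()) {
                System.out.println(src.getNameValue() + " " + src.getCountValue() + " " + src.getPriceValue());
            }
        }
    }
}
```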
BaseImporter.java
package com.illumon.iris.importers;
import com.fishlib.base.verify.Require;
import com.fishlib.io.logger.Logger;
import com.illumon.iris.binarystore.TableWriter;
import com.illumon.iris.db.tables.TableDefinition;
import com.illumon.iris.db.tables.dataimport.logtailer.TableListenerFactory;
import com.illumon.iris.db.tables.dataimport.logtailer.TableListenerFactoryImpl;
import com.illumon.iris.utils.ImportException;
import org.jetbrains.annotations.NotNull;
/**
* Abstract base class for importers. Handles most of the boilerplate.
* Implementing classes will need to implement {@link #processData()},
* and will most likely want to extend {@link StandardImporterArguments}.
* {@link #processData()} will acquire data from a source and feed it to the
* {@link TableWriter} via {@link TableWriter#getSetter(String) TableWriter.getSetter},
* {@link com.illumon.iris.binarystore.RowSetter#set(Object) set()},
* and {@link TableWriter#writeRow()}.
*
* @param <IAT> The actual class that extends StandardImporterArguments
*/
public abstract class BaseImporter<IAT extends StandardImporterArguments> {
protected final Logger log;
private TableWriter tableWriter = null;
private TableDefinition tableDefinition = null;
private TableListenerFactory tableListenerFactory = null;
private final IAT arguments;
/**
* Construct the BaseImporter.
*
* @param log use this Logger for feedback
* @param arguments importer arguments defining the import run
*/
BaseImporter(@NotNull final Logger log, @NotNull final IAT arguments) {
this.log = Require.neqNull(log, "log");
this.arguments = Require.neqNull(arguments, "arguments");
}
/**
* Get the importer arguments, in the actual parameterized type.
*
* @return the parsed importer arguments
*/
@SuppressWarnings("WeakerAccess")
protected IAT getImporterArguments() {
return arguments;
}
/**
* Accessor for TableWriter.
*
* @return the TableWriter
*/
protected TableWriter getTableWriter() {
return tableWriter;
}
/**
* Accessor for table definition
* @return the TableDefinition
*/
protected TableDefinition getTableDefinition() { return tableDefinition; }
/**
* Accessor for TableListenerFactory.
* @return the TableListenerFactory
*/
@SuppressWarnings("WeakerAccess")
protected TableListenerFactory getTableListenerFactory() {
return tableListenerFactory;
}
/**
* Set up the import by validating arguments, creating the tableWriter, etc.
* Implementors should not need to override this.
*/
@SuppressWarnings("WeakerAccess")
void setUpImport() {
log.info().append("Beginning import for ").append(arguments).endl();
final ImportTableWriterFactory tableWriterFactory = arguments.getImportTableWriterFactory();
tableDefinition = tableWriterFactory.getTableDefinition();
// we do not support multi-partition import here yet
if( arguments.getPartitioningColumnName() != null) {
throw new IllegalArgumentException("Multi-partition import is not supported in this importer");
}
tableWriter = tableWriterFactory.getTableWriter(null);
tableListenerFactory = new TableListenerFactoryImpl(log, arguments.getNamespace(), arguments.getTableName(), tableWriter);
}
/**
* Import the data - e.g. read the rows from the data source.
* The overriding class does the actual import using arguments, tableWriter, and tableListenerFactory.
*/
protected abstract void processData();
/**
* Wrap up the import process. Close the tableWriter and any other finalization tasks.
* Implementors should not need to override this.
*/
@SuppressWarnings("WeakerAccess")
protected void finishImport() {
log.info().append("Closing table writer").endl();
try {
if (tableWriter != null) {
tableWriter.close();
}
} catch (Exception e) {
throw new ImportException("Failed to close writer", e);
}
log.info().append("Done importing").endl();
}
/**
* Execute the steps for an import to a specific location.
* Implementors should not need to override this.
*/
@SuppressWarnings("WeakerAccess")
protected void doImport() {
setUpImport();
processData();
finishImport();
}
}