package com.illumon.iris.importers;
import com.fishlib.base.log.LogOutput;
import com.fishlib.configuration.Configuration;
import com.fishlib.io.logger.Log4jAdapter;
import com.fishlib.io.logger.Logger;
import com.fishlib.io.logger.ProcessStreamLoggerImpl;
import com.fishlib.util.PidFileUtil;
import com.fishlib.util.process.ProcessEnvironment;
import com.illumon.dataobjects.ColumnDefinition;
import com.illumon.iris.binarystore.*;
import com.illumon.iris.db.tables.TableDefinition;
import com.illumon.iris.db.tables.dataimport.DbNamespace;
import com.illumon.iris.db.tables.utils.DBDateTime;
import com.illumon.iris.utils.SystemLoggerTimeSource;
import org.apache.commons.cli.*;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
/**
* <p> This file is an example importer that illustrates the extensions to
* {@link BaseImporter} needed to create a custom importer.
*
* <p> There are two examples. Both manufacture data based on a row index.
*
* <p> {@link #processDataSimple()} will write data to a table defined by the following schema:
* <pre>{@code
* <Table name="DemoTable" namespace="ExampleNamespace" rethrowLoggerExceptionsAsIOExceptions="false" storageType="NestedPartitionedOnDisk">
* <Partitions keyFormula="${autobalance_by_first_grouping_column}" />
*
* <Column name="StringCol" dataType="String" />
* <Column name="DateTimeCol" dataType="DateTime" />
* <Column name="IntegerCol" dataType="Integer" />
* <Column name="LongCol" dataType="Long" />
* <Column name="DoubleCol" dataType="Double" />
* <Column name="FloatCol" dataType="Float" />
* <Column name="BooleanCol" dataType="Boolean" />
* <Column name="CharCol" dataType="Char" />
* <Column name="ByteCol" dataType="Byte" />
* <Column name="ShortCol" dataType="Short" />
* <Column name="Date" dataType="String" columnType="Partitioning" />
* </Table>
* }</pre>
*
* <p> The getters and setters are hard-coded and stored in variables. This method
* is simple, but tedious.
*
* <p> A valid invocation will include options similar to the following
* (note that the namespace and table name parameters match the example schema above, and
* that the <code>-simple</code> flag is what selects this code path):
* <p><code> -dp localhost/2018-02-28 -ns ExampleNamespace -tn DemoTable -om REPLACE -nd 0 -st 100 -nr 123 -simple </code>
*
* <p> {@link #processDataDynamic()} will write data to any table with an existing
* schema.
* The code to get the data is a calculation based on the index and data
* type. There are several layers of code to move calculations out of the row
* processing loop, and to ensure that unnecessary inefficiency (e.g. boxing of
* primitives) is avoided.
* <p> A valid invocation will include options similar to this:
* <p><code> -dp localhost/2018-02-28 -ns AnyNamespace -tn AnyTable -om REPLACE -nd 0 -st 100 -nr 123 </code>
*/
public class ExampleImporter extends BaseImporter<ExampleImporter.Arguments> {
/**
 * Importer arguments for {@link ExampleImporter}: everything in StandardImporterArguments
 * plus a few example-specific options.
 *
 * <p>Define an inner class like this one if you need to add arguments to
 * StandardImporterArguments. Declare the new options in {@link #addOptions} and read
 * their values in the constructor.
 */
static class Arguments extends StandardImporterArguments {
    // short names of the custom command line options
    private static final String WELCOME_MESSAGE_OPTION = "wm";
    private static final String START_OPTION = "st";
    private static final String NUM_ROWS_OPTION = "nr";
    private static final String SIMPLE_OPTION = "simple";

    // parsed (or programmatically supplied) values
    private final String message;
    private final int startIndex;
    private final int numRows;
    private final boolean simpleExample;

    /**
     * Build the arguments from a parsed command line.
     *
     * @param commandLine the command line, as passed from {@link ExampleImporter#main}.
     * @throws ParseException if the command line or parameters are not valid
     */
    Arguments(@NotNull CommandLine commandLine) throws ParseException {
        super(commandLine);

        // Optional String argument: null when -wm was not supplied.
        message = commandLine.hasOption(WELCOME_MESSAGE_OPTION)
                ? commandLine.getOptionValue(WELCOME_MESSAGE_OPTION)
                : null;

        // Optional Number argument: falls back to 0 when -st was not supplied.
        // A value that cannot be parsed into the declared type raises ParseException, which is
        // simply allowed to propagate. Wrap these reads in a try/catch only if you intend to
        // actually handle the failure here.
        startIndex = commandLine.hasOption(START_OPTION)
                ? ((Number) commandLine.getParsedOptionValue(START_OPTION)).intValue()
                : 0;

        // Required Number argument.
        numRows = ((Number) commandLine.getParsedOptionValue(NUM_ROWS_OPTION)).intValue();

        // Flag without a value: present means "use the simple example".
        simpleExample = commandLine.hasOption(SIMPLE_OPTION);
    }

    /**
     * Build the arguments directly, for callers that do not go through main and a command line.
     * The welcome message is left unset and the simple example is not selected.
     *
     * @param namespace namespace of the destination table
     * @param tableName name of the destination table
     * @param partitionString internal and column partition values ("internalpartition/columnpartition")
     * @param outputMode a valid {@link ImportOutputMode} value
     * @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
     * @param numRows see the -nr command line option: "Generate this many rows"
     */
    Arguments(
            // StandardImporterArguments values
            @NotNull final String namespace,
            @NotNull final String tableName,
            @NotNull final String partitionString,
            @NotNull final String outputMode,
            // ExampleImporter.Arguments values
            final int startIndex,
            final int numRows) {
        super(DbNamespace.valueOf(namespace), tableName, partitionString, null, null, ImportOutputMode.valueOf(outputMode));
        this.message = null;
        this.simpleExample = false;
        this.startIndex = startIndex;
        this.numRows = numRows;
    }

    /**
     * Extend the standard options with the ones specific to this importer.
     * See {@link Options}, {@link Option} and {@link Option.Builder}.
     *
     * @param options the options to add the custom options to
     * @return the same Options, now including the custom options
     */
    private static Options addOptions(@NotNull final Options options) {
        // Produces this usage string:
        // -wm,--welcomeMessage <message> Example string argument
        final Option welcomeMessage = Option.builder(WELCOME_MESSAGE_OPTION) // short name, e.g. -wm
                .longOpt("welcomeMessage")       // long name, e.g. --welcomeMessage
                .desc("Example string argument") // description shown in the usage message
                .hasArg()                        // the option takes a value
                .argName("message")              // name of that value in the usage message
                .type(String.class)              // type used by getParsedOptionValue
                .required(false)                 // the option may be omitted
                .build();

        final Option numDogs = Option.builder("nd")
                .longOpt("numDogs")
                .desc("Number of dogs (excludes cats)")
                .hasArg()
                .argName("number of dogs")
                .type(Number.class)
                .build();
        final Option numCats = Option.builder("nc")
                .longOpt("numCats")
                .desc("Number of cats (excludes dogs)")
                .hasArg()
                .argName("number of cats")
                .type(Number.class)
                .build();

        // An OptionGroup holds mutually exclusive options; this group is marked as required.
        final OptionGroup dogsOrCats = new OptionGroup();
        dogsOrCats.setRequired(true);
        dogsOrCats.addOption(numDogs);
        dogsOrCats.addOption(numCats);

        final Option simple = Option.builder(SIMPLE_OPTION)
                .required(false)
                .longOpt("simple")
                .desc("Use SimpleExampleDataSource")
                .build();
        final Option rowCount = Option.builder(NUM_ROWS_OPTION)
                .required()
                .longOpt("numRows")
                .desc("Generate this many rows")
                .hasArg()
                .argName("numRows")
                .type(Number.class)
                .build();
        final Option firstIndex = Option.builder(START_OPTION)
                .required(false)
                .longOpt("startIndex")
                .desc("Generate rows starting at startIndex (default is 0)")
                .hasArg()
                .argName("startIndex")
                .type(Number.class)
                .build();

        // Options and OptionGroups are registered the same way.
        options.addOption(welcomeMessage);
        options.addOptionGroup(dogsOrCats);
        options.addOption(simple);
        options.addOption(rowCount);
        options.addOption(firstIndex);
        return options;
    }

    /**
     * Append argument names and values to a LogOutput.
     * Sub-classes should define their own version, which should almost always invoke the
     * superclass implementation first. This version delegates to the superclass and adds
     * nothing of its own.
     *
     * @param logOutput Append argument names and values to this LogOutput
     * @return The logOutput for call chaining
     */
    public LogOutput appendArguments(@NotNull final LogOutput logOutput) {
        super.appendArguments(logOutput);
        // Custom values could be chained on here before returning, in the form
        //   logOutput.append(",someName=").append(someValue)
        return logOutput;
    }

    /**
     * Accessor for the customized option startIndex.
     *
     * @return The specified or default start index.
     */
    int getStartIndexArgument() {
        return this.startIndex;
    }

    /**
     * Accessor for the customized option numRows.
     *
     * @return The specified number of rows.
     */
    int getNumRowsArgument() {
        return this.numRows;
    }

    /**
     * Accessor for the customized string argument.
     *
     * @return The welcome message, or null if none was given.
     */
    String getMessage() {
        return this.message;
    }

    /**
     * Report whether the optional welcome message was specified.
     *
     * @return true if a message was given on the command line
     */
    boolean hasMessageArgument() {
        return getMessage() != null;
    }
}
/**
 * <p>Example data source with hard-coded accessors for a known set of columns.
 * A data source may implement {@link TableReader}, but does not have to. Its job is to read
 * your data and supply the rows to ExampleImporter.processData; a real one will likely need
 * custom arguments to identify the data.
 *
 * <p>This implementation reads nothing: it manufactures every value from an index that
 * walks the range given in the constructor.
 */
private static class SimpleExampleDataSource {
    private final int start;
    private final int numRows;
    /** Index of the current row; starts one before {@code start} so the first readRow lands on it. */
    private int index;

    SimpleExampleDataSource(int start, int numRows) {
        this.start = start;
        // a negative row count is treated as zero rows
        this.numRows = Math.max(numRows, 0);
        this.index = start - 1;
    }

    /**
     * Advance to the next row of data, if there is one.
     * This implementation manufactures the data.
     *
     * @return true if another row is available, false if there is no more data.
     */
    boolean readRow() {
        final int lastIndex = start + numRows - 1;
        if (index >= lastIndex) {
            return false;
        }
        index++;
        return true;
    }

    // One hard-coded accessor per known column; each derives its value from the current index.

    String getStringColValue() {
        return "string:" + index;
    }

    DBDateTime getDateTimeColValue() {
        return DBDateTime.now();
    }

    long getLongColValue() {
        return index;
    }

    int getIntegerColValue() {
        return index;
    }

    double getDoubleColValue() {
        return 0.123d + index;
    }

    float getFloatColValue() {
        return 0.123f + index;
    }

    boolean getBooleanColValue() {
        return (index % 3) == 0;
    }

    char getCharColValue() {
        return (char) index;
    }

    byte getByteColValue() {
        return (byte) (index % 128);
    }

    short getShortColValue() {
        return (short) index;
    }
}
/**
 * <p> Example data source that works for whatever columns the table definition contains.
 * A data source may implement {@link TableReader}, but does not have to. Its job is to read
 * your data and supply the rows to ExampleImporter.processData; a real one will likely need
 * custom arguments to identify the data.
 *
 * <p> This implementation inspects the table definition once, prepares a getter per column,
 * and manufactures each value from the column type and an index that walks the range given
 * in the constructor.
 */
private static class DynamicExampleDataSource {
    private final int start;
    private final int numRows;
    /** Index of the current row; starts one before {@code start} so the first readRow lands on it. */
    private int index;
    /** Getters are computed once in the constructor and looked up by column name afterwards. */
    final Map<String, RowGetter> rowGettersByName = new HashMap<>();

    DynamicExampleDataSource(int start, int numRows, TableDefinition tableDefinition) {
        this.start = start;
        // a negative row count is treated as zero rows
        this.numRows = Math.max(numRows, 0);
        this.index = start - 1;
        init(tableDefinition);
    }

    /**
     * Advance to the next row of data, if there is one.
     * This implementation manufactures the data.
     *
     * @return true if another row is available, false if there is no more data.
     */
    boolean readRow() {
        final int lastIndex = start + numRows - 1;
        if (index >= lastIndex) {
            return false;
        }
        index++;
        return true;
    }

    /**
     * Look up the pre-computed getter for a named column.
     * This variant cannot return a typed RowGetter.
     *
     * @param name the column to get the getter for.
     * @return the getter
     * @throws IllegalArgumentException when the getter does not exist
     */
    RowGetter getGetter(String name) {
        final RowGetter found = rowGettersByName.get(name);
        if (found != null) {
            return found;
        }
        throw new IllegalArgumentException("No getter for column " + name);
    }

    /**
     * Look up the pre-computed getter for a named column, typed as requested.
     * The request is accepted when {@code tClass} is assignable from the getter's type, or
     * from the boxed form of that type.
     *
     * @param name the column to get the getter for.
     * @param tClass the value type the caller wants the getter to produce
     * @return the getter
     * @throws IllegalArgumentException when the getter does not exist
     * @throws ClassCastException when the type is inappropriate
     */
    public <T> RowGetter<T> getGetter(String name, Class<T> tClass) {
        final RowGetter untyped = getGetter(name);
        final boolean compatible = tClass.isAssignableFrom(untyped.getType())
                || tClass.isAssignableFrom(com.illumon.util.type.TypeUtils.getBoxedType(untyped.getType()));
        if (!compatible) {
            throw new ClassCastException("Getter for column " + name + " is not assignable from " + tClass + ", type is " + untyped.getType());
        }
        //noinspection unchecked
        return (RowGetter<T>) untyped;
    }

    /**
     * Pre-compute and store the getter for every column of the table definition.
     *
     * @param tableDefinition the table definition to inspect
     */
    private void init(TableDefinition tableDefinition) {
        for (ColumnDefinition colDef : tableDefinition.getColumns()) {
            rowGettersByName.put(colDef.getName(), createGetter(colDef));
        }
    }

    /**
     * Build the getter for one column. The getter manufactures its value from the current
     * index and the column's data type; types with nothing to manufacture get a getter that
     * always returns null.
     *
     * @param colDef the column to build a getter for
     * @return a getter that reads this data source's current index
     */
    private RowGetter createGetter(ColumnDefinition colDef) {
        // DBDateTime is checked first, before the SupportedType lookup.
        if (DBDateTime.class.equals(colDef.getDataType())) {
            return new AbstractRowGetter<DBDateTime>(DBDateTime.class) {
                @Override
                public DBDateTime get() {
                    return DBDateTime.now();
                }
            };
        }
        try {
            switch (SupportedType.getType(colDef.getDataType())) {
                case Boolean:
                    return new AbstractRowGetter<Boolean>(Boolean.class) {
                        @Override
                        public Boolean get() {
                            return getBoolean();
                        }

                        @Override
                        public Boolean getBoolean() {
                            return index % 3 == 0;
                        }
                    };
                case Byte:
                    return new AbstractRowGetter<Byte>(Byte.class) {
                        @Override
                        public Byte get() {
                            return getByte();
                        }

                        @Override
                        public byte getByte() {
                            return (byte) (index % 128);
                        }
                    };
                case Char:
                    return new AbstractRowGetter<Character>(Character.class) {
                        @Override
                        public Character get() {
                            return getChar();
                        }

                        @Override
                        public char getChar() {
                            return (char) (index % 128);
                        }
                    };
                case Double:
                    return new AbstractRowGetter<Double>(Double.class) {
                        @Override
                        public Double get() {
                            return getDouble();
                        }

                        @Override
                        public double getDouble() {
                            return index + 0.123d;
                        }
                    };
                case Float:
                    return new AbstractRowGetter<Float>(Float.class) {
                        @Override
                        public Float get() {
                            return getFloat();
                        }

                        @Override
                        public float getFloat() {
                            return index + 0.456f;
                        }
                    };
                case Int:
                    return new AbstractRowGetter<Integer>(Integer.class) {
                        @Override
                        public Integer get() {
                            return getInt();
                        }

                        @Override
                        public int getInt() {
                            return index;
                        }
                    };
                case Long:
                    return new AbstractRowGetter<Long>(Long.class) {
                        @Override
                        public Long get() {
                            return getLong();
                        }

                        @Override
                        public long getLong() {
                            return index;
                        }
                    };
                case Short:
                    return new AbstractRowGetter<Short>(Short.class) {
                        @Override
                        public Short get() {
                            return getShort();
                        }

                        @Override
                        public short getShort() {
                            return (short) index;
                        }
                    };
                case String:
                case EnhancedString:
                case Enum:
                    return new AbstractRowGetter<String>(String.class) {
                        @Override
                        public String get() {
                            return "String: " + index;
                        }
                    };
                case Blob:
                case UnknownType:
                default:
                    return createNullGetter();
            }
        } catch (UnsupportedOperationException e) {
            // for the types not convertible in SupportedType
            return createNullGetter();
        }
    }

    /**
     * Build the fallback getter used for columns with no manufactured data.
     *
     * @return an Object-typed getter whose value is always null
     */
    private static RowGetter createNullGetter() {
        return new AbstractRowGetter<Object>(Object.class) {
            @Override
            public Object get() {
                return null;
            }
        };
    }
}
/**
 * Construct the ExampleImporter from already-built arguments.
 * <p>This is the constructor used by this class's main() and by the convenience constructor
 * that takes the individual argument values.
 *
 * @param log use this Logger for feedback
 * @param arguments importer arguments defining the import run
 */
public ExampleImporter(
        @NotNull final Logger log,
        @NotNull final ExampleImporter.Arguments arguments) {
    super(log, arguments);
}
/**
 * Construct the ExampleImporter from individual values, for usage without a command line.
 * The values are bundled into an {@link Arguments} and passed to the other constructor.
 *
 * @param log Logger for monitoring and debugging messages
 * @param namespace namespace of the destination table
 * @param tableName name of the destination table
 * @param partitionString internal and column partition values ("internalpartition/columnpartition")
 * @param outputMode a valid {@link ImportOutputMode} value
 * @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
 * @param numRows see the -nr command line option: "Generate this many rows"
 */
public ExampleImporter(
        @NotNull final Logger log,
        @NotNull final String namespace,
        @NotNull final String tableName,
        @NotNull final String partitionString,
        @NotNull final String outputMode,
        final int startIndex,
        final int numRows) {
    this(
            log,
            new Arguments(
                    namespace,
                    tableName,
                    partitionString,
                    outputMode,
                    startIndex,
                    numRows));
}
/**
 * This will be called by the base importer superclass.
 *
 * Override this method to open and process your data source. For each row of the input,
 * call tableWriter.getSetter(...).set(value) for each column, and then tableWriter.writeRow().
 *
 * This implementation picks one of the two example strategies, based on whether the
 * simple example was requested in the arguments.
 */
@Override
protected void processData() {
    final boolean useSimpleExample = getImporterArguments().simpleExample;
    if (!useSimpleExample) {
        processDataDynamic();
        return;
    }
    processDataSimple();
}
/**
 * Pretend to read rows from a data source; actually manufacture data for a
 * known static schema and write it row by row.
 *
 * @throws UncheckedIOException if writing a row fails
 */
private void processDataSimple() {
    final Arguments importerArguments = getImporterArguments();
    final SimpleExampleDataSource source = new SimpleExampleDataSource(
            importerArguments.getStartIndexArgument(),
            importerArguments.getNumRowsArgument());
    final TableWriter writer = getTableWriter();

    // (Optional) do the setter lookups outside the loop, for efficiency.
    // These could be stored in an array or other structure.
    final RowSetter<String> stringCol = writer.getSetter("StringCol", String.class);
    final RowSetter<DBDateTime> dateTimeCol = writer.getSetter("DateTimeCol", DBDateTime.class);
    final RowSetter<Integer> integerCol = writer.getSetter("IntegerCol", Integer.class);
    final RowSetter<Long> longCol = writer.getSetter("LongCol", Long.class);
    final RowSetter<Double> doubleCol = writer.getSetter("DoubleCol", Double.class);
    final RowSetter<Float> floatCol = writer.getSetter("FloatCol", Float.class);
    final RowSetter<Boolean> booleanCol = writer.getSetter("BooleanCol", Boolean.class);
    final RowSetter<Character> charCol = writer.getSetter("CharCol", Character.class);
    final RowSetter<Byte> byteCol = writer.getSetter("ByteCol", Byte.class);
    final RowSetter<Short> shortCol = writer.getSetter("ShortCol", Short.class);
    // Date is the partitioning column, so we cannot set it.

    // For each row provided by the data source, copy every value into the writer ...
    while (source.readRow()) {
        stringCol.set(source.getStringColValue());
        dateTimeCol.set(source.getDateTimeColValue());
        integerCol.setInt(source.getIntegerColValue());
        longCol.setLong(source.getLongColValue());
        doubleCol.setDouble(source.getDoubleColValue());
        floatCol.setFloat(source.getFloatColValue());
        booleanCol.setBoolean(source.getBooleanColValue());
        charCol.setChar(source.getCharColValue());
        byteCol.setByte(source.getByteColValue());
        shortCol.setShort(source.getShortColValue());

        // ... and then write the populated row.
        try {
            writer.writeRow();
        } catch (IOException writeFailure) {
            throw new UncheckedIOException("Failed to write a row!", writeFailure);
        }
    }
}
/**
 * Pretend to read rows from a data source; actually manufacture data for
 * the columns discovered in the writable table definition and write it row by row.
 *
 * @throws UncheckedIOException if writing a row fails
 */
private void processDataDynamic() {
    final Arguments importerArguments = getImporterArguments();
    final int firstIndex = importerArguments.getStartIndexArgument();
    final int rowCount = importerArguments.getNumRowsArgument();
    final TableDefinition writableDefinition = getTableDefinition().getWritable();

    // this example data source knows how to manufacture data for the columns it discovers
    final DynamicExampleDataSource source =
            new DynamicExampleDataSource(firstIndex, rowCount, writableDefinition);
    final TableWriter writer = getTableWriter();

    // Build one type-sensitive "get then set" action per column, outside the row loop.
    // It would be simpler to call writer.getSetter(colName).set(source.getGetter(colName)),
    // but using the generic get() causes primitive types to be boxed.
    final List<Runnable> columnCopiers = new ArrayList<>();
    for (ColumnDefinition column : writableDefinition.getColumns()) {
        final String columnName = column.getName();
        final RowSetter columnSetter = writer.getSetter(columnName);
        final RowGetter columnGetter = source.getGetter(columnName);
        columnCopiers.add(getTypeSpecificGetSet(columnSetter, columnGetter, column.getDataType()));
    }

    while (source.readRow()) {
        // For each row provided by the data source, copy every column value into the writer ...
        for (Runnable copier : columnCopiers) {
            copier.run();
        }
        // ... and then write the populated row.
        try {
            writer.writeRow();
        } catch (IOException writeFailure) {
            throw new UncheckedIOException("Failed to write a row!", writeFailure);
        }
    }
}
/**
 * Build the action that copies one column value from the source to the writer.
 * For char, byte, double, float, int, long and short columns (primitive or boxed type) the
 * action uses the matching typed get and set methods, so the value is not boxed on the way.
 * Every other type goes through the generic get() and set().
 *
 * @param setter a RowSetter for a column
 * @param getter a RowGetter for the same column
 * @param type the class of the column data
 * @return a Runnable that gets and sets a column value
 */
private static Runnable getTypeSpecificGetSet(final RowSetter setter, final RowGetter getter, final Class<?> type) {
    if (type == char.class || type == Character.class) {
        return () -> setter.setChar(getter.getChar());
    }
    if (type == byte.class || type == Byte.class) {
        return () -> setter.setByte(getter.getByte());
    }
    if (type == double.class || type == Double.class) {
        return () -> setter.setDouble(getter.getDouble());
    }
    if (type == float.class || type == Float.class) {
        return () -> setter.setFloat(getter.getFloat());
    }
    if (type == int.class || type == Integer.class) {
        return () -> setter.setInt(getter.getInt());
    }
    if (type == long.class || type == Long.class) {
        return () -> setter.setLong(getter.getLong());
    }
    if (type == short.class || type == Short.class) {
        return () -> setter.setShort(getter.getShort());
    }
    // no typed accessor pair selected: fall back to the generic object path
    //noinspection unchecked
    return () -> setter.set(getter.get());
}
/**
 * Static helper method to avoid fluff in queries: builds an ExampleImporter from the given
 * values and runs its import in one call.
 *
 * @param log Logger for monitoring and debugging messages
 * @param namespace namespace of the destination table
 * @param tableName name of the destination table
 * @param partitionString internal and column partition values ("internalpartition/columnpartition")
 * @param outputMode a valid {@link ImportOutputMode} value
 * @param startIndex see the -st command line option: "Generate rows starting at startIndex (default is 0)"
 * @param numRows see the -nr command line option: "Generate this many rows"
 */
@SuppressWarnings("unused")
public static void importData(
        @NotNull final Logger log,
        @NotNull final String namespace,
        @NotNull final String tableName,
        @NotNull final String partitionString,
        @NotNull final String outputMode,
        final int startIndex,
        final int numRows) {
    new ExampleImporter(log, namespace, tableName, partitionString, outputMode, startIndex, numRows)
            .doImport();
}
/**
 * Example importer entry point.
 *
 * <p>Sets up the pid file, the logger and the process environment, parses the command line
 * into {@link Arguments}, and runs the import. The process exits with status -1 when no
 * arguments are given, when initialization or argument parsing fails, and when the import
 * itself fails with a RuntimeException.
 *
 * @param args command line arguments to the process
 */
public static void main(final String... args) {
    // get the Configuration singleton instance and save it for convenience
    final Configuration configuration = Configuration.getInstance();
    // Make sure only one instance of this process runs at any time.
    // The process name is taken from the "process.name" property set in a property file or
    // via jvm args such as -Dprocess.name=ExampleImporter_1
    PidFileUtil.checkAndCreatePidFileForThisProcess(configuration);
    // Create an Iris logger for this process
    final Logger log = ProcessStreamLoggerImpl.makeLogger(new SystemLoggerTimeSource(), TimeZone.getDefault());
    // Redirect any log4j logging to the Iris logger
    Log4jAdapter.sendLog4jToLogger(log);
    // if no arguments are given, print a friendly usage message (to the log file)
    if (args.length == 0) {
        log.info().append(StandardImporterArguments.getHelpString(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::addOptions)).endl();
        System.exit(-1);
    }
    try {
        // All Iris processes expect a ProcessEnvironment.
        ProcessEnvironment.basicServerInitialization(configuration, ExampleImporter.class.getSimpleName(), log);
        // parse the arguments
        ExampleImporter.Arguments importerArgs = StandardImporterArguments.parseCommandLine(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::new, args, ExampleImporter.Arguments::addOptions);
        try {
            // Create an importer with the specified arguments, and import the data.
            ExampleImporter importer = new ExampleImporter(log, importerArgs);
            importer.doImport();
        } catch (RuntimeException e) {
            log.fatal().append("Import failed with exception: ").append(e).endl();
            // Exit with the same failure status as the argument-error path, so that a failed
            // import is not reported to the caller as a successful run.
            System.exit(-1);
        }
    } catch (Exception e) {
        // Reached for failures before the import starts: process initialization or parsing.
        log.fatal().append("Failed to parse command line arguments: ").append(e).endl();
        // add the usage statement
        log.info().append(StandardImporterArguments.getHelpString(ExampleImporter.class.getSimpleName(), ExampleImporter.Arguments::addOptions)).endl();
        System.exit(-1);
    }
}
}