Solace integration plugin
The Deephaven engine can directly consume data from a Solace™ PubSub+ Mesh. By doing so, you can improve latency and throughput relative to other solutions such as having an external logger write data to a binary log file. To enable this integration, you need to install the Solace integration plugin and configure where Deephaven should save your Solace data on disk.
The Solace integration plugin exposes new classes that can be invoked from within Persistent Queries. The only system-level configuration required is specifying where to save your data. The details of how to connect to Solace, what data you'll retrieve, and how to parse that data into the columns of your Deephaven table are all set up within your Persistent Query. You can use all Solace options without requiring your Deephaven administrator to pre-configure them.
The two primary options offered by the Solace integration plugin are Queue-based ingestion and Topic-based ingestion. Each suits a different use case, depending on what kind of data you're interested in.
Queue-based ingestion is best when you need guaranteed, ordered delivery of all data. A Solace Queue can be set up to persist data until it's consumed, so if your Deephaven system starts later in the day, it can still retrieve earlier data. For example, if you had an inventory management system based on Deephaven, you would want to know all of your transactions for the day so that you could calculate your current holdings. Queue-based ingestion can be replicated across multiple servers and works with redundant delivery for high availability.
Topic-based ingestion is best for real-time, best-effort data delivery. Non-persistent Solace Topics deliver data in real-time but don't retain it for later retrieval. Data published to a Solace Topic while Deephaven is offline will be lost. For example, if you had an inventory management system based on Deephaven, you might want to know the current market price of each asset in your system, but not need anything other than the current price. Topic-based ingestion is single-server (since different servers cannot be sure they started by receiving the same message) and is best for transient data.
The Solace Integration Plugin is an optional component of the Deephaven system. Rather than encapsulate the Solace libraries, these classes take in Solace objects that may be configured via Solace's own standard mechanisms.
Note
The Solace Integration Plugin runs in a Legacy Persistent Query, but since it creates a Data Import Server, the resulting data can be accessed from either a Core+ or Legacy worker.
Setup
Plugin installation
You can install the Solace Integration Plugin using either RPM or tar installation. Both methods may require sudo access.
RPM:
dnf install solace-plugin-*
sudo /etc/sysconfig/deephaven/plugins/solace-plugin/bin/activate.sh --classpath
Note: solace-plugin-* is the plugin RPM file.
Tar:
- Copy the .tgz file to /tmp/solace-plugin/
rm -rf /etc/sysconfig/deephaven/plugins/solace-plugin
mkdir -p /etc/sysconfig/deephaven/plugins/solace-plugin
tar -xvf /tmp/solace-plugin/Solace-plugin-1.20200331.150_1.0-Manual.tgz -C /etc/sysconfig/deephaven/plugins/solace-plugin
/etc/sysconfig/deephaven/plugins/solace-plugin/bin/reinstall.sh
Caution
A Solace Queue limits the number of unacknowledged messages delivered to a consumer at one time. The Solace Integration Plugin consumes messages from the Queue as fast as it can, and acknowledges those messages once they have been checkpointed (written to disk). The default checkpoint interval is 30000 ms (30 seconds). You will likely want to set the configuration property DataImportStreamProcessor.<your namespace>.<your table>.checkpointIntervalMillis to a smaller value, such as 5000 ms.
Solace libraries installation
Note
This section references packages provided by Solace, and may be subject to change if Solace changes how they deliver or structure their API packages.
The SolaceQueueIngester relies on various classes from Solace. These classes are contained in the Java™ JCSMP libraries published by Solace, presently available at the Solace download page.
The jar files are provided in a zip file from that location. Copy the zip file to the server where you will run the SolaceQueueIngester, then extract it. The zip file contains a 'lib' directory. Copy all .jar files from the lib directory to: /etc/sysconfig/deephaven/illumon.d.latest/java_lib
SolaceIngesters
SolaceIngesters are classes available from the Solace Integration Plugin. Clients using the Solace™ PubSub+ Mesh can use these classes to import data directly from Solace into a Deephaven instance. The messages being delivered by Solace must be in a form that can be converted into Deephaven data rows. The Solace Integration Plugin includes pre-built adapters for raw text and for JSON, but other adapters can be created as needed.
SolaceQueueIngester
The SolaceQueueIngester is a SolaceIngester that can connect a Solace Queue directly to Deephaven. This is useful when guaranteed delivery or high availability is required, since Queues ensure that multiple import servers receive identical data.
To use the SolaceQueueIngester, you need:
- A pre-existing Solace Queue.
- A Deephaven table to populate with Solace data.
- A mechanism to publish messages from a data source to the Solace Queue.
In addition, all messages must have a unique, monotonically increasing ID value included in their message header properties; the name of the property to use is configurable, but must be consistent across all messages.
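The ingester only needs the IDs to be unique and increasing; how they are generated is up to the publisher. The following is a minimal sketch of a publisher-side generator. MonotonicIdGenerator is a hypothetical helper, not part of the plugin or of JCSMP, and it assumes a single publisher that can resume from the last ID it issued. Because the Legacy Persistent Query example below describes MsgId as a String value, the sketch zero-pads each ID to a fixed width so that String ordering matches numeric ordering (unpadded, "10" sorts before "9"). Whether the ingester compares IDs as strings or as numbers is not stated here, so padding is simply the conservative choice.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical publisher-side helper; not part of the Solace Integration Plugin.
final class MonotonicIdGenerator {
    // 19 digits is enough for any non-negative long (Long.MAX_VALUE has 19 digits).
    private static final int WIDTH = 19;
    private final AtomicLong counter;

    // lastIssued: the highest ID already published, so a restarted publisher
    // keeps increasing instead of starting over.
    MonotonicIdGenerator(long lastIssued) {
        this.counter = new AtomicLong(lastIssued);
    }

    // Returns the next ID as a fixed-width, zero-padded decimal string so that
    // lexical (String) order and numeric order agree.
    String next() {
        long value = counter.incrementAndGet();
        return String.format("%0" + WIDTH + "d", value);
    }
}
```

With JCSMP, the resulting ID would be attached as a user property, for example by putting it into an SDTMap under the configured property name (such as MsgId) and passing that map to the message's setProperties method.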
The SolaceQueueIngester takes in references to a Solace Queue, a Deephaven Data Import Server, and an adapter to convert messages from Solace into rows of Deephaven data. Under standard operation, a SolaceQueueIngester will run inside a Deephaven Persistent Query.
SolaceTopicIngester
The SolaceTopicIngester is a SolaceIngester that can connect a Solace Topic directly to Deephaven. This is useful when you only need real-time data and don't require historical replay: it captures only the data received while the system is running.
To use the SolaceTopicIngester, you need:
- An accessible Solace Topic (persistent or temporary).
- A Deephaven table to populate with Solace data.
- A mechanism to publish messages from a data source to the Solace Topic.
Unlike the SolaceQueueIngester, the SolaceTopicIngester does not require a unique message ID; it handles messages in the order they are received. As a result, replication is not possible with a SolaceTopicIngester: two separate SolaceTopicIngester instances might receive different messages if they do not start at the same instant.
The SolaceTopicIngester takes in references to a Solace Topic, a Deephaven Data Import Server, and an adapter to convert messages from Solace into rows of Deephaven data. Under standard operation, a SolaceTopicIngester will run inside a Deephaven Persistent Query.
Data Import Server Configuration
SolaceIngesters write directly to a Deephaven Data Import Server (DIS), which must be configured to write data out appropriately.
Directory setup
Each DIS instance must have exclusive control over its write directories. If data routing is configured so that two Data Import Servers have disjoint partition sets, they can share storage. However, for simplicity, Deephaven recommends that each DIS use a unique directory with a path like: /db/dataImportServers/[DIS_Name]
For example, to create a directory for SolaceImport1:
sudo mkdir -p /db/dataImportServers/SolaceImport1
sudo chown dbquery:dbmergegrp /db/dataImportServers/SolaceImport1
The storage directory must exist before starting a Solace ingestion DIS. When running in a regular query or console, the dbquery user needs read and write access to this path. When running as a persistent query against a merge server (preferred for production), the dbmerge account needs read and write privileges.
Routing setup
The DIS uses a YAML file to determine how to find data sources. Please see Routing for Deephaven Ingesters for information on how to configure routing for a SolaceQueueIngester.
SolaceIngester configurable properties
Property | Description |
---|---|
SolaceIngester.reportIntervalMs | Interval in milliseconds between logged reports of message processing counts. Default: 60000 (one minute). |
SolaceIngester.cleanupIntervalMillis | Interval in milliseconds between adapter cleanup operations (such as delayed message acknowledgements). Ensures routine cleanup in low-throughput situations without long waits. Default: 100. |
SolaceIngester.cleanupMsgInterval | Number of messages received between adapter cleanup operations (such as delayed acknowledgements). Prevents pending cleanup work from accumulating in high-throughput situations. Default: 10000. |
SolaceIngester.queueTimeoutMs | Timeout in milliseconds before the ingester determines no messages are waiting and performs immediate cleanup on pending messages. Default: 50. |
SolaceIngester.<ingestername>.internalPartition | Internal partition for this ingester to write to. Each ingester (including Kafka ingesters) should use a different internal partition. If not specified, defaults to the system hostname. |
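The three cleanup settings behave like independent triggers: elapsed time, message count, and an idle queue. The sketch below shows one way such triggers can be combined. CleanupTriggerSketch is hypothetical and the plugin's internal scheduling is not documented here, so treat it as an aid to reasoning about the settings rather than a description of the implementation. Time is passed in explicitly so the logic is deterministic.

```java
// Hypothetical sketch: one way time, count, and idle triggers could be combined.
// Not the Solace Integration Plugin's implementation.
final class CleanupTriggerSketch {
    private final long cleanupIntervalMillis; // cf. SolaceIngester.cleanupIntervalMillis
    private final long cleanupMsgInterval;    // cf. SolaceIngester.cleanupMsgInterval
    private final long queueTimeoutMs;        // cf. SolaceIngester.queueTimeoutMs

    private long lastCleanupMs;
    private long lastMessageMs;
    private long messagesSinceCleanup;

    CleanupTriggerSketch(long cleanupIntervalMillis, long cleanupMsgInterval,
                         long queueTimeoutMs, long nowMs) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
        this.cleanupMsgInterval = cleanupMsgInterval;
        this.queueTimeoutMs = queueTimeoutMs;
        this.lastCleanupMs = nowMs;
        this.lastMessageMs = nowMs;
    }

    // Record that a message was received at nowMs.
    void onMessage(long nowMs) {
        messagesSinceCleanup++;
        lastMessageMs = nowMs;
    }

    // Returns true (and resets the counters) when any trigger fires.
    boolean shouldCleanup(long nowMs) {
        if (messagesSinceCleanup == 0) {
            return false; // assumption: nothing pending means nothing to clean up
        }
        boolean countReached = messagesSinceCleanup >= cleanupMsgInterval;
        boolean intervalElapsed = nowMs - lastCleanupMs >= cleanupIntervalMillis;
        boolean queueIdle = nowMs - lastMessageMs >= queueTimeoutMs;
        if (countReached || intervalElapsed || queueIdle) {
            lastCleanupMs = nowMs;
            messagesSinceCleanup = 0;
            return true;
        }
        return false;
    }
}
```

For example, new CleanupTriggerSketch(100, 10_000, 50, startMs) mirrors the documented defaults of cleanupIntervalMillis, cleanupMsgInterval, and queueTimeoutMs.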
Related configuration properties
Property | Description |
---|---|
DataImportStreamProcessor.[namespace].[table].checkpointIntervalMillis | This property significantly impacts SolaceQueueIngester performance. Solace Queues and Topics limit the number of unacknowledged messages delivered to a receiver (currently a maximum of 1,000,000 unacknowledged messages). The SolaceTopicIngester automatically acknowledges all inbound messages once they have been read, as redelivery is never expected in this case. However, the SolaceQueueIngester acknowledges messages only when they have been checkpointed (i.e., written recoverably to disk). When using the SolaceQueueIngester, you must set your checkpoint interval so that (desired messages per second) * (checkpoint interval in seconds) < (maximum unacknowledged messages). Since this property is expressed in milliseconds, divide its value by 1000 before applying the formula. Deephaven does not recommend setting this value below 5000 in most cases. |
DataImportStreamProcessor.[Namespace].[Table].flushIntervalMillis | This DIS property controls how often data is pushed to Deephaven clients. If this value is set to 1000, clients will see their screen update each second. Even if the ingester processes an update in microseconds, users may not see the data for up to a full second. In most configurations, this is a reasonable balance between latency and the speed of human perception. If a lower latency is required, you may wish to tune this value for the specific table being imported into. Setting this value too low may consume enough CPU to block other import operations (threshold varies by configuration). In most cases, the default value for this property will be reasonable. |
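The checkpoint inequality mixes units: the rate is per second, but checkpointIntervalMillis is in milliseconds. The sketch below works the arithmetic in milliseconds using the 1,000,000-message limit quoted above. CheckpointBudget is a hypothetical helper, not a Deephaven API. With the default 30000 ms interval, the limit is reached at about 33,000 messages per second; at 5000 ms, it rises to just under 200,000.

```java
// Hypothetical helper for the rule
//   (messages per second) * (checkpoint interval in seconds) < (maximum unacknowledged messages)
// Everything is kept in milliseconds to stay in integer arithmetic.
final class CheckpointBudget {
    // True if a steady rate stays strictly under the unacknowledged-message limit.
    static boolean fitsUnackedLimit(long msgsPerSecond, long checkpointIntervalMillis, long maxUnacked) {
        return msgsPerSecond * checkpointIntervalMillis < maxUnacked * 1000L;
    }

    // Largest whole-millisecond interval that still satisfies the strict inequality.
    static long maxCheckpointIntervalMillis(long msgsPerSecond, long maxUnacked) {
        if (msgsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return (maxUnacked * 1000L - 1) / msgsPerSecond;
    }

    // Highest steady rate (messages per second) a given interval can sustain.
    static long maxRatePerSecond(long checkpointIntervalMillis, long maxUnacked) {
        if (checkpointIntervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return (maxUnacked * 1000L - 1) / checkpointIntervalMillis;
    }
}
```

These are upper bounds for a steady rate; leaving headroom below them for bursts is prudent.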
Solace-to-Deephaven adapters
The SolaceIngester is agnostic to the format of Solace data. An adapter converts the payload of each Solace message into Deephaven rows. A SolaceIngester requires a BytesXMLMessageToTableWriterAdapter: a class that processes Solace BytesXMLMessage objects and persists them into Deephaven tables. This class is included in the Solace Integration Plugin.
BytesXMLMessageToTableWriterAdapter
Classes implementing this interface extract text payloads and supplemental information (such as unique message header IDs) from Solace's BytesXMLMessage objects. Instances are created with Builder objects, which define the actual adapters. Builder methods return the original Builder object with the specified configuration, enabling method chaining.
Don't call the BytesXMLMessageToStringAdapter.Builder class directly. Instead, use the Builder classes of the underlying StringToTableWriterAdapter implementations.
Shared Builder methods
Method | Description |
---|---|
messageIdColumnName(column) | If set, the specified Deephaven column will be populated with the unique message ID specified in the message header properties. If not set, the unique message ID will not be persisted. |
timestampColumnName(column) | If set, the specified Deephaven column will be populated with a timestamp indicating the wall-clock time when the message was consumed by the adapter. If not set, no timestamp will be persisted. |
solaceSendTimestampColumnName(column) | If set, the specified Deephaven column will be populated with the timestamp included in the BytesXMLMessage as the time when the message was sent, or null if the sender did not include a timestamp. If not set, sender timestamp information will not be persisted. |
solaceReceiveTimestampColumnName(column) | If set, the specified Deephaven column will be populated with the timestamp included in the BytesXMLMessage as the time when the message was received, or null if this was not enabled for this message. If not set, receive timestamp information will not be persisted. |
StringToTableWriterAdapter
Classes implementing this interface encapsulate the transformation of string data into Deephaven records.
JSONToTableWriterAdapter
This adapter processes text as JSON, converting each JSON entry into one Deephaven record. Column definitions and translations are defined at runtime.
The following properties are configurable.
Property | Description |
---|---|
JSONToTableWriterAdapter.consumerThreads | The number of threads the JSON adapter will use to process messages. Since JSON parsing is slower than Deephaven's disk write speed, the JSON adapter uses threading to parse multiple messages simultaneously for disk writing. This is especially beneficial for large JSON objects (over 50 fields). Don't exceed available CPU cores or performance will degrade. In low-volume scenarios (under 1000 messages / second), one thread should be sufficient. Excess threads may reduce performance. Default: 1 (single-threaded). |
JSONToTableWriterAdapter.consumerWaitInterval | How long (in milliseconds) each adapter thread should wait to get a new JSON message when that thread is ready to receive messages. Default 100. |
JSONToTableWriterAdapter.consumerReportInterval | How long (in milliseconds) the JSON adapter should wait to report performance information on how many messages it processed over this timeframe. Default 60000 (one minute). |
JSONToTableWriterAdapter.Builder
The following additional methods are available for this extension of Builder.
Method | Description |
---|---|
addColumnToFieldMapping(column, field) | Use this method to specify a direct one-to-one mapping between a JSON field and a Deephaven column. |
autoValueMapping(boolean) | When set to true, any Deephaven columns that are not specifically mapped otherwise will be assumed to be mapped to a JSON field of the same name. When false, if any Deephaven columns are not specifically mapped to a JSON field, an exception will be thrown. |
allowUnmapped(column) | Indicate that a specific Deephaven column is not mapped to any JSON field and should be populated with null values, even if a JSON field of the same name exists. |
allowMissingKeys(boolean) | When true, JSON records with missing keys will pass a null value to Deephaven. When false, JSON records with missing keys throw an exception and stop processing. |
allowNullValues(boolean) | When true, JSON fields with null values are passed to Deephaven as null values. When false, JSON fields with null values cause an exception to be thrown. |
addColumnTo<X>Function(column, to<X>Function) | Specify that a given Deephaven column should be populated by the output of a Function that takes in a JSON record and produces a value of the specified type. Initially available are Int, Long, and Double options. See the Complex field mappings section for more information. |
addColumnToFunction(column, returnType, function) | Specify that a given Deephaven column should be populated by the output of a Function that takes in a JSON record and produces a value of type returnType. |
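The mapping options above interact: a column that is specifically mapped keeps its mapping, allowUnmapped forces null even when a same-named field exists, and otherwise autoValueMapping decides between a same-named field and an exception. The following hypothetical sketch (not the adapter's code) encodes that precedence for plain field mappings. It assumes function mappings and the metadata columns from the shared Builder methods (such as MessageId and IngestTimestamp) are handled separately and left out of the column list; giving an explicit mapping priority over allowUnmapped for the same column is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of how field-mapping options could resolve per column.
final class ColumnResolutionSketch {
    // Returns column -> JSON field name, or column -> null for columns left unmapped.
    static Map<String, String> resolve(List<String> columns,
                                       Map<String, String> explicitMappings,
                                       Set<String> allowedUnmapped,
                                       boolean autoValueMapping) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String col : columns) {
            if (explicitMappings.containsKey(col)) {
                result.put(col, explicitMappings.get(col)); // addColumnToFieldMapping
            } else if (allowedUnmapped.contains(col)) {
                result.put(col, null);                      // allowUnmapped: always null
            } else if (autoValueMapping) {
                result.put(col, col);                       // same-named JSON field
            } else {
                throw new IllegalArgumentException("Column " + col + " is not mapped");
            }
        }
        return result;
    }
}
```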
Example
builder = new JSONToTableWriterAdapter.Builder()
.allowUnmapped("Status")
.allowUnmapped("Event")
.addColumnToFieldMapping("OriginalTimestamp", "Timestamp")
.messageIdColumnName("MessageId")
.timestampColumnName("IngestTimestamp")
.solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
.solaceSendTimestampColumnName("SolaceSendTimestamp")
.allowNullValues(true)
SimpleStringToTableWriterAdapter
This adapter writes unparsed input text directly into a single text-type column of the specified table. Each message produces one Deephaven record.
SimpleStringToTableWriterAdapter.Builder
The following additional method is available for this extension of Builder:
setValueColumnName(columnName) - Use this method of the Builder to specify the name of the column where the text data will be written.
Example
builder = new SimpleStringToTableWriterAdapter.Builder()
.allowUnmapped("Status")
.allowUnmapped("Event")
.setValueColumnName("RawText")
.messageIdColumnName("MessageId")
.timestampColumnName("IngestTimestamp")
.solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
.solaceSendTimestampColumnName("SolaceSendTimestamp")
.allowNullValues(true)
Complex field mappings
If your JSON requires complex mappings, you can map the JsonRecord object to table fields using arbitrary functions. The builder provides methods for Integer, Long, Double and generic Object fields. Extending the JSON example above, if we added a Notional column of type double and a Conditions column of type String[], we can use Groovy closures to define the mapping of Price * Size for the Notional column and to split the SaleCondition field into a String array:
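import io.deephaven.kafka.ingest.JsonRecord
import io.deephaven.solace.ingester.JSONToTableWriterAdapter
// Notional = Size * Price
def notionalFunction = { JsonRecord record ->
    int size = JsonRecordUtil.getInt(record, "Size")
    double price = JsonRecordUtil.getDouble(record, "Price")
    return (size * price)
}
// Split the comma-separated SaleCondition field; a missing value stays null
def conditionSplit = { JsonRecord record ->
    String condition = JsonRecordUtil.getString(record, "SaleCondition")
    if (condition == null) {
        return null
    }
    return condition.split(",")
}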
builder = new JSONToTableWriterAdapter.Builder()
.allowUnmapped("Status")
.allowUnmapped("Event")
.addColumnToFieldMapping("OriginalTimestamp", "Timestamp")
.messageIdColumnName("MessageId")
.timestampColumnName("IngestTimestamp")
.solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
.solaceSendTimestampColumnName("SolaceSendTimestamp")
.addColumnToDoubleFunction("Notional", notionalFunction)
.addColumnToFunction("Conditions", String[].class, conditionSplit)
.allowNullValues(true)
Your logic need not be contained in Groovy closures. Any standard java.util.function.Function, java.util.function.ToIntFunction, java.util.function.ToLongFunction, or java.util.function.ToDoubleFunction that takes an io.deephaven.kafka.ingest.JsonRecord as input can be used. For example, you may define your function in your own Java code for simpler unit testing and more controlled deployment.
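As an illustration of that approach, the calculations from the two Groovy closures above can be moved into plain static methods that need neither a JsonRecord nor a running ingester. TradeFieldLogic is a hypothetical name; this is a sketch, not part of the plugin.

```java
// Hypothetical helpers: the logic of notionalFunction and conditionSplit as
// plain static methods, so it can be unit-tested in isolation.
final class TradeFieldLogic {
    // Notional = Size * Price, as in notionalFunction.
    static double notional(int size, double price) {
        return size * price;
    }

    // Splits a comma-separated SaleCondition into an array; null stays null,
    // matching conditionSplit.
    static String[] splitConditions(String condition) {
        return condition == null ? null : condition.split(",");
    }
}
```

A thin wrapper would then only extract fields and delegate, for example a ToDoubleFunction<JsonRecord> written as record -> TradeFieldLogic.notional(JsonRecordUtil.getInt(record, "Size"), JsonRecordUtil.getDouble(record, "Price")) and passed to addColumnToDoubleFunction.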
Legacy Persistent Query Example
// Deephaven imports
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer;
import com.fishlib.configuration.Configuration;
import com.fishlib.util.process.ProcessEnvironment;
import com.illumon.iris.db.util.logging.IrisLogCreator;
import com.illumon.iris.db.v2.routing.DataRoutingServiceFactory;
import io.deephaven.solace.ingester.SolaceQueueIngester;
import io.deephaven.solace.ingester.JSONToTableWriterAdapter;
import com.solacesystems.jcsmp.Consumer;
import com.solacesystems.jcsmp.*;
// Copy the host, vpn, username and password from the Deephaven
// configuration.
// You may want to hardcode these values, or alternatively can set them in
// the Deephaven configuration or by using System properties from the
// 'Extra JVM arguments' panel.
final JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.HOST, Configuration.getInstance().getProperty("SolaceQueueIngester.host"));
properties.setProperty(JCSMPProperties.USERNAME, Configuration.getInstance().getProperty("SolaceQueueIngester.username"));
properties.setProperty(JCSMPProperties.VPN_NAME, Configuration.getInstance().getProperty("SolaceQueueIngester.vpn"));
properties.setProperty(JCSMPProperties.PASSWORD, Configuration.getInstance().getProperty("SolaceQueueIngester.password"));
properties.setProperty(JCSMPProperties.GENERATE_RCV_TIMESTAMPS, true);
final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);
session.connect();
final EndpointProperties endpointProps = new EndpointProperties();
endpointProps.setPermission(EndpointProperties.PERMISSION_CONSUME);
endpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
// In this example we are using a “trades” queue that contains market trades
// in JSON format, with the 'SolaceQueueIngester.queueName' configuration variable set to the
// value "trades".
final com.solacesystems.jcsmp.Queue queue = JCSMPFactory.onlyInstance().createQueue(Configuration.getInstance().getProperty("SolaceQueueIngester.queueName"));
session.provision(queue, endpointProps, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
final ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
flow_prop.setEndpoint(queue);
flow_prop.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
EndpointProperties endpoint_props = new EndpointProperties();
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault();
// if the dis configuration specifies "private" storage, then you must provide a storage location, otherwise set this to null
storage = "/db/dataImportServers/SolaceImport1"
// the SolaceImport1 string is the name given to this in-worker DataImportServer configuration
disName = "SolaceImport1"
dis = DataImportServer.getDataImportServer(null, disName, Configuration.getInstance(), db.getSchemaService(), routingService, storage)
dis.startInWorker();
// By default the JSONToTableWriterAdapter maps columns to JSON fields of
// the same name. Additionally, we set the values of “Status” and “Event” to
// null, because they do not exist in our JSON. We use the “Timestamp” field
// within the JSON records to populate the “OriginalTimestamp” column. The
// MessageId column is populated from the Solace message properties (this can be useful for debugging to trace the provenance of a row of data), and
// the IngestTimestamp is the wall-clock time of the ingestion. We allow the
// JSON records to contain null values.
builder = new JSONToTableWriterAdapter.Builder()
.allowUnmapped("Status")
.allowUnmapped("Event")
.addColumnToFieldMapping("OriginalTimestamp", "Timestamp")
.messageIdColumnName("MessageId")
.timestampColumnName("IngestTimestamp")
// Note: if the Solace library does not populate this field, null is written to Deephaven
.solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
.solaceSendTimestampColumnName("SolaceSendTimestamp")
.allowNullValues(true)
// Create a SolaceQueueIngester with an ingestername of “TradeImport”,
// with a target of the Solace.TradeNbboStock table. Each Solace message
// must have the property “MsgId” which is a unique increasing String value
// that is used to reliably acknowledge messages once they have been
// durably committed to Deephaven. The final `builder.buildFactory()`
// argument is the definition above of how to process the JSON messages.
{{ ... }}
si = new SolaceQueueIngester(log, "TradeImport", "Solace", "TradeNbboStock", currentDateNy(), "MsgId", session, queue, flow_prop, endpoint_props, dis, builder.buildFactory(log))
si.start()
// The table is now available from this and other workers
jsonTable=db.i("Solace", "TradeNbboStock").where("Date=currentDateNy()")
{{ ... }}
BytesToTableWriterAdapter
Classes implementing this interface encapsulate the transformation of binary data into Deephaven records.
PojoToTableWriterAdapter
This adapter processes binary data by deserializing it into a provided class or by executing Java code that accesses the binary data. The adapter makes an instance of that class available to the ingester, and the instance interprets the binary data to provide column values. When automapping is enabled, reflection finds methods and fields in the valueClass that match target table column names. For example, to match a column called CustomerName, the matcher searches for a getCustomerName() method or a CustomerName field.
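To make the lookup concrete, the sketch below approximates automapping with plain Java reflection, including the caseInsensitiveSearch behavior described for the Builder. AutoMapSketch and its Order class are hypothetical; checking getters before fields and considering only public members are assumptions, not documented plugin behavior.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Optional;

// Illustration only: approximates the documented getter/field name matching.
final class AutoMapSketch {
    // Returns the matching member ("getX()" or "X"), or empty if nothing matches.
    static Optional<String> findSource(Class<?> valueClass, String column, boolean caseInsensitive) {
        final String getter = "get" + column;
        for (Method m : valueClass.getMethods()) {
            boolean nameMatches = caseInsensitive
                    ? m.getName().equalsIgnoreCase(getter)
                    : m.getName().equals(getter);
            if (nameMatches && m.getParameterCount() == 0 && m.getReturnType() != void.class) {
                return Optional.of(m.getName() + "()");
            }
        }
        for (Field f : valueClass.getFields()) {
            boolean nameMatches = caseInsensitive
                    ? f.getName().equalsIgnoreCase(column)
                    : f.getName().equals(column);
            if (nameMatches && !Modifier.isStatic(f.getModifiers())) {
                return Optional.of(f.getName());
            }
        }
        return Optional.empty();
    }

    // Hypothetical value class used to exercise the lookup.
    public static final class Order {
        public String CustomerName;
        public double getDataValue() { return 1.0; }
    }
}
```

With exact matching, a column named dataValue looks for getdataValue() and finds nothing; with case-insensitive matching it finds getDataValue(), which mirrors the caseInsensitiveSearch description.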
When configuring the Builder, specify a valueClass to interpret binary data from message payloads. Deephaven table columns can be mapped automatically and/or manually to values from the valueClass instance. The builder creates and compiles a new class based on Builder properties, which is used for processing received messages.
You can create additional adapters to process objects embedded in "top level" messages/objects. This allows embedded objects to be processed by their own adapters and written to separate tables. For example, a message might contain a parent order record with embedded line items; a second PojoToTableWriterAdapter could write the line items' properties to columns in their own table.
The following property is configurable:
PojoToTableWriterAdapter.consumerReportInterval: Interval in milliseconds for the adapter to report performance information on message processing counts. Default: 60000 (one minute).
PojoToTableWriterAdapter.Builder
The following additional methods are available for this extension of Builder.
Method | Description |
---|---|
valueClass(class) | Required. The class used to interpret binary message data and provide values for Deephaven columns. Typically a custom class that must be available on the ingester server's classpath. Add a jar containing the class and its dependencies to /etc/sysconfig/illumon.d/java_lib on the server. |
valueObjectCreator(expression) | Sets the expression for instantiating the valueClass with binary message data. Binary data is available as metadata.getBytes(). If not set, the valueClass is instantiated using SerializationUtils.deserialize(metadata.getBytes()). |
imports(String...) | Specifies additional imports needed in the compiled adapter class. |
caseInsensitiveSearch(boolean) | Defaults to false. When false, column names and class fields must match exactly. When true, case mismatches are allowed. For example, a column named dataValue wouldn't match a method called getDataValue() by default, but would match when set to true. |
setSequenceGenerator(SequenceGenerator) | Specifies a SequenceGenerator object for creating unique IDs for embedded objects. Used when embedded objects are passed to another adapter for writing to their own table, and these objects may have their own embedded children requiring unique IDs to link them to descendants. |
addCustomMapper(PojoAdapterColumnMapper) | Specifies a custom object to process columns and valueClass methods/fields for complex mappings beyond simple one-to-one automapping. See the Custom mapper classes section below for details. Useful for repeated patterns like size properties and array getters that extract embedded values. Can be called multiple times to add multiple custom mappers. |
addColumnToValueField(column, field) | Specifies a direct one-to-one mapping between a valueClass field and target column. |
addColumnToValueMethod(column, method) | Specifies a direct one-to-one mapping between a valueClass method and target column. Don't include parentheses in the method name. |
addColumnToSetter(column, setter) | Specifies a direct one-to-one mapping between an expression executed in the compiled adapter and a target column. See Complex field mappings for details. |
addObjectToAdapter(objectExpression, adapter, parentIdColumnName) | Maps an expression (such as an Object method) to an adapter, allowing embedded objects in a record to be written to their own table. |
setPrintClassBody(boolean) | Enables printing of adapter class source code to stdout. Useful for troubleshooting Pojo adapters that fail to compile or work incorrectly. |
autoValueMapping(boolean) | When set to true, any Deephaven columns that are not specifically mapped otherwise will be assumed to be mapped to a valueClass field or method with a matching name. When false, if any Deephaven columns are not specifically mapped to source from the valueClass, an exception will be thrown. |
allowUnmapped(column) | Indicate that a specific Deephaven column is not mapped to any field, method, or expression of the valueClass. |
Example
builder = new PojoToTableWriterAdapter.Builder()
.valueClass(SerializedTestClass.class)
.caseInsensitiveSearch(true)
.allowUnmapped("Status")
.allowUnmapped("Event")
.addColumnToValueField("OriginalTimestamp", "Timestamp")
.addColumnToValueMethod("OrderNo", "getOrderNumber")
.messageIdColumnName("MessageId")
.timestampColumnName("IngestTimestamp")
.solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
.solaceSendTimestampColumnName("SolaceSendTimestamp")
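The example assumes a SerializedTestClass that is not shown. The sketch below is a hypothetical version whose members match the builder calls (a public Timestamp field and a getOrderNumber() method), together with a round trip through standard Java serialization. When valueObjectCreator is not set, the adapter uses SerializationUtils.deserialize(metadata.getBytes()); assuming that is Apache Commons Lang's SerializationUtils, which is built on standard Java serialization, the publisher would write the payload the same way, and the same class (including its serialVersionUID) must be on both the publisher's and the ingester's classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch of a valueClass and the payload round trip it relies on.
final class SerializationRoundTrip {
    public static final class SerializedTestClass implements Serializable {
        private static final long serialVersionUID = 1L;
        public long Timestamp;            // addColumnToValueField("OriginalTimestamp", "Timestamp")
        private final String orderNumber; // addColumnToValueMethod("OrderNo", "getOrderNumber")

        public SerializedTestClass(long timestamp, String orderNumber) {
            this.Timestamp = timestamp;
            this.orderNumber = orderNumber;
        }

        public String getOrderNumber() { return orderNumber; }
    }

    // Publisher side: the bytes to place in the Solace message payload.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Ingester side: what default deserialization does with those bytes.
    static Object fromBytes(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Java deserialization should only be applied to payloads from trusted publishers.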
Complex field mappings
For complex object ingestion mappings, you can map the valueClass object to table fields using custom expressions. Within the class that handles column value assignments, the valueClass object instance is called deephavenSolaceIngesterObjectVariable.
For example, to set a column by converting an object attribute from long epoch milliseconds to a DBDateTime:
.addColumnToSetter("OrderDate","com.illumon.iris.db.tables.utils.DBTimeUtils.millisToTime(deephavenSolaceIngesterObjectVariable.getOrderMillis())")
Custom mapper classes
For frequently used complex mappings or when column value construction is too complex for simple expressions, use a custom mapper to incorporate advanced code generation into the adapter class creation process.
Custom mapper classes must implement the PojoAdapterColumnMapper interface and its doMapping method. This method provides access to table column names and valueClass/adapter properties for dynamically creating custom transformation code and expressions.
doMapping method signature
public void doMapping(
@NotNull final Map<String, String> columnToSetter,
@NotNull final Class valueClass,
@NotNull final Map<String, Class> columnNameToType,
@NotNull final String valueObjectName,
@NotNull final List<String> columnValueExpressions
);
Parameters
Parameter | Type | Description |
---|---|---|
columnToSetter | Map<String, String> | Map of column names to String expressions passed to column setters. For example, "X" and "1+2" for an int column generates X.setInt(1+2). |
valueClass | Class | The class providing row data. |
columnNameToType | Map<String, Class> | Map of column names to their data type Classes for the target table. |
valueObjectName | String | The String name of the valueClass object instantiated to provide row data. |
columnValueExpressions | List<String> | List of expressions executed before calling column setters. Use for complex expressions that cannot be evaluated directly in the setter (e.g., array creation and population). The corresponding columnToSetter entry would contain the variable name. |
Example: Custom Array Mapper
The following example shows a custom mapper that creates setter code to populate array columns. It searches for a method providing array length and another providing individual array elements, then uses both to create a single array value for the table.
import io.deephaven.solace.ingester.PojoAdapterColumnMapper;
import org.jetbrains.annotations.NotNull;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
private static class CustomArrayMapper implements PojoAdapterColumnMapper {
private boolean methodExists(@NotNull final Class<?> inputClass,
@NotNull final String methodName,
final Class<?> parameterClass) {
try {
if (parameterClass != null) {
final Method method = inputClass.getMethod(methodName, parameterClass);
} else {
final Method method = inputClass.getMethod(methodName);
}
} catch (NoSuchMethodException e) {
return false;
}
return true;
}
private String capitalizeFirstLetter(@NotNull final String input) {
if (input.length() < 2) {
return input.toUpperCase();
}
return input.substring(0,1).toUpperCase() + input.substring(1);
}
public void doMapping(
@NotNull final Map<String, String> columnToSetter,
@NotNull final Class valueClass,
@NotNull final Map<String, Class> columnNameToType,
@NotNull final String valueObjectName,
@NotNull final List<String> columnValueExpressions) {
for (final String col : columnNameToType.keySet()) {
Class<?> colType = columnNameToType.get(col);
if (colType.isArray()) {
final String lengthMethod = "get" + capitalizeFirstLetter(col) + "Length";
final String elementMethod = "get" + capitalizeFirstLetter(col) + "At";
if (methodExists(valueClass, lengthMethod, null)
&& methodExists(valueClass, elementMethod, int.class)) {
final String length = valueObjectName + "." + lengthMethod + "()";
final String element = valueObjectName + "." + elementMethod + "(";
final String retval = "deephavenSolaceRetval" + col;
final String valueExpression = "final " + colType.getSimpleName() +
" " + retval + " = new " + colType.getComponentType().getSimpleName() + "[" + length +
"];\n for (int i=0; i<" + length + "; i++) {\n " + retval + "[i]=" + element +
"i);\n }\n";
columnValueExpressions.add(valueExpression);
columnToSetter.put(col, retval);
}
}
}
}
}
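The string-building logic is easier to follow when it runs against a concrete class. The sketch below copies CustomArrayMapper's generation logic into a plain Java class (no plugin interface) and applies it to a hypothetical Quote class whose getBidsLength() and getBidsAt(int) methods follow the naming convention the mapper expects. Printing columnValueExpressions shows the loop that would run ahead of the setters, while the Bids setter receives only the generated variable name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ArrayMapperDemo {

    // Hypothetical value class following the get<Column>Length() / get<Column>At(int) convention.
    public static class Quote {
        private final double[] bids = {101.5, 101.25, 101.0};
        public int getBidsLength() { return bids.length; }
        public double getBidsAt(final int i) { return bids[i]; }
    }

    static boolean methodExists(final Class<?> c, final String name, final Class<?>... params) {
        try {
            c.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    static String capitalizeFirstLetter(final String input) {
        return input.length() < 2 ? input.toUpperCase()
                : input.substring(0, 1).toUpperCase() + input.substring(1);
    }

    // Same code generation as CustomArrayMapper.doMapping, without the plugin interface.
    static void map(final Map<String, String> columnToSetter, final Class<?> valueClass,
                    final Map<String, Class<?>> columnNameToType, final String valueObjectName,
                    final List<String> columnValueExpressions) {
        for (final Map.Entry<String, Class<?>> entry : columnNameToType.entrySet()) {
            final String col = entry.getKey();
            final Class<?> colType = entry.getValue();
            final String lengthMethod = "get" + capitalizeFirstLetter(col) + "Length";
            final String elementMethod = "get" + capitalizeFirstLetter(col) + "At";
            if (!colType.isArray() || !methodExists(valueClass, lengthMethod)
                    || !methodExists(valueClass, elementMethod, int.class)) {
                continue; // not handled by this mapper
            }
            final String length = valueObjectName + "." + lengthMethod + "()";
            final String element = valueObjectName + "." + elementMethod + "(";
            final String retval = "deephavenSolaceRetval" + col;
            columnValueExpressions.add("final " + colType.getSimpleName() + " " + retval
                    + " = new " + colType.getComponentType().getSimpleName() + "[" + length
                    + "];\n for (int i=0; i<" + length + "; i++) {\n " + retval + "[i]="
                    + element + "i);\n }\n");
            columnToSetter.put(col, retval);
        }
    }

    public static void main(String[] args) {
        final Map<String, String> columnToSetter = new LinkedHashMap<>();
        final List<String> expressions = new ArrayList<>();
        final Map<String, Class<?>> columnTypes = new LinkedHashMap<>();
        columnTypes.put("Bids", double[].class);
        columnTypes.put("Symbol", String.class); // not an array, so it is skipped

        map(columnToSetter, Quote.class, columnTypes, "value", expressions);

        expressions.forEach(System.out::print);
        System.out.println("Bids setter argument: " + columnToSetter.get("Bids"));
    }
}
```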
Example Persistent Queries
Example 1: Messages written to a single table
// Deephaven imports
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer;
import com.fishlib.configuration.Configuration;
import com.fishlib.util.process.ProcessEnvironment;
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory;
import io.deephaven.solace.ingester.SolaceQueueIngester;
import io.deephaven.solace.ingester.PojoToTableWriterAdapter;
import com.solacesystems.jcsmp.*;
// Value class that parses the raw message bytes
import io.deephaven.solace.OrderEvent;
// Read the host, VPN, username, and password from the Deephaven
// configuration. You may prefer to hardcode these values instead; the
// properties can be set either in the Deephaven configuration or as System
// properties in the 'Extra JVM arguments' panel.
final JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.HOST, Configuration.getInstance().getProperty("SolaceQueueIngester.host"));
properties.setProperty(JCSMPProperties.USERNAME, Configuration.getInstance().getProperty("SolaceQueueIngester.username"));
properties.setProperty(JCSMPProperties.VPN_NAME, Configuration.getInstance().getProperty("SolaceQueueIngester.vpn"));
properties.setProperty(JCSMPProperties.PASSWORD, Configuration.getInstance().getProperty("SolaceQueueIngester.password"));
properties.setProperty(JCSMPProperties.GENERATE_RCV_TIMESTAMPS, true);
final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);
session.connect();
final EndpointProperties endpointProps = new EndpointProperties();
endpointProps.setPermission(EndpointProperties.PERMISSION_CONSUME);
endpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
// In this example, the queue is an “orders” queue containing market orders
// in a proprietary format that the io.deephaven.solace.OrderEvent class
// interprets.
final com.solacesystems.jcsmp.Queue queue = JCSMPFactory.onlyInstance().createQueue(Configuration.getInstance().getProperty("SolaceQueueIngester.queueName"));
session.provision(queue, endpointProps, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
final ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
flow_prop.setEndpoint(queue);
flow_prop.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
EndpointProperties endpoint_props = new EndpointProperties();
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault();
// the SolaceImport1 string is the name given to this in-worker
// DataImportServer within the routing YAML configuration file
disConfig = routingService.getDataImportServiceConfig("SolaceImport1");
if (disConfig == null) {
throw new IllegalArgumentException("Could not find disConfig!");
}
dis = new DataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance());
dis.startInWorker();
// By default, the PojoToTableWriterAdapter maps each column to an object method or field
// with the same name. allowUnmapped("UnsuppliedColumnName") leaves that column null,
// because OrderEvent has no matching method or field. The MessageId column is populated
// from the Solace message properties (useful for tracing the provenance of a row of data),
// and IngestTimestamp records the wall-clock time of ingestion.
orderAdapterFactory = new PojoToTableWriterAdapter.Builder()
.allowUnmapped("UnsuppliedColumnName")
.valueClass(OrderEvent.class)
.valueObjectCreator("new OrderEvent(metadata.getBytes());")
.imports("io.deephaven.solace.OrderEvent")
.caseInsensitiveSearch(true)
.messageIdColumnName("MessageId")
.timestampColumnName("IngestTimestamp")
.buildFactory(log);
// Uncomment the next line if (extremely) verbose logging is needed
//log.setLevel(com.fishlib.io.log.LogLevel.TRACE)
si = new SolaceQueueIngester(log, "OrderEventsImport", "TestNamespace", "OrderEventsTable", currentDateNy(), "MsgId", true, session, queue, flow_prop, endpoint_props, dis, orderAdapterFactory);
si.start()
// The table is now available from this and other workers
pojoTable=db.i("TestNamespace", "OrderEventsTable").where("Date=currentDateNy()")
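The comments in Example 1 say columns are matched to methods or fields with the same name, and caseInsensitiveSearch(true) relaxes the case. The following sketch illustrates that idea with plain reflection; it is not the plugin's resolution code. The lookup order (methods before fields) and the comma-separated payload parsed by this hypothetical OrderEvent are assumptions, since the real io.deephaven.solace.OrderEvent format is proprietary. A column with no match, such as UnsuppliedColumnName, is the case that allowUnmapped covers.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class SameNameLookupSketch {

    // Hypothetical value class. The payload is assumed to be UTF-8 text "SYMBOL,QTY,PRICE".
    public static class OrderEvent {
        public final String symbol;
        private final int qty;
        private final double price;

        public OrderEvent(final byte[] bytes) {
            final String[] parts = new String(bytes, StandardCharsets.UTF_8).split(",");
            this.symbol = parts[0];
            this.qty = Integer.parseInt(parts[1]);
            this.price = Double.parseDouble(parts[2]);
        }

        public int qty() { return qty; }
        public double price() { return price; }
    }

    // Illustration only: find a public no-argument method, then a public field, whose name
    // matches the column name, optionally ignoring case.
    static Optional<String> resolve(final Class<?> cls, final String column, final boolean ignoreCase) {
        for (final Method m : cls.getMethods()) {
            if (m.getParameterCount() == 0 && matches(m.getName(), column, ignoreCase)) {
                return Optional.of(m.getName() + "()");
            }
        }
        for (final Field f : cls.getFields()) {
            if (matches(f.getName(), column, ignoreCase)) {
                return Optional.of(f.getName());
            }
        }
        return Optional.empty();
    }

    private static boolean matches(final String member, final String column, final boolean ignoreCase) {
        return ignoreCase ? member.equalsIgnoreCase(column) : member.equals(column);
    }

    public static void main(String[] args) {
        final OrderEvent event = new OrderEvent("AAPL,100,189.5".getBytes(StandardCharsets.UTF_8));
        System.out.println("parsed qty = " + event.qty());
        for (final String col : new String[] {"Symbol", "Qty", "Price", "UnsuppliedColumnName"}) {
            System.out.println(col + " -> " + resolve(OrderEvent.class, col, true).orElse("(unmapped)"));
        }
    }
}
```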
Example 2: Messages with embedded objects written to a second table
// A sample Persistent Query for messages whose embedded objects are written to a second table
// Deephaven imports
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer;
import com.fishlib.configuration.Configuration;
import com.fishlib.util.process.ProcessEnvironment;
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory;
import io.deephaven.solace.ingester.SolaceQueueIngester;
import io.deephaven.solace.ingester.PojoToTableWriterAdapter;
import com.solacesystems.jcsmp.*;
import io.deephaven.solace.ingester.BytesXMLMessageToTableWriterAdapter;
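import io.deephaven.solace.ingester.EmbeddedObjectAdapter;
// Value classes for the top-level messages and their embedded objects
import io.deephaven.solace.OrderDetail;
import io.deephaven.solace.OrderEvent;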
// Read the host, VPN, username, and password from the Deephaven
// configuration. You may prefer to hardcode these values instead; the
// properties can be set either in the Deephaven configuration or as System
// properties in the 'Extra JVM arguments' panel.
final JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.HOST, Configuration.getInstance().getProperty("SolaceQueueIngester.host"));
properties.setProperty(JCSMPProperties.USERNAME, Configuration.getInstance().getProperty("SolaceQueueIngester.username"));
properties.setProperty(JCSMPProperties.VPN_NAME, Configuration.getInstance().getProperty("SolaceQueueIngester.vpn"));
properties.setProperty(JCSMPProperties.PASSWORD, Configuration.getInstance().getProperty("SolaceQueueIngester.password"));
properties.setProperty(JCSMPProperties.GENERATE_RCV_TIMESTAMPS, true);
final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);
session.connect();
final EndpointProperties endpointProps = new EndpointProperties();
endpointProps.setPermission(EndpointProperties.PERMISSION_CONSUME);
endpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
// In this example, the queue is an “orders” queue containing market orders
// in a proprietary format that the io.deephaven.solace.OrderEvent class
// interprets.
final com.solacesystems.jcsmp.Queue queue = JCSMPFactory.onlyInstance().createQueue(Configuration.getInstance().getProperty("SolaceQueueIngester.queueName"));
session.provision(queue, endpointProps, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
final ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
flow_prop.setEndpoint(queue);
flow_prop.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
EndpointProperties endpoint_props = new EndpointProperties();
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault();
// the SolaceImport1 string is the name given to this in-worker
// DataImportServer within the routing YAML configuration file
disConfig = routingService.getDataImportServiceConfig("SolaceImport1");
if (disConfig == null) {
throw new IllegalArgumentException("Could not find disConfig!");
}
dis = new DataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance());
dis.startInWorker();
// Adapters that will write embedded objects to other tables must be created
// before the adapter for the top-level messages, because these sub-adapters
// have to be available when building the main adapter.
orderDetailAdapterFactory = new PojoToTableWriterAdapter.Builder()
.imports("io.deephaven.solace.OrderDetail")
.valueClass(OrderDetail.class)
// This sub-adapter receives a generic object from the parent adapter, so the
// valueObjectCreator must cast it to this adapter's valueClass.
.valueObjectCreator("(OrderDetail)(metadata.getValueObject());")
.addColumnToSetter("orderEventMessageId","metadata.getParentIdValue()")
.caseInsensitiveSearch(true)
.buildFactory(log);
// Sub-adapters must be created explicitly with new EmbeddedObjectAdapter, which sets up
// the data processor that manages the child table's files. The top-level adapter, by
// contrast, is created internally when the ingester is constructed.
final BytesXMLMessageToTableWriterAdapter orderDetailAdapter = new EmbeddedObjectAdapter(
"TestNamespace",
"OrderDetailsTable",
"NestedOrderDetailIngester",
DBTimeUtils.currentDateNy(),
orderDetailAdapterFactory,
dis,
log).getEmbeddedObjectAdapter();
// By default, the PojoToTableWriterAdapter maps each column to an object method or field
// with the same name. allowUnmapped("UnsuppliedColumnName") leaves that column null,
// because OrderEvent has no matching method or field. The MessageId column is populated
// from the Solace message properties (useful for tracing the provenance of a row of data),
// and IngestTimestamp records the wall-clock time of ingestion.
orderAdapterFactory = new PojoToTableWriterAdapter.Builder()
.allowUnmapped("UnsuppliedColumnName")
.valueClass(OrderEvent.class)
.valueObjectCreator("new OrderEvent(metadata.getBytes());")
.imports("io.deephaven.solace.OrderEvent")
.caseInsensitiveSearch(true)
.messageIdColumnName("MessageId")
.timestampColumnName("IngestTimestamp")
.addObjectToAdapter("getOrderDetail()",orderDetailAdapter,"MessageId")
.buildFactory(log);
// Uncomment the next line if (extremely) verbose logging is needed
//log.setLevel(com.fishlib.io.log.LogLevel.TRACE)
si = new SolaceQueueIngester(log, "OrderEventsImport", "TestNamespace", "OrderEventsTable", currentDateNy(), "MsgId", true, session, queue, flow_prop, endpoint_props, dis, orderAdapterFactory);
// Hierarchical ingesters need one extra step: register each sub-adapter with the
// ingester so that checkpoint state can be tracked across the child tables.
si.addAdapter(orderDetailAdapter);
si.start()
// The tables are now available from this and other workers
orderEventTable=db.i("TestNamespace", "OrderEventsTable").where("Date=currentDateNy()")
orderDetailTable=db.i("TestNamespace", "OrderDetailsTable").where("Date=currentDateNy()")
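In Example 2 each child row records its parent's message ID (the orderEventMessageId column, set from metadata.getParentIdValue()), which is what lets the two tables be related after ingestion. The sketch below models that flattening in plain Java with hypothetical record types and field names. It shows the shape of the resulting rows, not how EmbeddedObjectAdapter works internally, and it assumes getOrderDetail() returns a single embedded object.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EmbeddedObjectSketch {

    // Hypothetical message shapes: a parent event with one embedded detail object.
    record OrderDetail(String venue, int filledQty) {}
    record OrderEvent(String symbol, OrderDetail orderDetail) {}

    // Hypothetical rows for OrderEventsTable and OrderDetailsTable.
    record OrderEventRow(String messageId, String symbol) {}
    record OrderDetailRow(String orderEventMessageId, String venue, int filledQty) {}

    // Writes the parent row plus a child row that carries the parent's message ID.
    static void flatten(final String messageId, final OrderEvent event,
                        final List<OrderEventRow> parents, final List<OrderDetailRow> children) {
        parents.add(new OrderEventRow(messageId, event.symbol()));
        final OrderDetail detail = event.orderDetail();
        children.add(new OrderDetailRow(messageId, detail.venue(), detail.filledQty()));
    }

    public static void main(String[] args) {
        final List<OrderEventRow> parents = new ArrayList<>();
        final List<OrderDetailRow> children = new ArrayList<>();
        flatten("msg-1", new OrderEvent("AAPL", new OrderDetail("XNAS", 100)), parents, children);
        flatten("msg-2", new OrderEvent("MSFT", new OrderDetail("XNYS", 50)), parents, children);

        // Relate each child back to its parent through the shared message ID.
        final Map<String, String> symbolById = new HashMap<>();
        parents.forEach(p -> symbolById.put(p.messageId(), p.symbol()));
        children.forEach(c -> System.out.println(c.orderEventMessageId() + " "
                + symbolById.get(c.orderEventMessageId()) + " " + c.venue() + " " + c.filledQty()));
    }
}
```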
Deephaven-to-Solace Adapters
TablePublisher
The TablePublisher is a convenience class that encapsulates the logic for passing Deephaven table updates to external services. Because it is an InstrumentedShiftAwareListenerAdapter, TablePublisher knows how to connect to tables and respond to updates while ignoring internal row movements (shifts) that don't represent actual data changes.
After constructing your TablePublisher object, call <Table>.listenForUpdates(<TablePublisher>) to start listening to the table.
Method / Argument | Description |
---|---|
TablePublisher(table, addAdapter, removeAdapter, addConsumer, removeConsumer, keyModifiedColumns, ignoreModified) | Creates a new TablePublisher object that listens to the specified DynamicTable for updates. |
table | The DynamicTable to listen to. |
addAdapter | Optional. A TableToPublishableObjectAdapter that produces Lists of messages for add operations. |
removeAdapter | Optional. A TableToPublishableObjectAdapter that produces Lists of messages for remove operations. |
addConsumer | Optional. A Consumer<List<?>> that processes the addAdapter's output and delivers the data to its destination. |
removeConsumer | Optional. A Consumer<List<?>> that processes the removeAdapter's output and delivers the data to its destination. |
keyModifiedColumns | Optional. A List of the column names that make up the table's 'key' for update purposes. If a row is modified but its key columns do not change, only the 'add' portion of the update is published. If the key columns do change, both the 'remove' and 'add' portions are published. If null, all columns are treated as key columns. If an empty List, no columns are key columns, so 'remove' operations are never published as part of a 'modify'. |
TablePublisher(table, addAdapter, removeAdapter, consumer) | Convenience constructor that publishes both Add and Remove operations to the same destination. |
TablePublisher(table, addAdapter, consumer) | Convenience constructor that ignores Remove operations and publishes Adds to the specified destination. |
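The keyModifiedColumns rules reduce to one check per modified row: did any key column change? The sketch below encodes that reading of the table in plain Java, with each row represented as a hypothetical Map of column name to value. It assumes the before and after rows have the same columns, and it only decides which portions are eligible; presumably a 'remove' is delivered only when a removeAdapter and removeConsumer are configured.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class KeyModifiedColumnsSketch {

    // Hypothetical row representation: column name -> value.
    // Decides which portions of a modified row would be published under the keyModifiedColumns rules.
    static List<String> portionsToPublish(final Map<String, Object> before,
                                          final Map<String, Object> after,
                                          final List<String> keyModifiedColumns) {
        // null means every column is a key column; an empty list means none are.
        final Iterable<String> keyColumns =
                keyModifiedColumns == null ? after.keySet() : keyModifiedColumns;
        for (final String col : keyColumns) {
            if (!Objects.equals(before.get(col), after.get(col))) {
                return List.of("remove", "add"); // a key changed: publish both portions
            }
        }
        return List.of("add"); // no key changed: publish only the add portion
    }

    public static void main(String[] args) {
        final Map<String, Object> before = Map.of("Sym", "AAPL", "Price", 189.5);
        final Map<String, Object> priceChanged = Map.of("Sym", "AAPL", "Price", 190.0);
        final Map<String, Object> symChanged = Map.of("Sym", "MSFT", "Price", 189.5);

        System.out.println(portionsToPublish(before, priceChanged, List.of("Sym"))); // [add]
        System.out.println(portionsToPublish(before, symChanged, List.of("Sym")));   // [remove, add]
        System.out.println(portionsToPublish(before, priceChanged, null));           // [remove, add]
        System.out.println(portionsToPublish(before, symChanged, List.of()));        // [add]
    }
}
```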
TableToPublishableObjectAdapter
The TableToPublishableObjectAdapter (and its implementing classes) encapsulates Deephaven update operations and transforms them into formats that can be delivered to external services, including Solace.
Because the appropriate Solace message properties vary by situation, this plugin generates content suitable for Solace message delivery rather than the messages themselves.
TableToPublishableObjectAdapter works within InstrumentedListenerAdapter implementations, handling onUpdate calls. Each table update is processed into deliverable messages.
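The split between the adapter, which turns updated rows into publishable content, and the consumer, which turns that content into messages, can be sketched without any Solace or Deephaven classes. Below, a java.util.function.Function stands in for a TableToPublishableObjectAdapter and a Consumer<List<?>> plays the role described for addConsumer; OutboundMessage, the payload format, and the "orders/out" destination are hypothetical. In a real consumer, this is where you would set the situation-specific Solace message properties and send through your JCSMP producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class AdapterConsumerSketch {

    // Hypothetical stand-in for a Solace message: a destination plus a text payload.
    record OutboundMessage(String destination, String payload) {}

    // Hypothetical rows touched by one update, each as {symbol, quantity}.
    static List<OutboundMessage> publish(final List<String[]> updatedRows, final String destination) {
        final List<OutboundMessage> sent = new ArrayList<>();

        // Adapter role: produce publishable content only, one payload per row.
        final Function<List<String[]>, List<String>> adapter = rows -> {
            final List<String> payloads = new ArrayList<>();
            for (final String[] row : rows) {
                payloads.add("sym=" + row[0] + ";qty=" + row[1]);
            }
            return payloads;
        };

        // Consumer role: wrap each payload in a message; destination-specific
        // message properties would be chosen here.
        final Consumer<List<?>> consumer = payloads -> {
            for (final Object payload : payloads) {
                sent.add(new OutboundMessage(destination, payload.toString()));
            }
        };

        consumer.accept(adapter.apply(updatedRows));
        return sent;
    }

    public static void main(String[] args) {
        final List<String[]> rows = List.of(new String[] {"AAPL", "100"}, new String[] {"MSFT", "50"});
        publish(rows, "orders/out").forEach(System.out::println);
    }
}
```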
TableToPublishableObjectAdapter.Builder
This class builds TableToPublishableObjectAdapter instances, similar to BytesXMLMessageToStringAdapter.Builder.
Shared Builder Methods
Method | Description |
---|---|
autoValueMapping(boolean) | When true, all unmapped base table columns are output as individual result fields. When false, only explicitly mapped columns are included in the adapter output. |
ignoreMissingColumns(boolean) | Has no effect when autoValueMapping is enabled. When autoValueMapping is disabled and this is true, only explicitly mapped columns are included and unmapped columns are ignored. When autoValueMapping is disabled and this is false, a SolacePublisherException is thrown during adapter creation if any source table columns are unmapped. |
excludeColumn(string) | Excludes the named column from adapter output. Cannot exclude columns that are already excluded or explicitly mapped. |
mapColumn(string) | Includes the named column in adapter output using the same field name. Cannot remap already-mapped columns unless using the two-parameter version to specify a different field name. |
mapColumn(string, string) | Maps the column (first parameter) to an output field with the specified name (second parameter). Cannot remap already-mapped fields, but can map one input column to multiple output fields. |
addCustomFunction(string, Function) | Creates an output field with values from the provided function. Field names cannot be reused. The function must ensure output matches the expected format. |
timestampFieldName(string) | Creates an output field with the message generation timestamp. |
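The interaction of autoValueMapping, ignoreMissingColumns, excludeColumn, and mapColumn can be summarized as a small resolution function. This is a hypothetical model of the rules in the table above, not the builder's code: IllegalStateException stands in for SolacePublisherException, excluded columns are assumed not to count as unmapped, and an explicit mapping is assumed to win if an auto-mapped field would reuse its name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FieldSelectionSketch {

    // Hypothetical model: returns output field name -> source column.
    // explicitFieldToColumn holds mapColumn(...) calls; one column may feed several fields.
    static Map<String, String> resolveFields(final List<String> sourceColumns,
                                             final Map<String, String> explicitFieldToColumn,
                                             final Set<String> excludedColumns,
                                             final boolean autoValueMapping,
                                             final boolean ignoreMissingColumns) {
        final Map<String, String> fields = new LinkedHashMap<>(explicitFieldToColumn);
        for (final String col : sourceColumns) {
            if (explicitFieldToColumn.containsValue(col) || excludedColumns.contains(col)) {
                continue; // explicitly mapped or explicitly excluded
            }
            if (autoValueMapping) {
                fields.putIfAbsent(col, col); // unmapped column becomes its own field
            } else if (!ignoreMissingColumns) {
                // Stand-in for the SolacePublisherException thrown at adapter creation.
                throw new IllegalStateException("Unmapped column: " + col);
            }
            // Otherwise (autoValueMapping off, ignoreMissingColumns on) the column is dropped.
        }
        return fields;
    }

    public static void main(String[] args) {
        final List<String> columns = List.of("Sym", "Qty", "Price");
        final Map<String, String> explicit = new LinkedHashMap<>();
        explicit.put("Symbol", "Sym");                // like mapColumn("Sym", "Symbol")
        final Set<String> excluded = Set.of("Price"); // like excludeColumn("Price")

        System.out.println(resolveFields(columns, explicit, excluded, true, false)); // {Symbol=Sym, Qty=Qty}
        System.out.println(resolveFields(columns, explicit, excluded, false, true)); // {Symbol=Sym}
        try {
            resolveFields(columns, explicit, excluded, false, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Unmapped column: Qty
        }
    }
}
```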
TableToJSONAdapter
This TableToPublishableObjectAdapter variant produces JSON output: one JSON object string per row in the Index. When using addCustomFunction, the function must return complete JSON text for the field's value; for complex objects, make sure the entire object is well-formed. Field names are generated automatically and shouldn't be included in the custom function's output.
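A per-row JSON builder along these lines shows that division of labor: the adapter writes the field names and encodes plain column values, while a custom function returns complete JSON text for its value, which is inserted as-is. This is a hypothetical sketch with hand-rolled escaping and Map-based rows, not TableToJSONAdapter itself; the Order field and its nested object are invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of per-row JSON output; not the TableToJSONAdapter implementation.
public class JsonRowSketch {

    // Minimal JSON string encoding: quotes, backslashes, and control characters are escaped.
    static String quote(final String s) {
        final StringBuilder sb = new StringBuilder("\"");
        for (final char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Numbers and booleans are written bare (non-finite doubles are not handled); other values become strings.
    static String encode(final Object value) {
        if (value == null) return "null";
        if (value instanceof Number || value instanceof Boolean) return value.toString();
        return quote(value.toString());
    }

    // One JSON object per row. Field names are always written here; custom functions supply
    // only the value, as complete JSON text that is inserted verbatim.
    static String rowToJson(final Map<String, Object> row,
                            final Map<String, Function<Map<String, Object>, String>> customFields) {
        final StringBuilder sb = new StringBuilder("{");
        for (final Map.Entry<String, Object> e : row.entrySet()) {
            if (sb.length() > 1) sb.append(',');
            sb.append(quote(e.getKey())).append(':').append(encode(e.getValue()));
        }
        for (final Map.Entry<String, Function<Map<String, Object>, String>> e : customFields.entrySet()) {
            if (sb.length() > 1) sb.append(',');
            sb.append(quote(e.getKey())).append(':').append(e.getValue().apply(row));
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        final Map<String, Object> row = new LinkedHashMap<>();
        row.put("Sym", "AAPL");
        row.put("Qty", 100);

        final Map<String, Function<Map<String, Object>, String>> custom = new LinkedHashMap<>();
        // The custom function must return well-formed JSON for the whole nested object.
        custom.put("Order", r -> "{\"sym\":" + quote((String) r.get("Sym")) + ",\"qty\":" + r.get("Qty") + "}");

        // prints {"Sym":"AAPL","Qty":100,"Order":{"sym":"AAPL","qty":100}}
        System.out.println(rowToJson(row, custom));
    }
}
```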
TableToJSONAdapter.Builder
Method | Description |
---|---|
buildFactory() | Creates a Factory object that can create TableToJSONAdapters. |