Solace integration plugin
The Solace Integration Plugin is an optional component of the Deephaven system allowing for interaction with the Solace™ PubSub+ Mesh. The classes in this plugin do not run anything on their own; instead, they may be used within a Persistent Query. Rather than encapsulate the Solace libraries, these classes take in Solace objects that may be configured via Solace's own standard mechanisms.
Setup
Plugin installation
You may install the SolaceIngester plugin either via RPM or via tar installation. Either method may require sudo access.
RPM:
yum install solace-plugin-*
sudo /etc/sysconfig/deephaven/plugins/solace-plugin/bin/activate.sh --classpath
Note: solace-plugin-* is the plugin RPM file.
Tar:
- Copy the .tgz file to /tmp/solace-plugin/
rm -rf /etc/sysconfig/deephaven/plugins/solace-plugin
mkdir -p /etc/sysconfig/deephaven/plugins/solace-plugin
tar -xvf /tmp/solace-plugin/Solace-plugin-1.20200331.150_1.0-Manual.tgz -C /etc/sysconfig/deephaven/plugins/solace-plugin
/etc/sysconfig/deephaven/plugins/solace-plugin/bin/reinstall.sh
Caution
As of this writing, a Solace Queue will not deliver more than a fixed number of unacknowledged messages to a consumer at a time. The Solace Integration Plugin consumes messages from the Queue as fast as it can, and acknowledges those messages once they have been checkpointed (written to disk). The default checkpoint interval is 30000 ms (30 seconds). You will likely want to set the configuration property DataImportStreamProcessor.<your namespace>.<your table>.checkpointIntervalMillis to a smaller value, such as 5000 ms.
Solace libraries installation
Note
This section references packages provided by Solace, and may be subject to change if Solace changes how they deliver or structure their API packages.
The SolaceQueueIngester relies on various classes from Solace. These classes are contained in the Java™ JCSMP libraries published by Solace, presently available at the Solace download page.
As of this writing, the jar files are provided in a zip file from that location. Copy the zip file to the server where you will run the SolaceQueueIngester, then open the zip file. Within this zip file is a 'lib' directory. Copy all of the .jar files from within the lib directory to: /etc/sysconfig/deephaven/illumon.d.latest/java_lib
SolaceIngesters
SolaceIngesters are classes available from the SolaceIngester plugin. Clients who utilize the Solace™ PubSub+ Mesh can use these classes to import data directly into a Deephaven instance from Solace. The messages being delivered by Solace must be in a form that can be converted into Deephaven data rows. The SolaceIngester plugin includes pre-built adapters for raw text and for JSON, but other adapters can be created as needed.
SolaceQueueIngester
The SolaceQueueIngester is a SolaceIngester that can connect a Solace Queue directly to Deephaven. This is useful in cases where guaranteed delivery is important, or where high-availability is required, as using Queues can ensure that multiple import servers receive identical data.
In order to use the SolaceQueueIngester, a Solace Queue must have already been created, a Deephaven table for data to populate from Solace must exist within Deephaven, and some mechanism must exist by which messages are published from some data source to the Solace Queue. In addition, all messages must have a unique, monotonically increasing ID value included in their message header properties; the name of the property to use is configurable, but must be consistent across all messages.
The SolaceQueueIngester takes in references to a Solace Queue, a Deephaven Data Import Server, and an adapter to convert messages from Solace into rows of Deephaven data. Under standard operation, a SolaceQueueIngester will run inside a Deephaven Persistent Query.
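The message ID requirement above can be illustrated from the publisher's side. The following is a minimal sketch, not part of the plugin: the class name MessageIdSequence is hypothetical, and zero-padding is an assumption chosen so that the IDs sort the same way whether they are compared as numbers or as strings (this document does not state which comparison the ingester uses). Attaching the ID to a Solace message header property is left out.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical publisher-side helper that hands out unique, increasing message IDs. */
final class MessageIdSequence {
    private final AtomicLong next;

    MessageIdSequence(long firstId) {
        if (firstId < 0) {
            throw new IllegalArgumentException("firstId must be non-negative");
        }
        this.next = new AtomicLong(firstId);
    }

    /**
     * Returns the next ID as a 19-digit, zero-padded decimal string.
     * 19 digits is enough for any non-negative long, so string order
     * matches numeric order.
     */
    String nextId() {
        return String.format(Locale.ROOT, "%019d", next.getAndIncrement());
    }

    public static void main(String[] args) {
        MessageIdSequence ids = new MessageIdSequence(9);
        System.out.println(ids.nextId());
        System.out.println(ids.nextId());
    }
}
```

A publisher that restarts would need to resume from the last ID it issued; how that state is persisted is outside the scope of this sketch.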
SolaceTopicIngester
The SolaceTopicIngester is a SolaceIngester that can connect a Solace Topic directly to Deephaven. This is useful in cases where only as-available data is important, or where data is not retained for replay later, and it is important only to accurately record the data that was received while the system was running, not to receive the entire universe of data.
In order to use the SolaceTopicIngester, a Solace Topic must be accessible (either a persistent Topic or a temporary Topic), a Deephaven table for data to populate from Solace must exist within Deephaven, and some mechanism must exist by which messages are published from some data source to the Solace Topic. Unlike a SolaceQueueIngester, no unique message ID is required; messages will be handled by the SolaceTopicIngester in the order they are received. Note that this means that replication is not possible with a SolaceTopicIngester; two separate SolaceTopicIngester instances might receive different messages if they do not start at the same instant.
The SolaceTopicIngester takes in references to a Solace Topic, a Deephaven Data Import Server, and an adapter to convert messages from Solace into rows of Deephaven data. Under standard operation, a SolaceTopicIngester will run inside a Deephaven Persistent Query.
Data Import Server Configuration
SolaceIngesters write directly to a Deephaven Data Import Server (DIS), which must be configured to write data out appropriately.
Directory setup
Each instance of a DIS must have exclusive control over the directories that it is writing to. If the data routing is configured such that two Data Import Servers will have a disjoint set of partitions, they can share the storage. However, for simplicity, Deephaven recommends that each DIS has a unique directory using a path such as: /db/dataImportServers/[DIS_Name]
For example, to create a directory for SolaceImport1:
sudo mkdir -p /db/dataImportServers/SolaceImport1
sudo chown dbquery.dbmergegrp /db/dataImportServers/SolaceImport1
The storage directory must exist before a Solace ingestion DIS is started. When the script is running in a regular query or console, the dbquery user on the server will need to have read and write access to this path. When the script is run as a persistent query (see example below) against a merge server (preferred for production use), the dbmerge account will need read and write privileges.
Routing setup
The DIS uses a YAML file to determine how to find data sources. Please see Routing for Deephaven Ingesters for information on how to configure routing for a SolaceQueueIngester.
SolaceIngester configurable properties
Property | Description |
---|---|
SolaceIngester.reportIntervalMs | How long, in milliseconds, between logged reports of how many messages were processed in this timeframe. Default 60000 (one minute). |
SolaceIngester.cleanupIntervalMillis | How long, in milliseconds, between when the adapter should perform any needed cleanup (such as delayed message acknowledgements, etc). This ensures that in low-throughput situations, messages are routinely cleaned up without long waits. Default 100. |
SolaceIngester.cleanupMsgInterval | How long, in number of messages received, between when the adapter should perform any needed cleanup (such as delayed acks, etc). This ensures that in high-throughput situations, messages awaiting cleanup do not accumulate in memory. Default 10000. |
SolaceIngester.queueTimeoutMs | How long, in milliseconds, the ingester should wait before determining that there are no messages currently waiting to be processed, and perform cleanup on any pending messages immediately. Default 50. |
SolaceIngester.<ingestername>.internalPartition | What internal partition this ingester should write to. Each individual ingester (including Kafka ingesters) should use a different internal partition. If no value is provided, this defaults to the hostname of the system where the ingester is running. |
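The two cleanup properties in the table above describe a time-based trigger and a count-based trigger. The sketch below shows one way such a pair could interact, assuming cleanup runs as soon as either threshold is reached. That interaction is an assumption made for illustration; CleanupTrigger is a hypothetical class, not the plugin's implementation.

```java
/** Illustrative only: cleanup becomes due when either the time or the message threshold is hit. */
final class CleanupTrigger {
    private final long intervalMillis;   // plays the role of cleanupIntervalMillis
    private final long messageInterval;  // plays the role of cleanupMsgInterval
    private long lastCleanupMillis;
    private long messagesSinceCleanup;

    CleanupTrigger(long intervalMillis, long messageInterval, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.messageInterval = messageInterval;
        this.lastCleanupMillis = startMillis;
    }

    /** Records one received message and reports whether cleanup is now due. */
    boolean onMessage(long nowMillis) {
        messagesSinceCleanup++;
        return isDue(nowMillis);
    }

    /** True when enough messages have arrived or enough time has passed since the last cleanup. */
    boolean isDue(long nowMillis) {
        return messagesSinceCleanup >= messageInterval
                || nowMillis - lastCleanupMillis >= intervalMillis;
    }

    /** Resets both counters after a cleanup pass. */
    void markCleaned(long nowMillis) {
        lastCleanupMillis = nowMillis;
        messagesSinceCleanup = 0;
    }

    public static void main(String[] args) {
        // Uses the documented defaults: 100 ms and 10000 messages.
        CleanupTrigger trigger = new CleanupTrigger(100L, 10_000L, 0L);
        System.out.println("due after 50 ms: " + trigger.isDue(50L));
        System.out.println("due after 100 ms: " + trigger.isDue(100L));
    }
}
```

Under this reading, the time threshold covers low-throughput periods and the message threshold covers high-throughput bursts, which matches the rationale given for each property.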
Related Configuration Properties
Property | Description |
---|---|
DataImportStreamProcessor.[namespace].[table].checkpointIntervalMillis | This property can have a significant impact on the performance of a SolaceQueueIngester. A Solace Queue or Topic will only deliver up to a certain number of unacknowledged messages to a receiver (at the time of this writing, the maximum is 1,000,000 delivered unacknowledged messages). The SolaceTopicIngester automatically acknowledges all inbound messages once they've been read, as redelivery is never expected in this case. However, the SolaceQueueIngester acknowledges messages only when they have been checkpointed (i.e., written recoverably to disk). When using the SolaceQueueIngester, you must set your checkpoint interval such that (desired messages per second) * (checkpoint interval in seconds) < (maximum unacknowledged messages), where the interval in seconds is checkpointIntervalMillis / 1000. Deephaven does not recommend setting this value below 5000 in most cases. |
DataImportStreamProcessor.[Namespace].[Table].flushIntervalMillis | Similarly, this property of the DIS controls how often data will be pushed out to Deephaven clients. If this value is set to 1000, clients will see their screen update each second. Even if the ingester has processed an update in one microsecond, the user may not see that data for a full second. In most configurations, this is a reasonable balance between latency and the speed of human perception. If a lower latency is required, you may wish to tune this value for the specific table being imported into. At some point (variable dependent on your configuration), setting this value too low may consume enough CPU time to block other import operations. In most cases, the default value for this property will be reasonable. |
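The checkpoint sizing rule in the table above is simple arithmetic, and a small helper makes the units explicit. This is a sketch only: CheckpointBudget is a hypothetical class, the 50,000 messages per second rate is an invented example, and the 1,000,000 limit is the figure quoted above as current at the time of writing.

```java
/** Back-of-envelope helper for sizing checkpointIntervalMillis (illustrative only). */
final class CheckpointBudget {
    private CheckpointBudget() {}

    /**
     * Largest checkpoint interval, in milliseconds, for which
     * messagesPerSecond * (interval in seconds) stays strictly below maxUnacked.
     */
    static long maxCheckpointIntervalMillis(long messagesPerSecond, long maxUnacked) {
        if (messagesPerSecond <= 0 || maxUnacked <= 0) {
            throw new IllegalArgumentException("rate and limit must be positive");
        }
        // rate * ms / 1000 < maxUnacked  is equivalent to  rate * ms < maxUnacked * 1000
        long scaledLimit = Math.multiplyExact(maxUnacked, 1000L);
        return (scaledLimit - 1) / messagesPerSecond;
    }

    /** True when the given interval keeps the unacknowledged backlog under the limit. */
    static boolean fits(long messagesPerSecond, long intervalMillis, long maxUnacked) {
        return Math.multiplyExact(messagesPerSecond, intervalMillis)
                < Math.multiplyExact(maxUnacked, 1000L);
    }

    public static void main(String[] args) {
        long rate = 50_000L;       // hypothetical target throughput
        long limit = 1_000_000L;   // limit quoted in this document
        System.out.println("max interval ms: " + maxCheckpointIntervalMillis(rate, limit));
        System.out.println("30000 ms fits: " + fits(rate, 30_000L, limit));
        System.out.println("5000 ms fits: " + fits(rate, 5_000L, limit));
    }
}
```

At the example rate, the 30000 ms default would exceed the limit while the suggested 5000 ms would not, which is why a smaller interval is usually advisable for busy Queues.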
Solace-to-Deephaven Adapters
The SolaceIngester is agnostic to the format of the Solace data. An Adapter is used to convert a String representation of the Solace message into zero or more Deephaven rows. A SolaceIngester requires a BytesXMLMessageToTableWriterAdapter, which is a class that can take Solace BytesXMLMessage objects and persist them into Deephaven tables. This class is included in the SolaceIngester plugin.
BytesXMLMessageToTableWriterAdapter
Classes implementing this interface can take Solace's BytesXMLMessage objects and extract the text payload from that object, plus supplemental information such as the unique message header ID, and store that information. These interfaces are to be created via Builder objects, which will provide the definitions for the actual Adapters. The various public methods of each Builder object will return the original Builder object with the specified configuration, so they may be chained.
The actual BytesXMLMessageToStringAdapter.Builder class should not be directly called; instead, call the Builder classes of the underlying StringToTableWriterAdapter implementations.
Shared Builder methods
Method | Description |
---|---|
messageIdColumnName(column) | If set, the specified Deephaven column will be populated with the unique message ID specified in the message header properties. If not set, the unique message ID will not be persisted. |
timestampColumnName(column) | If set, the specified Deephaven column will be populated with a timestamp indicating the wall-clock time when the message was consumed by the adapter. If not set, no timestamp will be persisted. |
solaceSendTimestampColumnName(column) | If set, the specified Deephaven column will be populated with the timestamp included in the BytesXMLMessage as the time when the message was sent, or null if the sender did not include a timestamp. If not set, sender timestamp information will not be persisted. |
solaceReceiveTimestampColumnName(column) | If set, the specified Deephaven column will be populated with the timestamp included in the BytesXMLMessage as the time when the message was received, or null if this was not enabled for this message. If not set, receive timestamp information will not be persisted. |
StringToTableWriterAdapter
Classes implementing this interface encapsulate the transformation of string data into Deephaven records.
JSONToTableWriterAdapter
This adapter will process the text as JSON, converting one JSON entry into one Deephaven record, with column definitions and translations defined at runtime.
The following properties are configurable.
Property | Description |
---|---|
JSONToTableWriterAdapter.consumerThreads | The number of threads the JSON adapter will use to process messages. Because parsing and retrieving values from JSON is slower than Deephaven can write to disk, the JSON adapter uses threading to parse multiple messages simultaneously and have them ready to write to disk. This is most notable when parsing large JSON objects (more than 50 fields per JSON object). Avoid having more threads than available cores or performance may degrade. In low-volume scenarios (under 1000 messages / second), one thread should be sufficient. Adding more threads than needed may decrease performance. Default 1 (single-threaded). |
JSONToTableWriterAdapter.consumerWaitInterval | How long (in milliseconds) each adapter thread should wait to get a new JSON message when that thread is ready to receive messages. Default 100. |
JSONToTableWriterAdapter.consumerReportInterval | How long (in milliseconds) the JSON adapter should wait to report performance information on how many messages it processed over this timeframe. Default 60000 (one minute). |
JSONToTableWriterAdapter.Builder
The following additional methods are available for this extension of Builder.
Method | Description |
---|---|
addColumnToFieldMapping(column, field) | Use this method to specify a direct one-to-one mapping between a JSON field and a Deephaven column. |
autoValueMapping(boolean) | When set to true, any Deephaven columns that are not specifically mapped otherwise will be assumed to be mapped to a JSON field of the same name. When false, if any Deephaven columns are not specifically mapped to a JSON field, an exception will be thrown. |
allowUnmapped(column) | Indicate that a specific Deephaven column is not mapped to any JSON field and should be populated with null values, even if a JSON field of the same name exists. |
allowMissingKeys(boolean) | When true, JSON records with missing keys will pass a null value to Deephaven. When false, JSON records with missing keys throw an exception and stop processing. |
allowNullValues(boolean) | When true, JSON fields with null values are passed to Deephaven as null values. When false, JSON fields with null values cause an exception to be thrown. |
addColumnTo<X>Function(column, to<X>Function) | Specify that a given Deephaven column should be populated by the output of a Function that takes in a JSON record and produces a value of the specified type. Initially available are Int, Long, and Double options. See the Complex Field Mappings section for more information. |
addColumnToFunction(column, returnType, function) | Specify that a given Deephaven column should be populated by the output of a Function that takes in a JSON record and produces a value of type returnType. |
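The interaction between autoValueMapping, allowUnmapped, and addColumnToFieldMapping described in the table above can be modeled in a few lines. The following is a sketch of the documented behavior, not the plugin's code: ColumnMappingSketch is a hypothetical class, and checking the unmapped set before the explicit mappings is an assumption about precedence.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative model of how each Deephaven column could be resolved to a JSON field. */
final class ColumnMappingSketch {
    private ColumnMappingSketch() {}

    /**
     * Returns column -> JSON field name; a null value means "always write null".
     * Throws if a column has no mapping and autoValueMapping is false.
     */
    static Map<String, String> resolve(List<String> tableColumns,
                                       Map<String, String> explicitMappings,
                                       Set<String> unmappedColumns,
                                       boolean autoValueMapping) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (String column : tableColumns) {
            if (unmappedColumns.contains(column)) {
                resolved.put(column, null);                 // allowUnmapped(column)
            } else if (explicitMappings.containsKey(column)) {
                resolved.put(column, explicitMappings.get(column)); // addColumnToFieldMapping
            } else if (autoValueMapping) {
                resolved.put(column, column);               // same-name JSON field
            } else {
                throw new IllegalStateException("No mapping for column " + column);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        System.out.println(resolve(
                List.of("Status", "OriginalTimestamp", "Price"),
                Map.of("OriginalTimestamp", "Timestamp"),
                Set.of("Status"),
                true));
    }
}
```

With autoValueMapping disabled, the same call would fail on the Price column, which mirrors the exception described for unmapped columns.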
Example
builder = new JSONToTableWriterAdapter.Builder()
    .allowUnmapped("Status")
    .allowUnmapped("Event")
    .addColumnToFieldMapping("OriginalTimestamp", "Timestamp")
    .messageIdColumnName("MessageId")
    .timestampColumnName("IngestTimestamp")
    .solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
    .solaceSendTimestampColumnName("SolaceSendTimestamp")
    .allowNullValues(true)
SimpleStringToTableWriterAdapter
This adapter will write the input text, unparsed, directly into a single text-type column of the specified table. Each Message will produce one record in Deephaven.
SimpleStringToTableWriterAdapter.Builder
The following additional method is available for this extension of Builder:
- setValueColumnName(columnName): Use this method of the Builder to specify the name of the column where the text data will be written.
Example
builder = new SimpleStringToTableWriterAdapter.Builder()
    .allowUnmapped("Status")
    .allowUnmapped("Event")
    .setValueColumnName("RawText")
    .messageIdColumnName("MessageId")
    .timestampColumnName("IngestTimestamp")
    .solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
    .solaceSendTimestampColumnName("SolaceSendTimestamp")
    .allowNullValues(true)
Complex Field Mappings
If your JSON requires complex mappings, you can map the JsonRecord object to table columns using arbitrary functions. The builder provides methods for Integer, Long, Double and generic Object fields. Extending the above example, suppose we add a Notional column of type double and a Conditions column of type String[]. We can use Groovy closures to compute the Notional column as Price * Size and to split the SaleCondition field into a String array:
def notionalFunction = { JsonRecord record ->
    int size = JsonRecordUtil.getInt(record, "Size")
    double price = JsonRecordUtil.getDouble(record, "Price")
    return (size * price)
}
def conditionSplit = { JsonRecord record ->
    String condition = JsonRecordUtil.getString(record, "SaleCondition")
    if (condition == null) {
        return null
    }
    return condition.split(",")
}
builder = new JSONToTableWriterAdapter.Builder()
    .allowUnmapped("Status")
    .allowUnmapped("Event")
    .addColumnToFieldMapping("OriginalTimestamp", "Timestamp")
    .messageIdColumnName("MessageId")
    .timestampColumnName("IngestTimestamp")
    .solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
    .solaceSendTimestampColumnName("SolaceSendTimestamp")
    .addColumnToDoubleFunction("Notional", notionalFunction)
    .addColumnToFunction("Conditions", String[].class, conditionSplit)
    .allowNullValues(true)
Your logic need not be contained in Groovy closures. Any standard java.util.function.Function, java.util.function.ToIntFunction, java.util.function.ToLongFunction, or java.util.function.ToDoubleFunction that takes an io.deephaven.kafka.ingest.JsonRecord as an input can be used. You may, for example, define your function in your own Java code for simpler unit testing and more controlled deployment.
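As a rough illustration of the plain-Java approach, the two closures from the example can be written as java.util.function instances. The sketch below deliberately uses a Map<String, Object> as a stand-in for JsonRecord so that it compiles without the Deephaven libraries; that substitution and the TradeFunctions class name are assumptions. In real code the lambdas would take a JsonRecord and read values through JsonRecordUtil as shown earlier.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToDoubleFunction;

/** Stand-alone versions of the Notional and Conditions mappings, using a Map in place of JsonRecord. */
final class TradeFunctions {
    private TradeFunctions() {}

    /** Notional = Size * Price. */
    static final ToDoubleFunction<Map<String, Object>> NOTIONAL = record -> {
        int size = ((Number) record.get("Size")).intValue();
        double price = ((Number) record.get("Price")).doubleValue();
        return size * price;
    };

    /** Splits the comma-separated SaleCondition field; returns null when the field is absent. */
    static final Function<Map<String, Object>, String[]> CONDITIONS = record -> {
        String condition = (String) record.get("SaleCondition");
        return condition == null ? null : condition.split(",");
    };

    public static void main(String[] args) {
        Map<String, Object> trade = Map.of("Size", 100, "Price", 2.5, "SaleCondition", "A,B");
        System.out.println(NOTIONAL.applyAsDouble(trade));
        System.out.println(String.join("|", CONDITIONS.apply(trade)));
    }
}
```

Keeping the functions in a compiled class like this allows them to be unit tested with ordinary test tooling before they are referenced from a Persistent Query.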
Example Persistent Query
// Deephaven imports
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer;
import com.fishlib.configuration.Configuration;
import com.fishlib.util.process.ProcessEnvironment;
import com.illumon.iris.db.v2.configuration.DataRoutingServiceFactory
import io.deephaven.solace.ingester.SolaceQueueIngester;
import io.deephaven.solace.ingester.JSONToTableWriterAdapter;
import com.solacesystems.jcsmp.Consumer;
import com.solacesystems.jcsmp.*;
// Copy the host, vpn, username and password from the Deephaven
// configuration.
// You may want to hardcode these values, or alternatively can set them in
// the Deephaven configuration or by using System properties from the
// 'Extra JVM arguments' panel.
final JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.HOST, Configuration.getInstance().getProperty("SolaceQueueIngester.host"))
properties.setProperty(JCSMPProperties.USERNAME, Configuration.getInstance().getProperty("SolaceQueueIngester.username"));
properties.setProperty(JCSMPProperties.VPN_NAME, Configuration.getInstance().getProperty("SolaceQueueIngester.vpn"));
properties.setProperty(JCSMPProperties.PASSWORD, Configuration.getInstance().getProperty("SolaceQueueIngester.password"));
properties.setProperty(JCSMPProperties.GENERATE_RCV_TIMESTAMPS, true);
final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);
session.connect();
final EndpointProperties endpointProps = new EndpointProperties();
endpointProps.setPermission(EndpointProperties.PERMISSION_CONSUME);
endpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
// In this example we are using a “trades” queue that contains market trades
// in JSON format, with the 'SolaceQueueIngester.queueName' configuration variable set to the
// value "trades".
final com.solacesystems.jcsmp.Queue queue = JCSMPFactory.onlyInstance().createQueue(Configuration.getInstance().getProperty("SolaceQueueIngester.queueName"));
session.provision(queue, endpointProps, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
final ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
flow_prop.setEndpoint(queue);
flow_prop.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
EndpointProperties endpoint_props = new EndpointProperties();
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
// create the DataImportServer and its prerequisites
routingService = DataRoutingServiceFactory.getDefault();
// the SolaceImport1 string is the name given to this in-worker
// DataImportServer within the routing YAML configuration file
disConfig = routingService.getDataImportServiceConfig("SolaceImport1");
if (disConfig == null) {
    throw new IllegalArgumentException("Could not find disConfig!")
}
dis = DataImportServer.getDataImportServer(ProcessEnvironment.getDefaultLog(), disConfig, Configuration.getInstance());
dis.startInWorker();
// By default the JSONToTableWriterAdapter maps columns to JSON fields of
// the same name. Additionally, we set the values of “Status” and “Event” to
// null, because they do not exist in our JSON. We use the “Timestamp” field
// within the JSON records to populate the “OriginalTimestamp” column. The
// MessageId column is populated from the Solace message properties [this
// can be useful for debugging to trace the provenance of a row of data], and
// the IngestTimestamp is the wall-clock time of the ingestion. We allow the
// JSON records to contain null values.
builder = new JSONToTableWriterAdapter.Builder()
    .allowUnmapped("Status")
    .allowUnmapped("Event")
    .addColumnToFieldMapping("OriginalTimestamp", "Timestamp")
    .messageIdColumnName("MessageId")
    .timestampColumnName("IngestTimestamp")
    // Note: if the Solace library does not populate this field, NULL is written to Deephaven.
    .solaceReceiveTimestampColumnName("SolaceReceiveTimestamp")
    .solaceSendTimestampColumnName("SolaceSendTimestamp")
    .allowNullValues(true)
// Create a SolaceQueueIngester with an ingestername of “TradeImport”,
// with a target of the Solace.TradeNbboStock table. Each Solace message
// must have the property “MsgId” which is a unique increasing String value
// that is used to reliably acknowledge messages once they have been
// durably committed to Deephaven. The final builder.buildFactory()
// argument is the definition above of how to process the JSON messages.
si = new SolaceQueueIngester(log, "TradeImport", "Solace", "TradeNbboStock", currentDateNy(), "MsgId", session, queue, flow_prop, endpoint_props, dis, builder.buildFactory(log))
si.start()
// The table is now available from this and other workers
jsonTable=db.i("Solace", "TradeNbboStock").where("Date=currentDateNy()")
Deephaven-to-Solace Adapters
TablePublisher
The TablePublisher is a convenience class that encapsulates the logic required to pass a Deephaven table's updates to an external service. This class is an InstrumentedShiftAwareListenerAdapter, meaning that the TablePublisher inherently knows how to connect to a table and respond to updates, and will not produce updates for internal movements that do not represent changes in an individual datum (shifts).
Once you have constructed your TablePublisher object, you should call <Table>.listenForUpdates(<TablePublisher>) to make the Publisher start listening to the Table.
| Method | Argument | Description |
|---|---|---|
| TablePublisher(table, addAdapter, removeAdapter, addConsumer, removeConsumer, keyModifiedColumns, ignoreModified) | | Creates a new TablePublisher object, listening to the specified DynamicTable for updates. |
| | table | The DynamicTable you want to listen to. |
| | addAdapter | (optional) A TableToPublishableObjectAdapter producing Lists of messages of the desired type for whatever will be listening to adds. |
| | removeAdapter | (optional) A TableToPublishableObjectAdapter producing Lists of messages of the desired type for whatever will be listening to removes. |
| | addConsumer | (optional) A Consumer<List<?>> that takes in whatever type of List the addAdapter produces and delivers that data to its final destination. |
| | removeConsumer | (optional) A Consumer<List<?>> that takes in whatever type of List the removeAdapter produces and delivers that data to its final destination. |
| | keyModifiedColumns | (optional) A List of Strings naming the columns that make up the 'key' for this table as far as updates are concerned. If a row is updated but the key columns do not change, only the 'add' portion of the update will be published. If the key columns do change, both the 'remove' and 'add' portions of the update will be published. If this is left null, all columns are considered 'key'. If this is passed in as an empty List, no columns are considered 'key' and so 'remove' operations will never be published as part of a 'modify' operation. |
| TablePublisher(table, addAdapter, removeAdapter, consumer) | | A convenience constructor that publishes both Add and Remove operations to the same destination. |
| TablePublisher(table, addAdapter, consumer) | | A convenience constructor that ignores Remove operations and publishes Adds to a specified destination. |
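The keyModifiedColumns rules in the table above reduce to a small decision function. The sketch below models only that decision, under the assumption that "the key columns change" can be approximated by "a key column appears in the set of modified columns". ModifyPublishRule is a hypothetical class and does not reflect how TablePublisher is implemented internally.

```java
import java.util.List;
import java.util.Set;

/** Illustrative decision rule: does a modified row publish a 'remove' before its 'add'? */
final class ModifyPublishRule {
    private ModifyPublishRule() {}

    /**
     * @param modifiedColumns columns whose values changed in this update
     * @param keyColumns      null means every column is a key; an empty list means none is
     * @return true when both 'remove' and 'add' would be published, false for 'add' only
     */
    static boolean publishesRemove(Set<String> modifiedColumns, List<String> keyColumns) {
        if (keyColumns == null) {
            // All columns are keys, so any real modification touches a key.
            return !modifiedColumns.isEmpty();
        }
        for (String key : keyColumns) {
            if (modifiedColumns.contains(key)) {
                return true;
            }
        }
        return false; // also covers the empty list: no keys, never a 'remove'
    }

    public static void main(String[] args) {
        System.out.println(publishesRemove(Set.of("Price"), List.of("Sym")));
        System.out.println(publishesRemove(Set.of("Sym"), List.of("Sym")));
    }
}
```

Passing an empty key list is therefore the way to get add-only behavior for modifications, while null is the most conservative setting.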
TableToPublishableObjectAdapter
The TableToPublishableObjectAdapter (and implementing classes) is a utility for encapsulating Deephaven update operations and transforming them into a format that can be delivered to external services, including Solace.
Because Solace message properties may vary in different situations, this plugin does not itself generate Solace messages, only content suitable for delivery via Solace messages.
The TableToPublishableObjectAdapter is specifically designed to work within an InstrumentedListenerAdapter implementation, handling the onUpdate call. That way, each time a Deephaven table updates, the update can be processed into deliverable messages.
TableToPublishableObjectAdapter.Builder
This class is the mechanism for building a TableToPublishableObjectAdapter, similar to a BytesXMLMessageToStringAdapter.Builder.
Shared Builder Methods
Method | Description |
---|---|
autoValueMapping(boolean) | When true, all columns in the base table that are not specifically mapped will be output as individual fields in the result. When false, only explicitly-mapped columns will be included in the output from this adapter. |
ignoreMissingColumns(boolean) | When autoValueMapping is enabled, this property has no effect. When autoValueMapping is disabled and this property is true, only explicitly-mapped columns will be included in the output from this adapter, and any unmapped columns will be ignored. When autoValueMapping is disabled and this property is false, a SolacePublisherException will be generated when the adapter is created if there are any columns in the source table that are not mapped. |
excludeColumn(string) | Data from the named column from your Deephaven table will not be included in the output from this adapter. You may not exclude a given column if it is already excluded or explicitly mapped. |
mapColumn(string) | Data from the named column from your Deephaven table will be included in the output from this adapter, in a field with the same name as the column. You may not map a column that has already been mapped, unless you use the two-parameter version of this method to specify a different field name for the second mapping. |
mapColumn(string, string) | Data from the column from your Deephaven table named in the first parameter will be included in the output from this adapter, in a field with the name specified in the second parameter. You may not map a field that has already been mapped, but you may map an input column to multiple output fields. |
addCustomFunction(string, Function) | Create a field in the output from this adapter with its value populated by the provided function. You may not re-use field names. It is up to the implementing function to ensure that the output matches the expected form. |
timestampFieldName(string) | Create a field in the output from this adapter with the value set to the time when this message was generated. |
TableToJSONAdapter
This variation of TableToPublishableObjectAdapter explicitly produces JSON output, one String representation of a JSON object for each row included in the Index. When using addCustomFunction with this adapter, the function must provide the entire JSON text to be output; if that is a complex object, the entire object must be well-formed. The field name is automatically produced and should not be included in the custom function.
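The rule that a custom function supplies the complete JSON text for its field, while the field name is added for it, can be seen in a small stand-alone model. This is a sketch and not the TableToJSONAdapter itself: JsonRowSketch is a hypothetical class, a row is represented as a Map, and string escaping is kept minimal (backslashes and double quotes only).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;

/** Illustrative JSON assembly: mapped columns are encoded, custom fields are inserted verbatim. */
final class JsonRowSketch {
    private JsonRowSketch() {}

    static String toJson(Map<String, Object> row,
                         Map<String, Function<Map<String, Object>, String>> customFields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> column : row.entrySet()) {
            json.add(quote(column.getKey()) + ":" + literal(column.getValue()));
        }
        for (Map.Entry<String, Function<Map<String, Object>, String>> custom : customFields.entrySet()) {
            // The function's output is used as-is, so it must already be well-formed JSON.
            json.add(quote(custom.getKey()) + ":" + custom.getValue().apply(row));
        }
        return json.toString();
    }

    private static String literal(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        return quote(value.toString());
    }

    private static String quote(String text) {
        return "\"" + text.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("Sym", "AAPL");
        row.put("Size", 100);
        Map<String, Function<Map<String, Object>, String>> custom = new LinkedHashMap<>();
        custom.put("Tags", r -> "[\"a\",\"b\"]"); // complete JSON array, no field name
        System.out.println(toJson(row, custom));
    }
}
```

Note that the custom function returns only the array text; if it returned a malformed fragment, the whole output object would be malformed.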
TableToJSONAdapter.Builder
Method | Description |
---|---|
buildFactory() | When invoked, this will create a Factory object that can create TableToJSONAdapters. |