Data Routing for Deephaven Ingesters
The Deephaven engine handles a large amount of data, but not all data is handled the same way. Data may be read directly from locations on a shared filesystem, or provided by services using the Table Data Protocol. This data routing configuration is managed by the configuration server.
Deephaven is able to ingest data directly from outside sources, such as Kafka and Solace. When configuring these ingesters, the data routing configuration must be updated so that consumers can find the data.
How to configure Deephaven ingesters
Configure data routing for the new in-worker Data Import Server
Create a new data import server, claiming the tables that will be handled by this DIS, and using a dynamic endpoint. See instructions in Add a Data Import Server.
Write a query to run the in-worker Data Import Server
The in-worker DIS for the desired tables runs in a Persistent Query. The following script creates a DIS with the name "Ingester1". The DIS configuration uses "private" storage, so the second parameter to the routingService.getDataImportServiceConfigWithStorage
method specifies a pathname to a directory for the exclusive use of this in-worker DIS. The DIS registers dynamic Table Data Service and Tailer ports with the service registry.
Note
This example describes configuring a Data Import Service script using a Legacy worker.
import com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer
import com.illumon.iris.db.tables.dataimport.importstate.lastby.LastByTableMapFactory
import com.fishlib.configuration.Configuration
import com.fishlib.util.process.ProcessEnvironment
import java.nio.file.Paths
import com.illumon.util.jettyhelper.JettyServerHelper
import com.illumon.iris.console.utils.LiveTableMapSelector
import com.illumon.iris.db.v2.routing.DataRoutingService
import com.illumon.iris.db.v2.routing.DataRoutingServiceFactory
import com.illumon.iris.db.schema.SchemaServiceFactory
routingService = DataRoutingServiceFactory.getDefault()
// if the dis configuration specifies "private" storage, then you must provide a storage root, otherwise set this to null
storage = "/db/dataImportServers/Ingester1"
disName = "Ingester1"
dis = DataImportServer.getDataImportServer(
null,
disName,
Configuration.getInstance(),
SchemaServiceFactory.getDefault(), // or db.getSchemaService()
routingService,
storage)
dis.startInWorker()
Create Schemas
Each table you import must have a schema defined. You can create a Schema using the Schema Editor from the Deephaven Classic client, or using the command line . You need to create a schema with suitable data types for the records in your topic.
<Table name="Test" namespace="Kafka" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
<Partitions keyFormula="${autobalance_by_first_grouping_column}" />
<Column name="Date" dataType="String" columnType="Partitioning" />
<Column name="KafkaPartition" dataType="Int" />
<Column name="Offset" dataType="Long" />
<Column name="Timestamp" dataType="DateTime" />
<Column name="Key" dataType="String" columnType="Grouping" />
<Column name="Value" dataType="String" />
</Table>
Restart your Deephaven system
Any changes to your routing will require multiple processes to be restarted, as these pick up routing configuration at startup; they do not discover routing changes while running. At a minimum, the DIS, LTDS, TDCP, and persistent queries need to be restarted. You can restart these individually, or simply use monit restart all
to restart your entire system. This is obviously a disruptive operation, and should only be undertaken during a maintenance period when the system is expected to be potentially unavailable at some point.