Data Routing for Deephaven Ingesters

The Deephaven engine handles a large amount of data, but not all data is handled the same way. Intraday streaming data is stored in a different location on disk than historical, fixed data. The Deephaven engine needs to know how to find all of its internal data. To do so, it uses a YAML file, typically named routing_service.yml. Each Deephaven installation may customize the contents of this file to set up different data to be stored and processed in different ways.

Deephaven is also able to ingest data directly from outside sources, such as Kafka and Solace. When using these ingesters, additional routing information must be configured. All of this configuration takes place within routing_service.yml.

Note: For a broader overview of how Deephaven handles data routing in general, see Data Routing Configuration via YML.

How to configure Deephaven ingesters

Configure a storage location

Add the location where the ingester DIS process will store its intraday data. In this section, "Ingester1" is an arbitrary identifier, which will be referenced later in the configuration file. The dbRoot "/db/dataImportServers/Ingester1" is also arbitrary and not directly related to the identifier.

storage:
  - name: default
    dbRoot: /db
  - name: Ingester1
    dbRoot: /db/dataImportServers/Ingester1

Configure the new in-worker Data Import Server

Create a new entry in the "dataImportServers" section. Create filters so that only the desired table or tables are accepted. You must assign appropriate values for host, storage, definitionsStorage, and tableDataPort. In this example, the Ingester1 DIS is configured to accept system tables in the "IngesterNamespace" namespace.

dataImportServers:
  ...
  Ingester1:
    host: *ddl_query1
    tailerPort: -1
    # Handle Ingester tables only
    filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `IngesterNamespace`"}
    webServerParameters:
      enabled: false
    storage: Ingester1
    definitionsStorage: default
    tableDataPort: 22223
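
The filter above accepts every system table in the namespace. To accept a single table only, the filter can be narrowed further. The fragment below is a sketch, not a confirmed configuration: it assumes that TableName is available as a table key attribute in whereTableKey expressions alongside NamespaceSet and Namespace, and the table name Test is a placeholder. Check the attribute name against the Data Routing Configuration documentation for your version before relying on it.

```yaml
dataImportServers:
  # ... other entries unchanged
  Ingester1:
    # ... host, storage, and port settings as shown above
    # Assumption: TableName is a valid whereTableKey attribute; `Test` is a placeholder table name
    filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `IngesterNamespace` && TableName == `Test`"}
```

If the Ingester1 filter is narrowed this way, the exclusion filters on the other DIS entries and on the TDCP sources should be narrowed to match. Otherwise the remaining tables in the namespace would not be handled by any DIS.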

Adjust existing Data Import Server(s)

Assuming you want the data handled by the in-worker DIS to be handled exclusively by that new DIS instance, adjust the filters on any other DIS instances to exclude the table(s).

dataImportServers:
  ...
  db_dis:
    ...
    # don't process IngesterNamespace
    filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `IngesterNamespace`"}

Change TDCP data routing configuration(s)

The tableDataServices section defines how data is routed. One or more entries in this section must be adjusted so that query and merge workers source data from the correct service. In this example, the table data cache proxy (TDCP) service serves live data from the Data Import Servers.

This example makes the table data cache proxy serve the live data, and points the IngesterNamespace namespace to the Ingester1 DIS while excluding it from the default DIS.

db_tdcp:
  host: localhost
  port: *default-tableDataCacheProxyPort
  sources:
    # exclude namespace IngesterNamespace
    - name: db_dis
      filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `IngesterNamespace`"}
    # all user data
    - name: db_rta
      filters: {namespaceSet: User}
    # include only namespace IngesterNamespace
    - name: Ingester1
      filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `IngesterNamespace`"}

Configure Local Storage Routing

Local storage (reading directly from the disk where the process is running) is the default method for accessing historical tables' data files. It is also the default method when reading data to be merged from intraday to historical. The data routing table normally contains an entry called "local" that is used for these purposes. Since the in-worker DIS process used to consume Ingester data has its own storage path, it needs its own local definition in the routing file. This can be combined with the original "local" or defined independently.

(Combined: local includes both the original local, here renamed local1, and Ingester1, referenced as ingest):

tableDataServices:
  ...
  ingest:
    storage: Ingester1
  local1:
    storage: default
  local:
    sources:
      - name: local1
      - name: ingest

(Independent: local remains the original local, and Ingester1 is referenced separately as ingest):

tableDataServices:
  ...
  ingest:
    storage: Ingester1
  local:
    storage: default

Any defined entries here will be available when creating merge jobs in the Persistent Query Configuration Editor's Merge Settings tab, under Table Data Service Configuration:

[Screenshot: Table Data Service Configuration in the Merge Settings tab]

The example above corresponds to the routing configuration in which local combines the original local (local1) and ingest. If desired, tags can be used to restrict which TableDataService entries are shown in the UI. See Tags and Descriptions in Data Routing Configuration.

Create a Schema

Each table you import must have a schema defined. You can create a schema using the Schema Editor in the Deephaven Java client. The schema below is suitable for use with the Kafka test topic from the Kafka guide. For your own topic, create a schema whose column data types match the records in that topic.

<Table name="Test" namespace="Kafka" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
  <Partitions keyFormula="${autobalance_by_first_grouping_column}" />

  <Column name="Date" dataType="String" columnType="Partitioning" />
  <Column name="KafkaPartition" dataType="Int" />
  <Column name="Offset" dataType="Long" />
  <Column name="Timestamp" dataType="DateTime" />
  <Column name="Key" dataType="String" columnType="Grouping" />
  <Column name="Value" dataType="String" />
</Table>
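
Note that this schema places the table in the Kafka namespace, whereas the routing examples earlier in this document filter on IngesterNamespace. For the Ingester1 DIS to handle this table, the filters would need to name the schema's namespace. A sketch of the adjusted entries, reusing the Ingester1 and db_dis names from the earlier examples:

```yaml
dataImportServers:
  # ... other entries unchanged
  Ingester1:
    # ... host, storage, and port settings as configured earlier
    # accept only system tables in the Kafka namespace
    filters: {whereTableKey: "NamespaceSet = `System` && Namespace == `Kafka`"}
  db_dis:
    # ... remaining settings unchanged
    # leave the Kafka namespace to Ingester1
    filters: {whereTableKey: "NamespaceSet = `System` && Namespace != `Kafka`"}
```

The db_dis and Ingester1 sources under db_tdcp would need the same substitution so that queries read the table from the DIS that ingests it.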

Using the updated configuration

After you have finished editing the routing configuration file, you need to make the contents available to your Deephaven systems.

Upload to etcd

Deephaven normally uses etcd to manage its configuration files. This means that Deephaven processes do not use the file on disk that you edited above. Instead, you need to upload that file to etcd. The most straightforward way to do this is with the dhconfig command. For more information on dhconfig, please see Configuration Tools.

From your server's command line, assuming a standard installation, run the following command:

sudo /usr/illumon/latest/bin/dhconfig routing import /etc/sysconfig/deephaven/illumon.d.latest/resources/routing_service.yml --etcd

Restart your Deephaven system

Routing changes require several processes to be restarted, because processes read the routing configuration at startup and do not discover changes while running. The DIS, LTDS, TDCP, query server, and merge server may all need to be restarted. You can restart these individually, or use sudo monit restart all to restart the entire system. A full restart is disruptive and should only be performed during a maintenance window in which the system is expected to be unavailable.