SQL Server CDC

Deephaven provides a mechanism for consuming SQL Server Change Data Capture (CDC) data. Using this technology supports asynchronous replication of changes to SQL Server database tables in Deephaven. SQL Server CDC replication for Deephaven depends on the standard generated Deephaven logger with a few special columns that permit reconstruction of the source table state.

Deephaven is an append-only database, therefore replication involves logging INSERT/UPDATE/DELETE operations to a Deephaven table. Each row logged will contain the row data (in the case of INSERT/UPDATE at least), a SQL Server log sequence number, a SQL Server sequence value, an operation column, and an update mask. Other than in snapshots, these are passed through directly from the SQL Server CDC data. Snapshots are produced as simulated INSERT statements.

By using the SQL server sequence value and operation columns, and the Deephaven lastBy() function, you can duplicate the SQL Server table state. For example, if you have a table where the primary key is "Sym", you can use the following query to produce the table state:

tableState = table.where("Date >= <last snapshot date>")
    .lastBy("Sym")
    .where("__Operation != 1 && __Operation != 3")
    .dropColumns("__LSN","__Seqval","__Operation","__UpdateMask")

Note that this expression depends on the presence of a unique primary key. While the replication process itself does not require the source table have a primary key defined, it is highly recommended, as the operation above requires some way to uniquely identify rows for updates and deletes. See below for the meaning of each of the replication columns.

Special Replication Columns

These columns are added to every Deephaven table generated for CDC replication.

  • __LSN - SQL Server log sequence number (__$start_lsn in CDC).
  • __Seqval - SQL Server sequence number (__$seqval in CDC).
  • __Operation - SQL Server operation (__$operation in CDC). Values:
    1. DELETE
    2. INSERT
    3. PRE-UPDATE
    4. POST-UPDATE
  • __UpdateMask - SQL Server update mask value (__$update_mask in CDC). Indicates which columns are updated by this operation. This is a variable length bit field with one bit per column.

Process Overview

Configuring the import process is described by the following steps.

Note: General knowledge of the Deephaven Data Import process is helpful.

  1. Enable CDC in SQL Server, and enable CDC for each table you wish to replicate.
  2. Generate Deephaven tables for each table you wish to replicate. The easiest way to do this is to use the cdc_repl_schema script provided with Deephaven.
  3. Make any desired changes and deploy the schema.
  4. Generate loggers & listeners for the new tables. The loggers must implement the SQLReplicationLogger interface and take a single input of type SQLReplicationRow. If you used the script mentioned above, it will automatically generate a schema consistent with the CDC replicator.
  5. Configure one or more CDCReplicator processes on a Deephaven server. This process will consume the SQL Server CDC data and log it to Deephaven using the loggers generated in step 3.
  6. Configure a tailer process to read from the logs generated by the CDCReplicator. This tailer should be configured and connect to a Data Import Server (DIS) in the same way as any other tailer.

The following sections provide details on each step.

Configure CDC in SQL Server

Here we provide a summary of how to enable CDC, but the details of this SQL Server feature are beyond the scope of this document. For detailed documentation, see the Microsoft website.

  • First connect to your SQL Server instance. The CDC feature must be installed. Note: this feature is not supported by the Linux version of SQL Server.
  • To enable CDC in a given database: EXEC sys.sp_cdc_enable_db
  • To enable CDC for a given table (no filegroup specified here, although this is recommended by Microsoft):
    EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'MyTable',
    @role_name     = N'MyRole',
    @supports_net_changes = 0
    
    Note that @supports_net_changes is set to 0. The CDCReplicator does not presently support querying for net changes, so there is no benefit to enabling this feature (and there is some cost, because it requires an additional index on the CDC capture table).

The steps described here should create all the required constructs for the CDCReplicator, as well as SQL Server Agent jobs that maintain the data.

Generate Deephaven Schema

The easiest way to generate Deephaven tables matching the SQL Server tables is to use the cdc_repl_schema script. This is part of the Deephaven distribution and calls the JDBC schema creator tool with the appropriate parameters for a typical replication use-case.

The syntax is as follows:

cdc_repl_schema <JDBC URL> <Deephaven Namespace> <Logger Package> <Process Suffix> [options]

Options:

  • [-r|--replace] - Replace existing
  • [-s|--sourceCasing <value>] - SQL Server casing
  • [-d|--destCasing <value>] - Deephaven casing

Casing values include the following:

  • LOWER_HYPEN
  • UPPER_HYPHEN
  • LOWER_UNDERSCORE
  • UPPER_UNDERSCORE
  • LOWER_CAMEL
  • UPPER_CAMEL

This script takes a list of tables from standard input and generates a schema in the current directory.

The script will also generate a sample configuration for the CDC replicator process in the current directory with the name cdc_replicator<Process Suffix>.prop, and a tailer configuration file with the name tailerConfig_cdc_replicator<Process Suffix>.xml. These two files can be very useful for large configurations because you can avoid writing the configuration by hand for each table. See Replicator Configuration Properties for details on the configuration file.

The "casing" options provide for systematic conversion of column names from a convention used in SQL Server to an alternate convention in Deephaven. For example, using LOWER_UNDERSCORE as the source casing and UPPER_CAMEL as the destination casing would convert "a_column_name" in SQL Server to "AColumnName" in Deephaven.

Example

The following command generates a schema for all tables in the tables.txt file along with a configuration file and a casing conversion. It assigns a process suffix of "1", which indicates a process name of cdc_replicator1.

/usr/illumon/latest/bin/cdc_repl_schema "jdbc:sqlserver://sqlserverhost;database=mydb;user=<user>;password=<pwd>" Test com.mycompany.iris.db 1 -r -s LOWER_UNDERSCORE -d UPPER_CAMEL < tables.txt

The tables.txt file should have a line for each table with one or two values. If two values are specified, the first is the SQL Server table name and the second is the Deephaven name. Otherwise the same name is used for both. Here's an example where the Deephaven tables use an upper-camel format.

tables.txt

security Security
equity_option EquityOption
trade Trade

Once you have a schema and configuration file, you may want to inspect and possibly customize them. You may safely change the names of the Deephaven columns. Or, if there are SQL server columns you don't want to replicate, you can safely remove them, with the exception of the special replication-related columns added by the schema generator (these are prefixed with two underscores). Also you can modify the default type mappings if you like; this will require modifying the intradaySetter value to produce a value of the correct type.

You may also generate a schema for a single SQL Server table using the cdc_repl_schema_table script. This does not generate a replicator configuration file but might be useful for testing or one-off additions to an existing replicator.

The syntax is as follows:

cdc_repl_schema_table <JDBC URL> <SQL Server Table> <Deephaven Namespace> <Deephaven Table> <Logger Package> [options]

Options:

  • [-r|--replace] - Replace existing
  • [-s|--sourceCasing <value>] - SQL Server casing
  • [-d|--destCasing <value>] - Deephaven casing

Casing values include the following:

  • LOWER_HYPEN
  • UPPER_HYPHEN
  • LOWER_UNDERSCORE
  • UPPER_UNDERSCORE
  • LOWER_CAMEL
  • UPPER_CAMEL

Example

To generate the schema for the SQL Server "trade" table as a table "Trade" in Deephaven:

/usr/illumon/latest/bin/cdc_repl_schema_table "jdbc:sqlserver://sqlserverhost;database=mydb;user=<user>;password=<pwd>" trade Test Trade com.mycompany.iris.db -r -s LOWER_UNDERSCORE -d UPPER_CAMEL

Update the Deephaven Schema

Once you are happy with the schema, deploy it and restart the Data Import Server.

sudo -u irisadmin /usr/illumon/latest/bin/dhconfig schema import -d <directory with schema>

You should see your tables listed in the dhconfig output.

Generate Loggers & Listeners

The process for generating the loggers and listeners for CDC replication is the same as any other:

sudo -u irisadmin /usr/illumon/latest/bin/iris generate_loggers

Configure CDC Replicator

Like other Deephaven server processes, the CDC replicator is configured via a property file. You may run any number of CDC replicator processes, and each one may replicate any number of tables. However, each process can connect to only a single SQL Server database.

Each replicator reads its configuration using a prefix based on the assigned process name suffix (this allows you to configure any number of replicators in the same property file). Every CDC replicator process has a process name with the pattern cdc_replicator<suffix>. When the replicator starts, it will attempt to load its initial state, as represented by a SQL Server log sequence number (LSN), from the most recent existing Deephaven log file. This way the replication can restart where it left off, so it is important to not clean up old log files too aggressively.

To configure a replicator for production, use the following steps. A process name suffix of "1" is assumed.

  1. Update the host config file cdc_replicator section.
sudo vi /etc/sysconfig/illumon.confs/illumon.iris.hostconfig

This is a general configuration for use with the start_cdc_replicator script, which is designed for a process name suffix:

cdc_replicator)
       CONFIGFILE="iris-cdc_replicator$proc_suffix.prop"
       EXTRA_ARGS=""
       RUN_AS=irisadmin
       WORKSPACE=/db/TempFiles/$RUN_AS/cdc_replicator$proc_suffix
       ;;
  1. Add a monit entry for the replicator:
sudo -u irisadmin vi /etc/sysconfig/illumon.d/monit/cdc_replicator1.conf

The monit file should have the following structure:

check process cdc_replicator1 with pidfile /var/run/illumon/cdc_replicator1.pid
start program = "/etc/init.d/iris start cdc_replicator 1"
stop program = "/etc/init.d/iris stop cdc_replicator 1"
  1. Configure a property file for the new replicator. You may use the file generated along with the schema.
sudo cp iris-cdc_replicator1.prop /etc/sysconfig/illumon.d/resources

Change ownership to match existing configuration files:

sudo chown irisadmin /etc/sysconfig/illumon.d/resources/iris-cdc_replicator1.prop
sudo chgrp dbmergegrp /etc/sysconfig/illumon.d/resources/iris-cdc_replicator1.prop
  1. Configure a tailer to read the logs produced by the CDC replicator. You may use the tailer configuration file generated along with the schema (customize as desired). You can add a new tailer or add this configuration to the predefined tailer1 as follows:
  • Install the tailer configuration file: sudo cp tailerConfig_cdc_replicator1.xml /etc/sysconfig/illumon.d/resources/
  • Change ownership to match existing configuration files:
sudo chown irisadmin /etc/sysconfig/illumon.d/resources/tailerConfig_cdc_replicator1.xml
sudo chgrp dbmergegrp /etc/sysconfig/illumon.d/resources/tailerConfig_cdc_replicator1.xml
  • Add the process name to the tailer service startup. Edit the /etc/sysconfig/illumon file as follows:

    tailer)
          case "$proc_suffix" in
          1)
             CONFIGFILE=iris-tailer1.prop
             PROCESSES="db_internal,cdc_replicator1"
                  ;;
    
  • The tailer should be configured to read from whatever location is specified in the CDC replicator logPath property with the log.tailer.defaultDirectories property or in its XML entry. Normally this is /db/TempFiles/irisadmin/logs.

  1. Reload the monit configuration. This should cause the replicator and tailer to start.
    sudo -u irisadmin monit reload
    

If troubleshooting is needed, check the log files.

Replicator Configuration Properties

General Configuration Properties

PropertyDescriptionDefault
connectionUrlJDBC url for the SQL Server instance.None (required)
logPathDirectory in which to log Deephaven binary log files for consumption by the tailer.None (required)
loggerQueueSizeDeephaven logger queue size.10,000
namespaceThe Deephaven namespace you are logging to (can be specified or overridden per-table).None
cdcPollIntervalWait period between polls of SQL Server for new CDC data (in milliseconds).5,000
replicationThreadsHow many threads to use for replication. This controls how many simultaneous connections the replicator may create to SQL Server (the replicator will use at most one per table), so it is important that this not be too large.10
snapshotModeIndicates when the replicator should take a snapshot of the entire SQL server table and play it into the Deephaven log. The replicator will then start replicating using the SQL Server LSN immediately after the snapshot. There are three possible values:
  • INITIAL: this takes a snapshot only when the replicator is first started and there are no existing log files.
  • ALWAYS: this takes a snapshot every time the replicator is (re)started. Use this if you restart the replicator once a day/week and you want to have a fresh copy of the table logged each time (could take a while for large tables).
  • NEVER: never take a snapshot.
INITIAL
snapshotFetchSizeWhen taking a snapshot, how many rows to read at a time (uses a SQL Server cursor). This helps control how much RAM the replicator may use. Note: The value used for the replicationThreads property will determine the maximum number of snapshots running at the same time.10,000

Per-table Configuration Properties

PropertyDescriptionDefault
namespaceDeephaven table namespaceNone; optional only if specified in replicator configuration.
tableNameDeephaven table nameNone (required)
captureInstanceSQL Server CDC capture instance. This typically has the form <schema>_<table name>. See Microsoft documentation for details.None (required)
sqlServerTableNameSQL Server table name (used for snapshots)None (required)
loggerClassNameFully qualified java class for the generated logger.None (required)
snapshotFetchSizeSee General Configuration Properties above.None; optional only if specified in replicator configuration.
snapshotModeSee General Configuration Properties above.None; optional only if specified in replicator configuration
logPathSee General Configuration Properties above.None; optional only if specified in replicator configuration
loggerQueueSizeSee General Configuration Properties above.None; optional only if specified in replicator configuration
logFormatInteger to use for log format. Used for versioning log files when table schema changes.If not specified, defaults to default value in Deephaven configuration.

Example Configuration

# General replicator config
cdc_replicator1.connectionUrl=jdbc:sqlserver://sqlserverhost;database=mydb;user=<user>;password=<pwd>
cdc_replicator1.logPath=/tmp
cdc_replicator1.loggerQueueSize=10000
cdc_replicator1.namespace=Test
cdc_replicator1.cdcPollInterval=5000
cdc_replicator1.replicationThreads=2cdc_replicator1.snapshotMode=INITIAL

# Trade table config
cdc_replicator1.1.tableName=Trade
cdc_replicator1.1.captureInstance=dbo_trade
cdc_replicator1.1.sqlServerTableName=trade
cdc_replicator1.1.loggerClassName=com.mycompany.iris.db.loggers.TradeLogger

# Security table config
cdc_replicator1.2.tableName=Security
cdc_replicator1.2.captureInstance=dbo_security
cdc_replicator1.2.sqlServerTableName=security
cdc_replicator1.2.loggerClassName=com.mycompany.iris.db.loggers.SecurityLogger