
How to perform an Ad-Hoc Snapshot with the Debezium SQL Server Source Connector


I've been following the documentation at https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html#sqlserver-ad-hoc-snapshots

But when I registered the source connector and wrote the signal '{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}' to the signaling table, to run an ad-hoc snapshot of the 'dbo.customers' table restricted by the 'WHERE' condition last_name = 'Walker', the connector still captured and snapshotted all rows of the 'customers' table instead of the single row I expected.
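For comparison, my reading of the Debezium 2.3+/2.4 documentation is that `additional-conditions` is an array of objects, each naming a `data-collection` and a SQL `filter`, rather than a plain string. A string literal in the filter also needs its own SQL quotes, which are doubled inside a T-SQL string literal. The sketch below assumes the `dbo.debezium_signal` table from the steps below; the id `ad-hoc-2` is hypothetical (the `id` column is the primary key, so every signal needs a fresh value). The collection name must match the form the connector uses for the table, which may be database-qualified (e.g. `testDB.dbo.customers`); treat that as an assumption to verify.

```sql
-- Sketch only: signal format as I understand it for Debezium 2.3+/2.4 (assumption).
-- 'ad-hoc-2' is a hypothetical id; it must not collide with existing rows (id is the PK).
-- ''Walker'' is the doubled-quote escape, so the stored JSON contains last_name = 'Walker'.
INSERT INTO dbo.debezium_signal (id, type, data)
VALUES (
  'ad-hoc-2',
  'execute-snapshot',
  '{"data-collections": ["dbo.customers"],
    "type": "incremental",
    "additional-conditions": [
      {"data-collection": "dbo.customers", "filter": "last_name = ''Walker''"}
    ]}'
);
```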

Which step did I miss?

Here are my configuration steps:

  1. Populate the database
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
  2. Create the signaling table and add a signal record
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
INSERT INTO dbo.debezium_signal (id, type, data) 
VALUES ('ad-hoc-1','execute-snapshot','{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}');
  3. Configure the source connector and start it.
{
    "name": "customer-adhoc",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "topic.prefix" : "CDC",
        "database.hostname" : "sqlserver12",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "Password!",
        "database.names" : "testDB",
        "snapshot.mode": "initial",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka12:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "include.schema.changes": "true",
        "database.encrypt": "false",
        "table.include.list": "dbo.customers,dbo.debezium_signal",
        "column.mask.with.0.chars": "testDB.dbo.customers.first_name, testDB.dbo.customers.last_name",
        "schema.history.internal.store.only.captured.tables.ddl": "true",
        "schema.history.internal.store.only.captured.databases.ddl": "true",
        "incremental.snapshot.allow.schema.changes" : "true" ,
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true",
        "schema.name.adjustment.mode": "avro",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "value.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "signal.data.collection": "testDB.dbo.debezium_signal",
        "signal.kafka.topic":"CDC.dbz-signal",
        "kafka.consumer.offset.commit.enabled": "true",
        "signal.kafka.groupId": "customer-kafka-signal",
        "signal.kafka.bootstrap.servers": "kafka12:9092"
    }
}
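The SQL Server connector reads changes from CDC change tables, so as far as I can tell a signal row only reaches it if CDC is also enabled on the signaling table. The steps above enable CDC for `dbo.customers` but not for `dbo.debezium_signal`. A minimal sketch, assuming the same `testDB` database and the default capture instance; the final query uses the `is_tracked_by_cdc` column of `sys.tables` to confirm both tables are tracked.

```sql
USE testDB;
GO
-- Enable CDC on the signaling table so inserted signals show up in the change stream
-- (same call as the one used for dbo.customers above).
EXEC sys.sp_cdc_enable_table
  @source_schema = 'dbo',
  @source_name = 'debezium_signal',
  @role_name = NULL,
  @supports_net_changes = 0;
GO
-- Both tables should now report is_tracked_by_cdc = 1.
SELECT name, is_tracked_by_cdc
FROM sys.tables
WHERE name IN ('customers', 'debezium_signal');
```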

Solution

  • I solved my issue, thanks to Artem and Panagiotis Kanavos. When the connector's "snapshot.mode" is set to "initial", the connector takes a full snapshot of the captured tables on its first start, so all rows in the table were captured. An ad-hoc snapshot can indeed be restricted to a subset of a source table's rows, as described above; the full snapshot came from the "initial" mode (I carelessly overlooked this option).

    So, here's my procedure:

    1. Set "snapshot.mode" to "schema_only" (only the schema is snapshotted; from then on, only new changes are captured).
    2. Restart the connector.
    3. Add a signal record to the signaling table.

    The ad-hoc snapshot works like a charm.
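To check step 3 on the database side, independently of Kafka, one can look at the CDC change table of the signaling table. With the default capture instance it should be named `cdc.dbo_debezium_signal_CT` (an assumption if a custom capture instance was configured). In CDC change tables, `__$operation = 2` marks an insert.

```sql
USE testDB;
GO
-- Each inserted signal should appear here as an insert (__$operation = 2)
-- once the CDC capture job (SQL Server Agent) has processed the transaction log.
SELECT __$start_lsn, __$operation, id, type, data
FROM cdc.dbo_debezium_signal_CT
ORDER BY __$start_lsn;
```

If the signal row is missing here, the connector cannot have received it through the source signaling channel.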