sql-serversnapshotdebeziumconnectoradhoc

How to perform Ad-Hoc Snapshot with Debezium SQL Server Source Connector


I've following the documentation at https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html#sqlserver-ad-hoc-snapshots

But when I registered the source connector to perform ad-hoc snapshot on 'dbo.customers' table with 'WHERE' condition 'last_name'='Walker' by writing to signaling table '{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}'. The connector still capture and snapshot all rows from 'customers' table, not 1 row as I expected.

I don't know which step did I miss ?

Here's my configuration steps:

  1. Populate DB
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
  1. Create Signaling table & add a signal record
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
INSERT INTO dbo.debezium_signal (id, type, data) 
VALUES ('ad-hoc-1','execute-snapshot','{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}');
  1. Config Source Connector and Start it.
{
    "name": "customer-adhoc",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "topic.prefix" : "CDC",
        "database.hostname" : "sqlserver12",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "Password!",
        "database.names" : "testDB",
        "snapshot.mode": "initial",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka12:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "include.schema.changes": "true",
        "database.encrypt": "false",
        "table.include.list": "dbo.customers,dbo.debezium_signal",
        "column.mask.with.0.chars": "testDB.dbo.customers.first_name, testDB.dbo.customers.last_name",
        "schema.history.internal.store.only.captured.tables.ddl": "true",
        "schema.history.internal.store.only.captured.databases.ddl": "true",
        "incremental.snapshot.allow.schema.changes" : "true" ,
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true",
        "schema.name.adjustment.mode": "avro",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "value.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "signal.data.collection": "testDB.dbo.debezium_signal",
        "signal.kafka.topic":"CDC.dbz-signal",
        "kafka.consumer.offset.commit.enabled": "true",
        "signal.kafka.groupId": "customer-kafka-signal",
        "signal.kafka.bootstrap.servers": "kafka12:9092"
    }
}

Solution

  • There are 2 types of Debezium snapshot: