I've been following the documentation at https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html#sqlserver-ad-hoc-snapshots
I registered the source connector and triggered an ad-hoc incremental snapshot on the 'dbo.customers' table with a WHERE-style condition of last_name = 'Walker' by writing '{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}' to the signaling table. However, the connector still captures and snapshots all rows from the 'customers' table, not the single row I expected.
I don't know which step I missed.
Here are my configuration steps:
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
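As a sanity check (illustrative only, using the standard SQL Server catalog views), you can confirm CDC is enabled on both the database and the table:
-- Expect is_cdc_enabled = 1 for testDB and is_tracked_by_cdc = 1 for customers
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'testDB';
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'customers';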
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
INSERT INTO dbo.debezium_signal (id, type, data)
VALUES ('ad-hoc-1','execute-snapshot','{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}');
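For reference, the linked 2.4 documentation also shows "additional-conditions" as a list of objects with an explicit SQL filter per data collection. A hypothetical variant of the same signal in that form (note the string literal around Walker, with the quotes doubled inside the T-SQL string) would look like this:
-- Hypothetical list-form signal; not the one used above
INSERT INTO dbo.debezium_signal (id, type, data)
VALUES ('ad-hoc-2', 'execute-snapshot',
        '{"data-collections": ["dbo.customers"], "type": "incremental", "additional-conditions": [{"data-collection": "dbo.customers", "filter": "last_name=''Walker''"}]}');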
And here is the connector configuration I registered:
{
  "name": "customer-adhoc",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "topic.prefix": "CDC",
    "database.hostname": "sqlserver12",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "Password!",
    "database.names": "testDB",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "kafka12:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "true",
    "database.encrypt": "false",
    "table.include.list": "dbo.customers,dbo.debezium_signal",
    "column.mask.with.0.chars": "testDB.dbo.customers.first_name, testDB.dbo.customers.last_name",
    "schema.history.internal.store.only.captured.tables.ddl": "true",
    "schema.history.internal.store.only.captured.databases.ddl": "true",
    "incremental.snapshot.allow.schema.changes": "true",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "schema.name.adjustment.mode": "avro",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
    "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
    "key.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
    "value.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
    "signal.data.collection": "testDB.dbo.debezium_signal",
    "signal.kafka.topic": "CDC.dbz-signal",
    "kafka.consumer.offset.commit.enabled": "true",
    "signal.kafka.groupId": "customer-kafka-signal",
    "signal.kafka.bootstrap.servers": "kafka12:9092"
  }
}
I solved my issue, thanks to Artem and Panagiotis Kanavos. With the connector's "snapshot.mode" set to "initial", the connector takes a full initial snapshot, so every row of the table gets captured regardless of the signal. The ad-hoc snapshot itself can indeed be limited to a subset of a source table, as mentioned above; what caused the full snapshot was the "initial" mode (I was careless and didn't pay attention to that option).
So, in short: pick a snapshot mode that doesn't take a full data snapshot at startup, and let the signal drive what gets snapshotted.
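A minimal sketch of the relevant part of the config, assuming you only want the schema captured at startup and the data to come from ad-hoc incremental snapshots (schema_only is one of the SQL Server connector's snapshot modes in 2.4; initial_only or another mode may fit other cases):
"snapshot.mode": "schema_only",
"signal.data.collection": "testDB.dbo.debezium_signal",
"table.include.list": "dbo.customers,dbo.debezium_signal"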
The ad-hoc snapshot then works like a charm.