I've following the documentation at https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html#sqlserver-ad-hoc-snapshots
But when I registered the source connector to perform ad-hoc snapshot on 'dbo.customers' table with 'WHERE' condition 'last_name'='Walker' by writing to signaling table '{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}'. The connector still capture and snapshot all rows from 'customers' table, not 1 row as I expected.
I don't know which step did I miss ?
Here's my configuration steps:
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
INSERT INTO dbo.debezium_signal (id, type, data)
VALUES ('ad-hoc-1','execute-snapshot','{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}');
{
"name": "customer-adhoc",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"topic.prefix" : "CDC",
"database.hostname" : "sqlserver12",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "Password!",
"database.names" : "testDB",
"snapshot.mode": "initial",
"schema.history.internal.kafka.bootstrap.servers" : "kafka12:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"database.encrypt": "false",
"table.include.list": "dbo.customers,dbo.debezium_signal",
"column.mask.with.0.chars": "testDB.dbo.customers.first_name, testDB.dbo.customers.last_name",
"schema.history.internal.store.only.captured.tables.ddl": "true",
"schema.history.internal.store.only.captured.databases.ddl": "true",
"incremental.snapshot.allow.schema.changes" : "true" ,
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"schema.name.adjustment.mode": "avro",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
"key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
"key.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
"value.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
"signal.data.collection": "testDB.dbo.debezium_signal",
"signal.kafka.topic":"CDC.dbz-signal",
"kafka.consumer.offset.commit.enabled": "true",
"signal.kafka.groupId": "customer-kafka-signal",
"signal.kafka.bootstrap.servers": "kafka12:9092"
}
}
There are 2 types of Debezium snapshot:
initial
mode. It makes a full snapshot when you restart the connector. That's why you have all rows captured at the beginning."additional-conditions"
should be according to the documentation. Or you can insert it without any conditions first, and then if it works, add "additional-conditions"
.