I have a source connector and a sink connector running on docker. I have built a cdc to allow sync from source to sink db which is both postgres. I got the update and insert working pretty well but I am having problems with my delete operations and it seems that my sink connector is not consuming it as it does not show logs unlike for the insert and update operation. I already added the "delete.enabled": "true" and "tombstones.on.delete": "true" ont the sink and still can't get any progress.
"source connector configuration" `
{
"name": "cdm-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "cdm-db",
"database.port": "5432",
"database.user": "utanga",
"database.password": "****",
"database.dbname": "cdm_dev",
"topic.prefix": "utanga_dev",
"tombstones.on.delete": "true"
}
}
`
"sink connector configuration"
{
"name": "port-charges-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://db:5432/utanga_dev?user=utanga&password=changeme",
"topics": "utanga_dev.public.exchange_sellerportcharge",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://host.docker.internal:8081",
"key.converter.schema.registry.url": "http://host.docker.internal:8081",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields":"id",
"delete.enabled": "true", // Enable delete handling
"tombstones.on.delete": "true", // Handle tombstone records for deletes
"table.name.format": "${topic}",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
I've been working on this for weeks now and it feels like I'm already running in circles.
I finally figured out what's needed for this.
The answers were on the transformation of tombstones allowing the sink to consume these messages. More info on this post.: https://forum.confluent.io/t/when-kafka-connect-sink-deleted-rows-are-not-reflected-in-the-destination-db/8998, and here: https://debezium.io/documentation/reference/stable/transformations/event-flattening.html.
I'm using Debezium for my source and JDBC as my sink (both are confluent images running on docker)
{
"name": "source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "your-hostname",
"database.port": "5432",
"database.user": "your-user",
"database.password": "your-password",
"database.dbname": "your-db",
"topic.prefix": "utanga_dev"
}
{
"name": "sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "utanga_dev.public.exchange_chargetype",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://host.docker.internal:8081",
"key.converter.schema.registry.url": "http://host.docker.internal:8081",
"connection.url": "jdbc:postgresql://your-db:5432/your-db?user=your-user&password=your-password",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"auto.create": "false",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "code",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"table.name.format": "${topic}",
"delete.enabled":true,
"delete.handling​.mode":"none",
"delete.tombstone.handling.mode":"drop",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.unwrap.drop.tombstones": "false"
}
At first I thought that I needed to manually set the key of the messages generated by kafka, but since I am using debezium postgres this is not the case as the connector is automatically handling it so I just needed a way that the sink connector can consume the messages by unpacking it.