postgresql, jdbc, apache-kafka, cdc

JDBC sink connector cannot consume delete operations but can consume insert and update (Postgres)


I have a source connector and a sink connector running on Docker. I have built a CDC pipeline to sync from the source DB to the sink DB, both of which are Postgres. Inserts and updates work fine, but I am having problems with delete operations: the sink connector does not seem to consume them, since it shows no logs for deletes the way it does for inserts and updates. I already added "delete.enabled": "true" and "tombstones.on.delete": "true" on the sink and still can't make any progress.

"source connector configuration" `

{
    "name": "cdm-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "cdm-db",
        "database.port": "5432",
        "database.user": "utanga",
        "database.password": "****",
        "database.dbname": "cdm_dev",
        "topic.prefix": "utanga_dev",
        "tombstones.on.delete": "true"
    }
}

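For reference, when a row is deleted and "tombstones.on.delete" is enabled (it defaults to true in Debezium), the source connector writes two records to the topic for that key. The sketch below is only illustrative; the column name and value are placeholders, not actual output from this setup.

Delete event (key {"id": 42}, simplified value):

{
    "before": { "id": 42 },
    "after": null,
    "op": "d"
}

Tombstone (same key, null value):

key:   { "id": 42 }
value: null

With "delete.enabled": "true", the JDBC sink only issues a DELETE when records with a null value actually reach it, which is exactly what the unwrap transform controls.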

"sink connector configuration"

{
    "name": "port-charges-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://db:5432/utanga_dev?user=utanga&password=changeme",
        "topics": "utanga_dev.public.exchange_sellerportcharge",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://host.docker.internal:8081",
        "key.converter.schema.registry.url": "http://host.docker.internal:8081",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "auto.create": "true",
        "auto.evolve": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields":"id",
        "delete.enabled": "true", // Enable delete handling
        "tombstones.on.delete": "true", // Handle tombstone records for deletes
        "table.name.format": "${topic}",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}

I've been working on this for weeks now and it feels like I'm already running in circles.


Solution

  • I finally figured out what's needed for this.

    The answer lies in how tombstone records are transformed so that the sink can consume them. More info in this Confluent forum post: https://forum.confluent.io/t/when-kafka-connect-sink-deleted-rows-are-not-reflected-in-the-destination-db/8998, and in the Debezium event-flattening documentation: https://debezium.io/documentation/reference/stable/transformations/event-flattening.html.

    I'm using Debezium for my source and the Confluent JDBC connector as my sink (both running as Docker images).

    {
        "name": "source-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "your-hostname",
            "database.port": "5432",
            "database.user": "your-user",
            "database.password": "your-password",
            "database.dbname": "your-db",
            "topic.prefix": "utanga_dev"
        }
    }
    
    
    {
        "name": "sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics": "utanga_dev.public.exchange_chargetype",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://host.docker.internal:8081",
            "key.converter.schema.registry.url": "http://host.docker.internal:8081",
            "connection.url": "jdbc:postgresql://your-db:5432/your-db?user=your-user&password=your-password",
            "key.converter.schemas.enable": "true",
            "value.converter.schemas.enable": "true",
            "auto.create": "false",
            "auto.evolve": "true",
            "insert.mode": "upsert",
            "pk.mode": "record_key",
            "pk.fields": "code",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "table.name.format": "${topic}",
            "delete.enabled": "true",
            "delete.handling.mode": "none",
            "delete.tombstone.handling.mode": "drop",
            "transforms.unwrap.delete.handling.mode": "none",
            "transforms.unwrap.drop.tombstones": "false"
        }
    }
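    As far as I can tell, the delete-related settings are the part that matters; everything else is unchanged from the non-working config. A minimal sketch of just those properties (assuming the unwrap SMT is io.debezium.transforms.ExtractNewRecordState, as above):

    {
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "none",
        "delete.enabled": "true",
        "pk.mode": "record_key"
    }

    Per the Debezium event-flattening docs linked above, "drop.tombstones": "false" lets tombstone records through to the sink, and "delete.handling.mode": "none" keeps the delete event in the stream instead of dropping it (its value ends up null because "after" is null). The JDBC sink's "delete.enabled": "true", which requires "pk.mode": "record_key", then turns those null-value records into DELETEs.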

    At first I thought I needed to manually set the key of the messages produced for Kafka, but since I am using the Debezium Postgres connector this is not the case: the connector handles it automatically by keying each change event with the table's primary key. So all I needed was a way for the sink connector to consume those messages by unwrapping them.
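
    To illustrate with a made-up row (the table here has "code" as its primary key; the value "PC01" is just a placeholder): the Kafka record key is a small struct that "pk.mode": "record_key" with "pk.fields": "code" can use directly.

    key:   { "code": "PC01" }

    For a DELETE, the same key arrives with a null value after the unwrap transform, and the sink removes the row with that key from the destination table.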