
Configure a Debezium connector for multiple tables in a database


I'm trying to configure a Debezium connector for multiple tables in a MySQL database (I'm using Debezium 1.4 with MySQL 8.0). My company has a naming convention for Kafka topics that does not allow underscores (_), so I have to replace them with hyphens (-).

So my topic names are:

Topic 1

fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status"
- All changes to that table must go to that topic.

Topic 2

fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes to that table must go to that topic.

Topic 3

fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes to that table must go to that topic.

I'm trying to use the "ByLogicalTableRouter" transform, but I can't find a regex that solves my case.

{
  "name": "debezium.connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "myhostname",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1000",
    "database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
    "schema.include.list": "transaction_search",
    "table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
    "database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
    "database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
    "snapshot.mode": "schema_only",
    "transforms": "RerouteName,RerouteUnderscore",
    "transforms.RerouteName.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.RerouteName.topic.regex": "(.*)transaction_search(.*)",
    "transforms.RerouteName.topic.replacement": "$1$2",
    "transforms.RerouteUnderscore.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.RerouteUnderscore.topic.regex": "(.*)_(.*)",
    "transforms.RerouteUnderscore.topic.replacement": "$1-$2"
  }
}

But with that, I'm getting the error below, which seems to indicate that it is trying to send everything to the same topic:

Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier

How can I write a transform that forwards the events of each table to its respective topic?


Solution

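    1. Remove the schema name

    The first transform is meant to remove the duplicated schema name from the routed topic name.

    After the transformation with your regex, the topic name is left with two consecutive dots, because the pattern removes the schema name but keeps the dot on each side of it. Match the dots explicitly and put a single one back: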

    "transforms.RerouteName.topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)",
    "transforms.RerouteName.topic.replacement": "$1.$2" 
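
The effect of both patterns can be checked outside Kafka Connect with plain `java.util.regex`. The sketch below assumes Debezium's default topic naming of `<server name>.<database>.<table>`, so the incoming topic for `order_status` would be the `TOPIC` constant shown; the class and method names are hypothetical and only serve the illustration. It does not call the transform itself.

```java
import java.util.regex.Pattern;

class TopicRegexCheck {
    // Assumed incoming topic name: <server name>.<database>.<table>
    static final String TOPIC =
        "fjf.db.top-domain.domain.sub-domain.transaction-search.transaction_search.order_status";

    // Replaces the first place where the pattern is found in the topic name.
    static String reroute(String regex, String replacement, String topic) {
        return Pattern.compile(regex).matcher(topic).replaceFirst(replacement);
    }

    // True only if the pattern covers the entire topic name.
    static boolean matchesWhole(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Pattern from the question: leaves two consecutive dots.
        System.out.println(reroute("(.*)transaction_search(.*)", "$1$2", TOPIC));
        // Pattern from this answer: a single dot remains.
        System.out.println(reroute("([^.]+)\\.transaction_search\\.([^.]+)", "$1.$2", TOPIC));
        // The same pattern does not cover the whole dotted name.
        System.out.println(matchesWhole("([^.]+)\\.transaction_search\\.([^.]+)", TOPIC));
        // A wider variant that does cover the whole name.
        System.out.println(reroute("(.*)\\.transaction_search\\.(.*)", "$1.$2", TOPIC));
    }
}
```

The first line printed ends in `transaction-search..order_status`, the second and fourth in `transaction-search.order_status`, and the third is `false`. Whether the router applies the pattern anywhere in the name or requires it to match the whole name is worth verifying for your Debezium version; if a whole-name match is required, the `[^.]+` groups would need to be widened, for example to `(.*)\\.transaction_search\\.(.*)`.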
    
    2. Replace underscores with hyphens

    You can try the ChangeCase SMT from Kafka Connect Common Transformations.
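
For reference, the renaming this step has to achieve is a plain character replacement on the table part of the topic name. The sketch below is not the ChangeCase SMT and makes no claim about its configuration; it only lists the expected target topic for the three tables in the question, which can help when checking the connector's output. The class and method names are hypothetical.

```java
class HyphenTopicNames {
    // Server name from the question's config; it already follows the naming convention.
    static final String PREFIX = "fjf.db.top-domain.domain.sub-domain.transaction-search";

    // Target topic for a table: prefix, a dot, then the table name with '_' replaced by '-'.
    static String targetTopic(String table) {
        return PREFIX + "." + table.replace('_', '-');
    }

    public static void main(String[] args) {
        String[] tables = {"order_status", "shipping_tracking", "proposal"};
        for (String table : tables) {
            System.out.println(table + " -> " + targetTopic(table));
        }
    }
}
```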