apache-kafkaapache-kafka-connectmongodb-kafka-connector

How to change the name of the topic generated by Kafka Connect Source Connector


I have an already running production deployed Kafka-Cluster and having Topic "existing-topic". I am using MongoDB-Source-Connector from Debezium.

Here all what I want is to push the CDC events directly to the topic "existing-topic" so that my consumers which are already listening to that topic will process it.

I didn't find any resource to do it so, however it's mentioned that topic is created in below format -

"If your mongodb.name parameter is A, database name is B and collection name is C, the data from database A and collection C will be loaded under the topic A.B.C"

Can I change the topic to "existing-topic" and push the events to it?


Solution

  • According to the documentation,

    The name of the Kafka topics always takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the mongodb.name configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed.


    This means that if your connector's logical name is myConnector and your database myDatabase has two collections users and orders

    {
      "name": "myConnector",  
      "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
        "mongodb.hosts": "mongo-db-host:27017", 
        "mongodb.name": "myDatabase", 
        "collection.whitelist": "myDatabase[.]*", 
      }
    }
    

    then Kafka Connect will populate two topics with names:


    Now if you still want to change the name of the target topic, you can make use of Kafka Connect Single Message Transforms (SMT). More precisely, ExtractTopic should help you. Note though that this SMT helps you extract the topic name from the key or value of the message, therefore you somehow need to include the desired topic name in the payload.

    For example, the following SMT will extract the value of field myField and use this as the record's topic:

     transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
     transforms.ValueFieldExample.field=myField