mongodb · apache-kafka · apache-kafka-connect · debezium · debezium-connect

Why does the Debezium MongoDB source Kafka connector produce the `after` field as a string instead of a JSON object?


Here is the configuration I am using:

{
  "name": "mongo-debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://mongo:27017/?replicaSet=rs0",
    "database.include.list": "sample",
    "collection.include.list": "sample.workflows,sample.simulations",
    "topic.prefix": "mongo",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true
  }
}

Here is the document that was inserted into MongoDB:

{
  "_id": {
    "$oid": "676d8e51105e01702fe9496c"
  },
  "name": "workflow 3"
}

Here is the corresponding Kafka event inspected through Kowl -

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "string",
                "optional": true,
                "name": "io.debezium.data.Json",
                "version": 1,
                "field": "before"
            },
            {
                "type": "string",
                "optional": true,
                "name": "io.debezium.data.Json",
                "version": 1,
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "optional": false
                        },
                        "optional": true,
                        "field": "removedFields"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Json",
                        "version": 1,
                        "field": "updatedFields"
                    },
                    {
                        "type": "array",
                        "items": {
                            "type": "struct",
                            "fields": [
                                {
                                    "type": "string",
                                    "optional": false,
                                    "field": "field"
                                },
                                {
                                    "type": "int32",
                                    "optional": false,
                                    "field": "size"
                                }
                            ],
                            "optional": false,
                            "name": "io.debezium.connector.mongodb.changestream.truncatedarray",
                            "version": 1
                        },
                        "optional": true,
                        "field": "truncatedArrays"
                    }
                ],
                "optional": true,
                "name": "io.debezium.connector.mongodb.changestream.updatedescription",
                "version": 1,
                "field": "updateDescription"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ns"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "collection"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ord"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "lsid"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txnNumber"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "wallTime"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.mongo.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": true,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "name": "event.block",
                "version": 1,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "mongo.sample.workflows.Envelope"
    },
    "payload": {
        "before": null,
        "after": "{\"_id\": {\"$oid\": \"676d8e51105e01702fe9496c\"},\"name\": \"workflow 3\"}",
        "updateDescription": null,
        "source": {
            "version": "3.0.6.Final",
            "connector": "mongodb",
            "name": "mongo",
            "ts_ms": 1735233105000,
            "snapshot": "false",
            "db": "sample",
            "sequence": null,
            "ts_us": 1735233105000000,
            "ts_ns": 1735233105000000000,
            "collection": "workflows",
            "ord": 1,
            "lsid": null,
            "txnNumber": null,
            "wallTime": 1735233105571
        },
        "op": "c",
        "ts_ms": 1735233105616,
        "transaction": null
    }
}

Note that the payload.after field is a string instead of a plain JSON object. Because of this, I cannot apply the unwrap transform of type ExtractNewRecordState. It throws the following error:

org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [source field insertion], found: java.lang.String

What could be the reason behind this problem? It would be really helpful if someone points me to the right direction troubleshooting this.

Note: the connector version is 3.0.6.Final.


Solution

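  • It turned out that, according to the documentation, the after field is supposed to be a string: the MongoDB connector emits the document as a JSON-encoded string (schema name io.debezium.data.Json). The problem I was facing with the transform was caused by using the wrong class. I was using io.debezium.transforms.ExtractNewRecordState, which is meant for the relational connectors and expects after to be a Struct; for MongoDB we have to use io.debezium.connector.mongodb.transforms.ExtractNewDocumentState instead.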