
Debezium source connector, add a field to the `after` object


I am using the Debezium PostgreSQL source connector for CDC in Kafka. Below is the message schema I want to push to Kafka. Most noteworthy is the `after.source` field.

The `after.source` field holds a static, never-changing value that does not exist in the source database. I cannot create or edit views in the source database because of the vendor's support agreement.

{
    "before": null,
    "after": {
      "rid": "3b99c447-65a8-4d6b-bbff-2c33b7944696",
      "cust": 75862,
      "loc": 916719,
      "meter": "A90OC5385040",
      "cosum": "2.06",
      "cosdt": 1673330400000000,
      "costy": "I",
      "source": "C"
    },
    "source": {
      "version": "2.4.2.Final",
      "connector": "postgresql",
      "name": "mmv2_pgami",
      "ts_ms": 1711944632077,
      "snapshot": "false",
      "db": "Pgami_db",
      "sequence": "[null,\"23516504\"]",
      "schema": "public",
      "table": "mreads",
      "txId": 574,
      "lsn": 23516504,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1711944632426,
    "transaction": null
}

Can I add a `source` field nested inside the `after` object from within the Debezium source connector, as shown in the example above?

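Below is what I've tried, but it adds a new top-level field literally named "after.source" instead of a `source` field inside `after`. InsertField treats the dotted name as a plain field name, not as a path into the nested struct.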

{
    "name": "postgres_connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        // Other configuration properties...

        // Add the following configuration for the InsertField transformation
        "transforms": "addSourceField",
        "transforms.addSourceField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSourceField.static.field": "after.source",
        "transforms.addSourceField.static.value": "C"
    }
}

Solution

  • @OneCricketeer is correct, so, borrowing from his response and adding to it: I ended up using ExtractNewRecordState to flatten the record down to the `after` object only. After that, I use the InsertField transform to add the static `source` field:

    {
        "name": "postgres_connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            // Other configuration properties...
    
            // Unwrap the Debezium envelope first, then insert the static field
            "transforms": "unwrap,addSourceField",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": "true",
            "transforms.unwrap.delete.handling.mode": "none",
            "transforms.addSourceField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.addSourceField.static.field": "source",
            "transforms.addSourceField.static.value": "C"
        }
    }
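With this chain, the value written to Kafka is no longer the full Debezium envelope. ExtractNewRecordState replaces it with the `after` state, so `before`, `op` and the Debezium `source` metadata block are dropped, and InsertField then appends `source` at the top level of the flattened row. For the example row in the question, the value should look roughly like the sketch below. This assumes the JsonConverter with `value.converter.schemas.enable=false`; with schemas enabled, the same data is wrapped in a `schema`/`payload` pair.

```json
{
  "rid": "3b99c447-65a8-4d6b-bbff-2c33b7944696",
  "cust": 75862,
  "loc": 916719,
  "meter": "A90OC5385040",
  "cosum": "2.06",
  "cosdt": 1673330400000000,
  "costy": "I",
  "source": "C"
}
```

If downstream consumers still need some of the envelope metadata, ExtractNewRecordState's `add.fields` option (for example `"transforms.unwrap.add.fields": "op,source.ts_ms"`) should copy the selected fields into the flattened record, prefixed with `__` by default.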