I am using the Debezium PostgreSQL source connector for CDC into Kafka. Below is the message schema I want to publish to Kafka. The most noteworthy part is the after.source field.
The after.source field holds a static, never-changing value and does not exist in the source database. I cannot edit or create views in the source database because of the vendor's support agreement.
{
"before": null,
"after": {
"rid": "3b99c447-65a8-4d6b-bbff-2c33b7944696",
"cust": 75862,
"loc": 916719,
"meter": "A90OC5385040",
"cosum": "2.06",
"cosdt": 1673330400000000,
"costy": "I",
"source": "C"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "mmv2_pgami",
"ts_ms": 1711944632077,
"snapshot": "false",
"db": "Pgami_db",
"sequence": "[null,\"23516504\"]",
"schema": "public",
"table": "mreads",
"txId": 574,
"lsn": 23516504,
"xmin": null
},
"op": "c",
"ts_ms": 1711944632426,
"transaction": null
}
Can I add a source field embedded in the after object from within the Debezium source connector (as shown in the example above)?
Below is what I've tried, but it adds a new field named "after.source" to the root of the message, which is not what I want.
{
"name": "postgres_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
// Other configuration properties...
// Add the following configuration for the InsertField transformation
"transforms": "addSourceField",
"transforms.addSourceField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSourceField.static.field": "after.source",
"transforms.addSourceField.static.value": "C"
}
}
@OneCricketeer is correct, so borrowing from his response and adding to it: I ended up using ExtractNewRecordState to flatten the record down to the after object only. After that, I use the InsertField transform to add the static source field:
{
"name": "postgres_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
// Other configuration properties...
// Unwrap the Debezium envelope first, then add the static field with InsertField
"transforms": "unwrap,addSourceField",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.addSourceField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSourceField.static.field": "source",
"transforms.addSourceField.static.value": "C"
}
}
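To illustrate why the first attempt put "after.source" at the root and why unwrapping first works, here is a rough Python sketch. It is only a simplified model of the two transforms operating on plain dicts, not the Kafka Connect implementation. The function names insert_field and extract_new_record_state are hypothetical stand-ins, and the sketch assumes InsertField treats static.field as a literal top-level field name, which matches the behavior observed in the question.

```python
def insert_field(value, field, static_value):
    """Simplified model of InsertField$Value (assumption: the field name
    is used literally as a top-level key, with no nested-path handling)."""
    out = dict(value)
    out[field] = static_value
    return out


def extract_new_record_state(envelope):
    """Simplified model of ExtractNewRecordState: keep only the `after` state."""
    after = envelope.get("after")
    return None if after is None else dict(after)


# Trimmed-down version of the change event from the question.
envelope = {
    "before": None,
    "after": {
        "rid": "3b99c447-65a8-4d6b-bbff-2c33b7944696",
        "cust": 75862,
        "loc": 916719,
        "meter": "A90OC5385040",
        "cosum": "2.06",
        "cosdt": 1673330400000000,
        "costy": "I",
    },
    "op": "c",
    "ts_ms": 1711944632426,
}

# First attempt: the dotted name becomes a literal key on the envelope root.
attempt = insert_field(envelope, "after.source", "C")

# Working chain: unwrap first, so the root of the value *is* the row state.
flattened = insert_field(extract_new_record_state(envelope), "source", "C")

print(sorted(attempt.keys()))
print(flattened)
```

Note that once the record is unwrapped, the value no longer carries the before, op, or source metadata block of the envelope, so the result is the flattened row plus "source": "C" rather than the full schema shown in the question.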