I want to extract the record key and append it to the record value using Kafka Connect. I have read about SMTs (Single Message Transforms) and tested several of them, but I could not get this to work.
I send this record value:
{"name":"ali"}
with this key:
Person
and I want this JSON document to be stored in OpenSearch (or Elasticsearch):
{"name":"ali","key":"Person"}
This is my connector configuration. I know it is wrong, but I don't know how to fix it:
{
  "name": "aaa",
  "config": {
    "name": "aaa",
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "tasks.max": "1",
    "topics": "test_x",
    "transforms": "InsertKeyToValue",
    "transforms.InsertKeyToValue.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertKeyToValue.static.field": "MessageSource",
    "transforms.InsertKeyToValue.static.value": "key",
    // "transforms.InsertKeyToValue.field": "key",
    // "transforms.InsertKeyToValue.key.field": "key",
    "key.ignore": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connection.url": "https://localhost:9200",
    "connection.username": "test",
    "connection.password": "test",
    "batch.size": "10000",
    "linger.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.include.messages": "true",
    "errors.log.enable": "true"
  }
}
@Akshay Bande thanks.
I've created a simple transformation for this task.
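For anyone reading later, here is a rough sketch of the core logic such a transformation might contain. It is not the actual code I wrote, and the class name `KeyToValueSketch`, the helper `appendKey`, and the target field name `key` are all made up for illustration. It assumes the setup from the question: `StringConverter` for the key and `JsonConverter` with `schemas.enable=false` for the value, in which case the value reaches the transform as a `java.util.Map`. The built-in `InsertField` with `static.field`/`static.value` only writes a literal string, which is why the config above inserts the text "key" rather than the record key. To keep the example runnable without the Kafka libraries, it only shows the map manipulation; a real SMT would implement `org.apache.kafka.connect.transforms.Transformation`, call something like this from `apply(record)`, and return `record.newRecord(...)` with the updated value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: the names here are hypothetical, not part of Kafka Connect.
class KeyToValueSketch {

    // Returns a copy of a schemaless value map with the record key stored under fieldName.
    // A null value (tombstone record) is passed through unchanged.
    static Map<String, Object> appendKey(Object key, Map<String, Object> value, String fieldName) {
        if (value == null) {
            return null;
        }
        // Copy instead of mutating the original map; LinkedHashMap keeps field order.
        Map<String, Object> updated = new LinkedHashMap<>(value);
        updated.put(fieldName, key);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "ali");

        Map<String, Object> result = appendKey("Person", value, "key");
        System.out.println(result); // prints {name=ali, key=Person}
    }
}
```

Once packaged as a real transformation and placed on the Connect worker's plugin path, the class would be referenced from `transforms.InsertKeyToValue.type` in place of `InsertField$Value`.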