In my producer, I've added a few custom keys in the headers. Using logstash i need to get the payload in addition I need to read the value of my keys.
Producer code:
kafkaTemplate
.send(
MessageBuilder.withPayload(myPayload)
.setHeader(KafkaHeaders.TOPIC, myTopic)
.setHeader("my-key1", "test-key1") // custom keys
.setHeader("my-key2", "test-key2") // custom keys
.build())
Logstash conf
input{
kafka{
//kafka connection details
decorate_events => true
}
}
filter{
mutate => { add_field => { "kafka-topic" => "%{[@metadata][kafka][topic]}" } // this is working
mutate => { add_field => { "custom-key" => "%{[my-key1]}" } // this is not working
}
output{
// store the payload on ES index, _id = my-key1
}
I'm not able to read my custom key. Did I miss anything in the config?
In recent Logstash releases, you can set decorate_events => "extended"
(true means basic) and then get headers using:
%{[@metadata][kafka][headers][my-key1]}