Tags: apache-kafka, logstash, logstash-configuration, logstash-file

How to get a custom header key's value in the Logstash Kafka input?


In my producer, I've added a few custom keys to the record headers. Using Logstash, I need to get the payload, and in addition I need to read the values of those custom header keys.

Producer code:

      kafkaTemplate
          .send(
              MessageBuilder.withPayload(myPayload)
                  .setHeader(KafkaHeaders.TOPIC, myTopic)
                  .setHeader("my-key1", "test-key1") // custom header key
                  .setHeader("my-key2", "test-key2") // custom header key
                  .build());

Logstash conf

input {
  kafka {
    # kafka connection details

    decorate_events => true
  }
}

filter {
  mutate { add_field => { "kafka-topic" => "%{[@metadata][kafka][topic]}" } }  # this is working
  mutate { add_field => { "custom-key" => "%{[my-key1]}" } }                   # this is not working
}

output {
  # store the payload in an ES index, _id = my-key1
}

I'm not able to read my custom key. Did I miss anything in the config?


Solution

  • In recent versions of the Kafka input plugin, decorate_events accepts "none", "basic", and "extended" (true is equivalent to "basic"). Setting decorate_events => "extended" also copies the record headers into the event metadata, so you can read each header with:

    %{[@metadata][kafka][headers][my-key1]}
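
    A minimal sketch of the adjusted pipeline, reusing the field names from the question; the connection details and the Elasticsearch index name are placeholders:

    input {
      kafka {
        # bootstrap_servers, topics, group_id, etc.
        decorate_events => "extended"   # adds record headers under [@metadata][kafka][headers]
      }
    }

    filter {
      mutate {
        add_field => {
          "kafka-topic" => "%{[@metadata][kafka][topic]}"
          "custom-key"  => "%{[@metadata][kafka][headers][my-key1]}"
        }
      }
    }

    output {
      elasticsearch {
        # hosts and index are placeholders
        index       => "my-index"
        document_id => "%{custom-key}"   # use the header value as the document _id
      }
    }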