I'm using Filebeat with Kafka and want to replace the timestamp that Filebeat assigns on ingest (@timestamp) with the timestamp from my application. I have seen a few Logstash examples that do this with a filter, but I'm not sure how to do it when the output is Kafka.
In the config below I tried to replace @timestamp with application_timestamp, but it did not work because of the date format. The same approach did work for the message field. My filebeat.yml config is below:
fields:
  application_timestamp: "2023-06-07 07:49:51.196Z"
processors:
  - timestamp:
      field: application_timestamp
      layouts:
        - '2006-01-02 15:04:05.999Z'
      test:
        - '2019-11-18 04:59:51.123Z'
processors:
  - script:
      lang: javascript
      id: replace_timestamp
      source: >
        function process(event) {
          event.Put("@timestamp", event.Get("fields.application_timestamp"));
          return [event];
        }
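A likely reason the script attempt fails is that @timestamp has to hold a time value, while event.Get returns the application timestamp as a string. The sketch below is one way to adapt the script. It assumes the script processor's JavaScript engine turns a Date passed to event.Put into a timestamp, which I have not verified on every Filebeat version. parseApplicationTimestamp is a hypothetical helper name, and the accepted input format is taken from the sample value above.

```javascript
// Hypothetical helper: parses "YYYY-MM-DD HH:MM:SS[.fff][Z]" as UTC.
// Returns a Date, or null when the value does not match that shape.
function parseApplicationTimestamp(value) {
  var pattern = /^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})(?:\.(\d{1,3}))?Z?$/;
  var m = pattern.exec(String(value));
  if (!m) {
    return null;
  }
  // Pad the fraction to three digits so ".12" means 120 ms.
  var millis = m[7] ? Number((m[7] + "00").slice(0, 3)) : 0;
  return new Date(Date.UTC(+m[1], +m[2] - 1, +m[3], +m[4], +m[5], +m[6], millis));
}

// Entry point with the same name the script processor calls.
// Assumption: event.Put accepts a Date for "@timestamp".
function process(event) {
  var parsed = parseApplicationTimestamp(event.Get("fields.application_timestamp"));
  if (parsed !== null) {
    event.Put("@timestamp", parsed);
  }
  // When parsing fails, the event keeps the timestamp Filebeat assigned.
}
```

In filebeat.yml both functions would go under the script processor's source key.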
Update: working filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    tags:
      - test-kafka
    paths:
      - /Users/Documents/kafka_testing/logs/test.log
    json.keys_under_root: true
    json.add_error_key: true
output.kafka:
  # Tell Filebeat to publish only the timestamp, message, and application_time
  # fields; otherwise it treats each line as JSON and publishes that to Kafka.
  codec.format:
    string: '%{[@timestamp]} %{[message]} %{[application_time]}'
  # Kafka: publish to the 'test' topic
  hosts: ["localhost:9092"]
  topic: 'test'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
processors:
  # Extract application_time from the message into a top-level field
  - dissect:
      tokenizer: "{type:test,application_time:%{application_time}}"
      target_prefix: ""
  # Parse application_time and use it as the event's @timestamp
  - timestamp:
      field: application_time
      layouts:
        - '2006-01-02 15:04:05.99'
      test:
        - '2019-11-18 04:59:51.12'
Result/Output:
2021-06-25T19:25:30.000Z {type:test,application_time:2021-06-25 19:25:30} 2021-06-25 19:25:30
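
To sanity-check this result outside Filebeat, here is a rough JavaScript sketch that imitates the dissect, timestamp, and codec.format steps from the working config. It is not Filebeat code: dissectApplicationTime, toUtcDate, and renderKafkaLine are hypothetical helper names, and the application time is assumed to be UTC.

```javascript
// Imitates the tokenizer "{type:test,application_time:%{application_time}}":
// captures everything between the fixed prefix and the first closing brace.
function dissectApplicationTime(message) {
  var m = /^\{type:test,application_time:([^}]*)\}/.exec(String(message));
  return m ? m[1] : null;
}

// Imitates the layout '2006-01-02 15:04:05.99', where the fraction
// (up to two digits) is optional. Assumption: the value is in UTC.
function toUtcDate(applicationTime) {
  var pattern = /^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})(?:\.(\d{1,2}))?$/;
  var m = pattern.exec(applicationTime);
  if (!m) {
    return null;
  }
  var millis = m[7] ? Number((m[7] + "00").slice(0, 3)) : 0;
  return new Date(Date.UTC(+m[1], +m[2] - 1, +m[3], +m[4], +m[5], +m[6], millis));
}

// Imitates the codec string '%{[@timestamp]} %{[message]} %{[application_time]}'.
// Returns null when a step fails; real Filebeat would instead keep its own
// ingest timestamp and record an error on the event.
function renderKafkaLine(message) {
  var applicationTime = dissectApplicationTime(message);
  if (applicationTime === null) {
    return null;
  }
  var timestamp = toUtcDate(applicationTime);
  if (timestamp === null) {
    return null;
  }
  return timestamp.toISOString() + " " + message + " " + applicationTime;
}

console.log(renderKafkaLine("{type:test,application_time:2021-06-25 19:25:30}"));
// prints: 2021-06-25T19:25:30.000Z {type:test,application_time:2021-06-25 19:25:30} 2021-06-25 19:25:30
```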