I have an input file with line-delimited JSON messages in the following format:
{"level":"info","message":{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}}
{"level":"info","message":{"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}}
{"level":"info","message":{"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}}
I want to save only the message part of each record to MongoDB using Fluentd. A document in the Mongo collection should look like this:
{
  "_id": ObjectId("626a1b813c04335a858e5926"),
  "accountId": 99,
  "end_dateTime": "",
  "id": 0.22837359658442535,
  "log": []
}
That is, I want to extract and save only the value of the message key from the input payload. I have tried the config below, but it is not working:
<source>
  @type tail
  @id input_tail2
  read_from_head true
  path "/opt/webhook-logs/webhook.log"
  pos_file "/opt/webhook-logs/webhook.log.pos"
  tag "td.mongo.events"
  <parse>
    @type "json"
    unmatched_lines
  </parse>
</source>
<match td.mongo.**>
  @type mongo
  host "127.0.0.1"
  port 27017
  database "vonnect"
  collection "webhooklog"
  user "vonnect"
  password xxxxxx
  buffer_chunk_limit 8m
  time_key time
  <buffer>
    flush_interval 2s
  </buffer>
  <inject>
    time_key time
  </inject>
</match>
I have also tried the record_transformer filter, but with no success. The filter config looks like this:
<filter td.mongo.**>
  @type record_transformer
  renew_record true
  # enable_ruby
  # auto_typecast true
  <record>
    ${record["message"]}
  </record>
</filter>
The <record> section of record_transformer expects key/value pairs, so ${record["message"]} on its own cannot expand the nested hash into top-level keys. Here's a working example with the record_transformer and parser filter plugins:
fluent.conf
<source>
  @type sample
  @id in_sample
  sample [
    {"level":"info","message":{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}},
    {"level":"info","message":{"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}},
    {"level":"info","message":{"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}}
  ]
  tag sample
</source>
<filter sample>
  @type record_transformer
  renew_record true
  keep_keys message
  enable_ruby true
  <record>
    message ${record["message"].to_json.to_s}
  </record>
</filter>
<filter sample>
  @type parser
  key_name message
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>
<match sample>
  @type stdout
</match>
Run:
fluentd -c fluent.conf
Output:
2022-04-28 14:04:29.094892632 +0500 sample: {"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}
2022-04-28 14:04:30.097973274 +0500 sample: {"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}
2022-04-28 14:04:31.000677835 +0500 sample: {"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}
record_transformer removes the extra keys and keeps message with its value converted to a plain JSON string. parser then removes the message key and expands its string value as JSON, so the fields of message become the top-level record.
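To make the two stages concrete, here is roughly what the first record looks like after each filter (a sketch based on the output above; whitespace and field order may vary):

After record_transformer (message is now a JSON string):
{"message":"{\"accountId\":99,\"end_dateTime\":\"\",\"id\":0.22837359658442535,\"log\":[]}"}

After parser (the message key is removed and its string value expanded):
{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}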
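Adapted to the original question, the two filters would sit between the tail source and the mongo match. This is an untested sketch reusing the host, paths, and credentials from the question's config; verify the buffer and inject options against your fluent-plugin-mongo version:

<source>
  @type tail
  @id input_tail2
  read_from_head true
  path "/opt/webhook-logs/webhook.log"
  pos_file "/opt/webhook-logs/webhook.log.pos"
  tag "td.mongo.events"
  <parse>
    @type json
  </parse>
</source>

# Stage 1: drop every key except message, serialized as a JSON string.
<filter td.mongo.**>
  @type record_transformer
  renew_record true
  keep_keys message
  enable_ruby true
  <record>
    message ${record["message"].to_json.to_s}
  </record>
</filter>

# Stage 2: parse that string back into top-level fields.
<filter td.mongo.**>
  @type parser
  key_name message
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>

<match td.mongo.**>
  @type mongo
  host "127.0.0.1"
  port 27017
  database "vonnect"
  collection "webhooklog"
  user "vonnect"
  password xxxxxx
  <inject>
    time_key time
  </inject>
  <buffer>
    flush_interval 2s
    chunk_limit_size 8m  # replaces the deprecated top-level buffer_chunk_limit
  </buffer>
</match>

Once records flow, db.webhooklog.find() in the mongo shell should return documents shaped like the desired example, plus the injected time field.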