Tags: elasticsearch, logstash, opensearch, ansible-tower, ansible-automation-platform

logstash keep both raw data and aggregated data


I have a service that can send event logs to a third party, in this case Logstash, and it cannot send the data more than once. The service emits job event data one event per line, and we aggregate those events per job_id so that each job can be read as a single document. Some jobs are thousands of lines, and from a readability and search perspective it's not user-friendly to wade through thousands of documents in Kibana just to view one job. Hence the aggregation to fuse all events of one job into one document.

Aggregation seems to be a destructive process, though: once applied, we lose the original events. For a variety of reasons we would also like to keep ALL the data, without any aggregation, as a raw version sent to a different index. Is there a way to do this?

If there is a way to do the aggregation in Elasticsearch/OpenSearch instead, that would suffice too.

Any ideas?

The config currently looks like this:

input {
  tcp {
    port => 4512
    codec => json
  }
}
filter {
  mutate {
    add_field => {
      "platform" => "test"
      "env" => "dev"
      "job_id" => "%{uuid}"
    }
  }
  aggregate {
    timeout_timestamp_field => "@timestamp"
    task_id => "%{job_id}"
    code => "
      map['@timestamp'] ||= event.get('@timestamp')
      map['platform'] ||= event.get('platform')
      map['env'] ||= event.get('env')
      map['jobID'] ||= event.get('job_id')
      map['message'] ||= event.get('message')
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout => 300
    inactivity_timeout => 60
    timeout_task_id_field => "job_id"
    timeout_tags => ["aggregated"]
  }
}
output {
  opensearch {
    hosts => ["blah"]
    index => "events_index"
    action => "index"
    document_id => "%{job_id}"
  }
}

Solution

  • If you want to keep the raw events, remove the event.cancel() call from the aggregate filter; every original event then continues through the pipeline and gets indexed alongside the aggregated one. If you want to route aggregated and raw events to different destinations, use timeout_tags on the aggregate filter, with a conditional in the output section to decide which output each event goes to, as sketched below.
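
A minimal sketch of that routing, assuming the tag "aggregated" and a second index named raw_events_index for the un-aggregated copy (both names are illustrative, not required values):

filter {
  aggregate {
    task_id => "%{job_id}"
    code => "
      map['@timestamp'] ||= event.get('@timestamp')
      map['platform'] ||= event.get('platform')
      map['env'] ||= event.get('env')
      map['message'] ||= event.get('message')
    "
    # no event.cancel() in the code block, so the raw events keep flowing
    push_map_as_event_on_timeout => true
    timeout => 300
    inactivity_timeout => 60
    timeout_task_id_field => "job_id"
    timeout_tags => ["aggregated"]  # only the pushed map event gets this tag
  }
}
output {
  if "aggregated" in [tags] {
    opensearch {
      hosts => ["blah"]
      index => "events_index"        # one document per job
      document_id => "%{job_id}"
    }
  } else {
    opensearch {
      hosts => ["blah"]
      index => "raw_events_index"    # every original event, unaggregated
    }
  }
}

One caveat: ||= only keeps the first value seen per job, so fusing all message lines of a job into one document would need the code block to accumulate instead, e.g. map['message'] = [map['message'], event.get('message')].compact.join("\n").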
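
As for doing the aggregation in Elasticsearch/OpenSearch itself: both offer transforms that can pivot a raw index into one document per job_id (the Elasticsearch transform API, or OpenSearch's index transforms plugin under _plugins/_transform). The built-in pivot aggregations cover things like counts and min/max; actually concatenating the raw message lines into one field would need a scripted_metric aggregation. A minimal Elasticsearch-flavoured sketch, assuming job_id is mapped as a keyword and using the same illustrative index names as above:

PUT _transform/jobs_per_document
{
  "source": { "index": "raw_events_index" },
  "dest":   { "index": "events_index" },
  "pivot": {
    "group_by": {
      "job_id": { "terms": { "field": "job_id" } }
    },
    "aggregations": {
      "first_seen": { "min": { "field": "@timestamp" } },
      "line_count": { "value_count": { "field": "message.keyword" } }
    }
  }
}

Then start it with POST _transform/jobs_per_document/_start. With this approach Logstash writes only raw events to raw_events_index and the cluster builds the per-job documents, so nothing destructive happens in the pipeline at all.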