logstash amazon-sqs audit-logging audit-trail

Logstash alternative to receive messages from AWS SQS and batch store in AWS S3


I need the ability to store logs as batches in AWS S3 as text files formatted appropriately for JSON-SerDe.

Here is an example of how one of the batched log files would look on S3. It is important that the datetime format is yyyy-MM-dd HH:mm:ss:

{"message":"Message number 1","datetime":"2020-12-01 14:37:00"}
{"message":"Message number 2","datetime":"2020-12-01 14:38:00"}
{"message":"Message number 3","datetime":"2020-12-01 14:39:00"}

Ideally a batch would be written to S3 every 5 seconds or once 50 messages are queued, with both thresholds configurable.
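The batching rule described above (flush after a time limit or a message count) can be sketched in a few lines of Python. This is only an illustration of the requirement, not something Logstash provides: the `Batcher` class and its `max_messages` / `max_age_seconds` parameters are hypothetical names, and reading from SQS and uploading to S3 are deliberately left out.

```python
import json
import time
from datetime import datetime


class Batcher:
    """Hypothetical helper: collects records and reports when a batch is due."""

    def __init__(self, max_messages=50, max_age_seconds=5.0, clock=time.monotonic):
        self.max_messages = max_messages
        self.max_age_seconds = max_age_seconds
        self._clock = clock  # injectable so the age rule can be tested
        self._records = []
        self._first_added_at = None

    def add(self, message, when):
        # Remember when the first record of the current batch arrived.
        if not self._records:
            self._first_added_at = self._clock()
        self._records.append({
            "message": message,
            # yyyy-MM-dd HH:mm:ss, the format the query tool expects
            "datetime": when.strftime("%Y-%m-%d %H:%M:%S"),
        })

    def is_due(self):
        # A batch is due once it is full or its oldest record is too old.
        if not self._records:
            return False
        if len(self._records) >= self.max_messages:
            return True
        return self._clock() - self._first_added_at >= self.max_age_seconds

    def flush(self):
        # Serialize as one compact JSON object per line, then reset the batch.
        body = "".join(
            json.dumps(record, separators=(",", ":")) + "\n"
            for record in self._records
        )
        self._records = []
        self._first_added_at = None
        return body


batcher = Batcher(max_messages=50, max_age_seconds=5.0)
batcher.add("Message number 1", datetime(2020, 12, 1, 14, 37, 0))
print(batcher.flush(), end="")
```

The string returned by `flush()` is what would be uploaded as a single object; a real consumer would call `is_due()` after each received message and on a timer.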


I've almost managed to get this working with Logstash, using the sqs input plugin and the s3 output plugin with the config below:

input {
  sqs {
    endpoint => "AWS_SQS_ENDPOINT"
    queue => "logs"
  }
}

output {
   s3 {
     access_key_id => "AWS_ACCESS_KEY_ID"
     secret_access_key => "AWS_SECRET_ACCESS_KEY"
     region => "AWS_REGION"
     bucket => "AWS_BUCKET"
     prefix => "audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/"
     size_file => 128
     time_file => 5
     codec => "json_lines"
     encoding => "gzip"
     canned_acl => "private"
   }
}

The problem is that the s3 output plugin requires the @timestamp field, which isn't compatible with our query tool. If I use the mutate filter to remove @timestamp or rename it to datetime, Logstash no longer processes the logs. We can't store both the datetime field and @timestamp on every record, as that would drastically increase the amount of data we need to store (millions of logs).

Are there any other software alternatives for achieving this result?


Updated config, which is working with Logstash thanks to [Badger](https://stackoverflow.com/users/11792977/badger):

input {
  sqs {
    endpoint => "http://AWS_SQS_ENDPOINT"
    queue => "logs"
  }
}

filter {
  mutate {
    add_field => {
      "[@metadata][year]" => "%{+YYYY}"
      "[@metadata][month]" => "%{+MM}"
      "[@metadata][day]" => "%{+dd}"
    }
    remove_field => [ "@timestamp" ]
  }
}

output {
   s3 {
     access_key_id => "AWS_ACCESS_KEY_ID"
     secret_access_key => "AWS_SECRET_ACCESS_KEY"
     region => "AWS_REGION"
     bucket => "AWS_BUCKET"
     prefix => "audit/year=%{[@metadata][year]}/month=%{[@metadata][month]}/day=%{[@metadata][day]}"
     # size_file is in bytes, so 1024 is 1 KB (use 1048576 for 1 MB)
     size_file => 1024
     # 1 Minute
     time_file => 1
     codec => "json_lines"
     encoding => "gzip"
     canned_acl => "private"
   }
}

Solution

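  • I do not see any dependency on @timestamp in the s3 output code. You have created one yourself by using sprintf date references in prefix => "audit/year=%{+YYYY}/month=%{+MM}/day=%{+dd}/": %{+YYYY}, %{+MM} and %{+dd} are all formatted from the event's @timestamp. Move those sprintf references into a mutate filter whose add_field option writes them to fields under [@metadata], remove @timestamp, and then reference the [@metadata] fields in the prefix option. Fields under [@metadata] are not written to the output, so they add nothing to the stored records.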