logstash, logstash-grok, logstash-configuration, logstash-file, logstash-forwarder

Logstash grok filter along with the kv plugin is unable to process events


I am new to ELK. When I onboarded the log file below, the events went to the dead letter queue because Logstash could not process them. I have written a grok filter to parse the events, but Logstash still cannot process them. Any help would be appreciated.

Below is the sample log format.

25193662345 [http-nio-8080-exec-44] DEBUG c.s.b.a.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=31, totalTime=33 tenantId=b9sdfs-1033-4444-aba5-csdfsdfsf, immutableBlobId=bss_c_586331/Sample_app12-sdas-157123148464.txt, blobSize=2862, domain=abc

2519366789 [http-nio-8080-exec-47] DEBUG q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde

GROK filter:

dissect {
    mapping => { "message" => "%{NUMBER:number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
}
kv {
    source => "[@metadata][msg]"
    field_split => ","
}

Thanks


Solution

  • You basically have two problems in your configuration.

    1.) You are using the dissect filter, not grok. Both are used to parse messages, but grok uses regular expressions to validate the value of a field, while dissect is purely positional and performs no validation. If you have a WORD value in the position of a field that expects a NUMBER, grok will fail, but dissect will not.

    If your log lines always have the same pattern, you should keep using dissect, since it is faster and uses less CPU.

    Your correct dissect mapping should be:

    dissect {
        mapping => { "message" => "%{number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
    }
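
    The difference can be sketched in plain Python (a rough approximation, not Logstash itself; `\d+` is a simplification of grok's `NUMBER` pattern):

    ```python
    import re

    line = ("2519366789 [http-nio-8080-exec-47] DEBUG "
            "q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201")

    # dissect-style: purely positional, any token is accepted in any slot
    dissect_like = r"(?P<number>\S+) \[(?P<thread>[^\]]+)\] (?P<level>\S+) (?P<cls>\S+) - (?P<msg>.*)"
    # grok-style: the first field is actually validated as a number
    # (grok's NUMBER is richer than \d+, but the idea is the same)
    grok_like = r"(?P<number>\d+) \[(?P<thread>[^\]]+)\] (?P<level>\S+) (?P<cls>\S+) - (?P<msg>.*)"

    bad_line = line.replace("2519366789", "not-a-number", 1)

    print(re.match(dissect_like, bad_line) is not None)  # True: positional parse still "succeeds"
    print(re.match(grok_like, bad_line) is not None)     # False: validation rejects the line
    ```

    This is why a grok pattern fails loudly on a malformed line while dissect silently puts the wrong value into the field.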
    

    2.) The field that contains the kv message has pairs separated both by spaces and by commas; kv will not work this way.

    After your dissect filter, this is the content of [@metadata][msg]:

    method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde
    

    To solve this, use a mutate filter to remove the commas from [@metadata][msg], then use the kv filter with its default configuration.

    This should be your filter configuration:

    filter {
        dissect {
            mapping => { "message" => "%{number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
        }
        mutate {
            gsub => ["[@metadata][msg]",",",""]
        }
        kv {
            source => "[@metadata][msg]"
        }
    }
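
    What mutate's gsub and the default kv settings do to the message can be sketched in Python (an illustration of the logic, not the Logstash plugins themselves):

    ```python
    # [@metadata][msg] as produced by the dissect filter above
    msg = ("method=PUT status=201 appLogicTime=1, streamInTime=0, "
           "blobStorageTime=32, totalTime=33 "
           "tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, "
           "immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, "
           "blobSize=2862, domain=cde")

    # mutate { gsub => [...] }: strip the commas so every pair is space-separated
    cleaned = msg.replace(",", "")

    # kv with defaults: split on whitespace, then each pair on the first "="
    fields = dict(pair.split("=", 1) for pair in cleaned.split())

    print(fields["method"], fields["totalTime"], fields["domain"])  # → PUT 33 cde
    ```

    Once the commas are gone, every key=value pair is delimited the same way, which is exactly what kv expects by default.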
    

    Your output should be something like this:

    {
                 "number" => "2519366789",
             "@timestamp" => 2019-11-03T16:42:11.708Z,
                 "thread" => "http-nio-8080-exec-47",
           "appLogicTime" => "1",
                 "domain" => "cde",
                 "method" => "PUT",
                  "level" => "DEBUG",
               "blobSize" => "2862",
               "@version" => "1",
        "immutableBlobId" => "bss_c_586334/Sample_app15-615223-157sadas6648465.txt",
           "streamInTime" => "0",
                 "status" => "201",
        "blobStorageTime" => "32",
                "message" => "2519366789 [http-nio-8080-exec-47] DEBUG q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde",
              "totalTime" => "33",
               "tenantId" => "b0csdfsd-1066-4444-adf4-ce7bsdfssdf",
                  "class" => "q.s.b.y.m.PerformanceMetricsFilter"
    }