Tags: json, apache-kafka, fluentd, rsyslog

Unable to parse Kafka server log format sent from rsyslog into Fluentd


I'm trying to send Kafka server logs from rsyslog to Fluentd. The Kafka server log is first converted to JSON, and the JSON logs are then sent via rsyslog.

VM1: Kafka and rsyslog installed. VM2: Fluentd (td-agent).

Example of a Kafka server log line:

{"timestamp":"2022-07-29 07:12:18","level":"ERROR","logger":"io.confluent.support.metrics.SupportedKafka","thread":"main","message":"Fatal error during SupportedServerStartable startup. Prepare to shutdown","stacktrace":"java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.\n\tat scala.Predef$.require(Predef.scala:224)\n\tat kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1492)\n\tat kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1460)\n\tat kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1114)\n\tat kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1094)\n\tat kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1091)\n\tat kafka.server.KafkaConfig.fromProps(KafkaConfig.scala)\n\tat io.confluent.support.metrics.SupportedServerStartable.<init>(SupportedServerStartable.java:52)\n\tat io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:45)"}
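Each log4j record above is a single JSON object per line, so it parses cleanly on its own. A quick sanity check in Python (the record is shortened here for readability):

```python
import json

# A shortened version of the Kafka server log line shown above.
line = ('{"timestamp":"2022-07-29 07:12:18","level":"ERROR",'
        '"logger":"io.confluent.support.metrics.SupportedKafka",'
        '"thread":"main","message":"Fatal error during '
        'SupportedServerStartable startup. Prepare to shutdown"}')

record = json.loads(line)
print(record["level"], "-", record["logger"])
# → ERROR - io.confluent.support.metrics.SupportedKafka
```

So the JSON itself is fine; the parsing problem must be introduced somewhere between rsyslog and Fluentd.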

I've tried the configuration below, but it isn't producing the right output.

rsyslog.conf

$ModLoad imfile
$ModLoad immark
$ModLoad imtcp
$ModLoad imudp
$ModLoad imuxsock

#Template for  logs
template(name="elastic"
  type="list") {
      property(name="hostname")
      constant(value=" ") property(name="syslogtag")
      property(name="msg")
      constant(value="\n")
}


#Provides UDP syslog reception
$UDPServerRun 514

$InputFilePollInterval 1
$InputFileName /var/log/kafka/server.log
$InputFileTag kafkalogs:
$InputFileStateFile kafkalogs
$InputFileFacility local0
$InputRunFileMonitor

:syslogtag, isequal, "kafkalogs:" {
  :msg, contains, "ERROR" {
    local0.* /var/log/kafkalog_error.log
    local0.* @fluentdvmip:5144
  }
  stop
}

td-agent.conf

<system>
  worker 2
</system>

<source>
  @type syslog
  port 5142
  tag system
</source>

<match system.**>
  @type stdout
</match>

<source>
  @type udp
  port 5144
  tag kafkalogs
  <parse>
    @type json
    time_type string
    time_format %yyyy-%MM-%dd %HH:%mm:%ss
  </parse>
</source>


<match kafka.**>
  @type stdout
</match>

cat /var/log/td-agent/td-agent.log

2022-07-29 07:56:24 +0000 [warn]: #0 pattern not matched data="<133>Jul 29 07:56:24 techsrv01 kafkalogs: {\"timestamp\":\"2022-07-29 07:56:23\",\"level\":\"ERROR\",\"logger\":\"io.confluent.support.metrics.SupportedKafka\",\"thread\":\"main\",\"message\":\"Fatal error during SupportedServerStartable startup. Prepare to shutdown\",\"stacktrace\":\"java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.\\n\\tat scala.Predef$.require(Predef.scala:224)\\n\\tat kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1492)\\n\\tat kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1460)\\n\\tat kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1114)\\n\\tat kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1094)\\n\\tat kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1091)\\n\\tat kafka.server.KafkaConfig.fromProps(KafkaConfig.scala)\\n\\tat io.confluent.support.metrics.SupportedServerStartable.<init>(SupportedServerStartable.java:52)\\n\\tat io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:45)\"}"
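The data= string in the warning shows what went wrong: rsyslog prepended the syslog header <133>Jul 29 07:56:24 techsrv01 kafkalogs: to the payload (the backslashes are only escaping in the warning output), so the json parser fails on the first character. Stripping everything up to the tag recovers valid JSON, as this sketch demonstrates (payload shortened):

```python
import json

# The UDP payload as rsyslog sends it: syslog header + JSON message.
payload = ('<133>Jul 29 07:56:24 techsrv01 kafkalogs: '
           '{"timestamp":"2022-07-29 07:56:23","level":"ERROR",'
           '"message":"Fatal error during SupportedServerStartable startup. '
           'Prepare to shutdown"}')

# Everything after the "kafkalogs: " tag is the original JSON line.
_, _, json_part = payload.partition('kafkalogs: ')
record = json.loads(json_part)
print(record["level"])
# → ERROR
```

Separately, note that Fluentd's time_format takes strptime directives, so this timestamp would be %Y-%m-%d %H:%M:%S, not %yyyy-%MM-%dd %HH:%mm:%ss.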

As we can see, the message td-agent receives is not in the same format as the actual Kafka log: rsyslog prepends a syslog header to the JSON payload, so the json parser fails, and every quote in the warning output appears with a \ added. How can the rsyslog output be made equivalent to the actual Kafka logs?
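One alternative would be to strip the header on the rsyslog side instead of the Fluentd side: forward only the msg property, so the UDP payload is pure JSON that Fluentd's json parser can handle. An untested sketch (the template name "jsononly" is made up here):

```
template(name="jsononly" type="string" string="%msg%\n")

:syslogtag, isequal, "kafkalogs:" {
  :msg, contains, "ERROR" {
    action(type="omfwd" target="fluentdvmip" port="5144" protocol="udp"
           template="jsononly")
  }
  stop
}
```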


Solution

  • Made it work using the configurations below:

    rsyslog.conf

    $ModLoad imfile
    $ModLoad immark
    $ModLoad imtcp
    $ModLoad imudp
    $ModLoad imuxsock
    
    #Template for  logs
    template(name="elastic"
      type="list") {
          property(name="hostname")
          constant(value=" ") property(name="syslogtag")
          property(name="msg")
          constant(value="\n")
    }
    #Provides UDP syslog reception
    $UDPServerRun 514
    
    $InputFilePollInterval 1
    $InputFileName /var/log/kafka/server.log
    $InputFileTag kafkalogs:
    $InputFileStateFile kafkalogs
    $InputFileFacility local0
    $InputRunFileMonitor
    
    :syslogtag, isequal, "kafkalogs:" {
      :msg, contains, "ERROR" {
        local0.* /var/log/kafkalog_error.log
        local0.* @fluentdvmip:5144
      }
      stop
    }
    

    td-agent.conf

    <system>
      worker 2
    </system>
    
    <source>
      @type syslog
      port 5142
      tag system
    </source>
    
    <match system.**>
      @type stdout
    </match>
    
    <match kafka.**>
      @type stdout
    </match>
    
    <source>
      @type syslog
      port 5144
      bind 0.0.0.0
      tag kafka_error
    </source>
    
    <match kafka_error**>
      @type elasticsearch
      host ElasticVMIP or HOSTNAME
      port 9200
      index_name kafka_error_write
      include_timestamp true
    
    
      # connection configs
        reconnect_on_error true
        reload_on_failure true
        slow_flush_log_threshold 90
    
      # buffer configs
      <buffer>
        @type file
        path /data/opt/fluentd/buffer/kafka_error_write
        chunk_limit_size 32MB
        total_limit_size 20GB
        flush_thread_count 8
        flush_mode interval
        retry_type exponential_backoff
        retry_timeout 10s
        retry_max_interval 30
        overflow_action drop_oldest_chunk
        flush_interval 5s
      </buffer>
    </match>
    

    The result was the complete log output in Elasticsearch.
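The @type syslog source succeeds where the udp source with a json parser failed because it strips the <PRI>timestamp host tag: header before emitting the record, leaving the original JSON line as the message. Roughly (a simplified RFC 3164-style sketch, not Fluentd's actual parser):

```python
import json
import re

# Simplified RFC 3164 shape: <PRI>MMM dd HH:MM:SS host tag: message
SYSLOG = re.compile(
    r'^<(?P<pri>\d+)>(?P<time>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) '
    r'(?P<host>\S+) (?P<ident>[^:]+): (?P<message>.*)$'
)

line = ('<133>Jul 29 07:56:24 techsrv01 kafkalogs: '
        '{"timestamp":"2022-07-29 07:56:23","level":"ERROR"}')

m = SYSLOG.match(line)
record = json.loads(m.group('message'))
print(m.group('host'), record['level'])
# → techsrv01 ERROR
```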
