I am trying to send Kafka server logs from rsyslog to Fluentd. I first converted the Kafka server log to JSON, and I am now sending the JSON logs through rsyslog.
VM1: Kafka and rsyslog installed
VM2: Fluentd
Example of a Kafka server log entry:
{"timestamp":"2022-07-29 07:12:18","level":"ERROR","logger":"io.confluent.support.metrics.SupportedKafka","thread":"main","message":"Fatal error during SupportedServerStartable startup. Prepare to shutdown","stacktrace":"java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.\n\tat scala.Predef$.require(Predef.scala:224)\n\tat kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1492)\n\tat kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1460)\n\tat kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1114)\n\tat kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1094)\n\tat kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1091)\n\tat kafka.server.KafkaConfig.fromProps(KafkaConfig.scala)\n\tat io.confluent.support.metrics.SupportedServerStartable.<init>(SupportedServerStartable.java:52)\n\tat io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:45)"}
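As a quick sanity check that an entry like the one above is valid JSON on its own, here is a minimal Python sketch. The shortened `sample` line (stack trace trimmed) and the variable names are illustrative assumptions, not part of the original setup.

```python
import json
from datetime import datetime

# Shortened copy of the Kafka log entry above (stack trace trimmed for brevity).
sample = (
    '{"timestamp":"2022-07-29 07:12:18","level":"ERROR",'
    '"logger":"io.confluent.support.metrics.SupportedKafka","thread":"main",'
    '"message":"Fatal error during SupportedServerStartable startup. Prepare to shutdown",'
    '"stacktrace":"java.lang.IllegalArgumentException: requirement failed'
    '\\n\\tat scala.Predef$.require(Predef.scala:224)"}'
)

# The entry parses cleanly as a single JSON object.
record = json.loads(sample)

# The timestamp field uses a "YYYY-MM-DD HH:MM:SS" layout.
ts = datetime.strptime(record["timestamp"], "%Y-%m-%d %H:%M:%S")

print(record["level"])
print(ts.isoformat())
# The escaped \n and \t in the JSON text decode to a real newline and tab.
print(record["stacktrace"].splitlines()[0])
```

Fluentd's time_format option takes the same strptime-style directives, so the equivalent value there would be %Y-%m-%d %H:%M:%S.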
I tried the configuration below, but it does not produce the expected output.

rsyslog.conf
$ModLoad imfile
$ModLoad immark
$ModLoad imtcp
$ModLoad imudp
$ModLoad imuxsock
#Template for logs
template(name="elastic"
type="list") {
property(name="hostname")
constant(value=" ") property(name="syslogtag")
property(name="msg")
constant(value="\n")
}
#Provides UDP syslog reception
$UDPServerRun 514
$InputFilePollInterval 1
$InputFileName /var/log/kafka/server.log
$InputFileTag kafkalogs:
$InputFileStateFile kafkalogs
$InputFileFacility local0
$InputRunFileMonitor
:syslogtag, isequal, "kafkalogs:" {
:msg, contains, "ERROR" {
local0.* /var/log/kafkalog_error.log
local0.* @fluentdvmip:5144
}
stop
}
td-agent.conf
<system>
worker 2
</system>
<source>
@type syslog
port 5142
tag system
</source>
<match system.**>
@type stdout
</match>
<source>
@type udp
port 5144
tag kafkalogs
<parse>
@type json
time_type string
time_format %yyyy-%MM-%dd %HH:%mm:%ss
</parse>
</source>
<match kafka.**>
@type stdout
</match>
cat /var/log/td-agent/td-agent.log
2022-07-29 07:56:24 +0000 [warn]: #0 pattern not matched data="<133>Jul 29 07:56:24 techsrv01 kafkalogs: {\"timestamp\":\"2022-07-29 07:56:23\",\"level\":\"ERROR\",\"logger\":\"io.confluent.support.metrics.SupportedKafka\",\"thread\":\"main\",\"message\":\"Fatal error during SupportedServerStartable startup. Prepare to shutdown\",\"stacktrace\":\"java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.\\n\\tat scala.Predef$.require(Predef.scala:224)\\n\\tat kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1492)\\n\\tat kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1460)\\n\\tat kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1114)\\n\\tat kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1094)\\n\\tat kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1091)\\n\\tat kafka.server.KafkaConfig.fromProps(KafkaConfig.scala)\\n\\tat io.confluent.support.metrics.SupportedServerStartable.<init>(SupportedServerStartable.java:52)\\n\\tat io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:45)\"}"
As we can see, the td-agent log entry is not in the same format as the original Kafka log: a \ is added around every word. How can the rsyslog output be made equivalent to the actual Kafka log?
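Two things are worth separating in the warning above. The Python sketch below (shortened payload; the `SYSLOG_HEADER` pattern and the `split_syslog` helper are illustrative assumptions) suggests that the backslashes are only how a string containing quotes is displayed inside a quoted log field, and that JSON parsing most likely fails because rsyslog prepends a syslog header (`<133>Jul 29 07:56:24 techsrv01 kafkalogs: `) to the JSON.

```python
import json
import re

# What arrives on the wire: syslog header + original JSON (payload shortened).
datagram = (
    '<133>Jul 29 07:56:24 techsrv01 kafkalogs: '
    '{"timestamp":"2022-07-29 07:56:23","level":"ERROR","thread":"main"}'
)

# Displaying the string inside double quotes escapes every inner quote,
# which is where the backslashes in the warning come from.
print(json.dumps(datagram))

# Hypothetical pattern for the header: <PRI>, timestamp, host, "tag:".
SYSLOG_HEADER = re.compile(
    r'^<(?P<pri>\d+)>(?P<time>\w{3}\s+\d+ [\d:]{8}) (?P<host>\S+) (?P<tag>[^:\s]+): ?'
)

def split_syslog(line):
    """Return (header_fields, body); raise ValueError if no header is found."""
    m = SYSLOG_HEADER.match(line)
    if m is None:
        raise ValueError("no syslog header")
    return m.groupdict(), line[m.end():]

# Parsing the whole datagram as JSON fails because of the leading header.
try:
    json.loads(datagram)
    parsed_whole = True
except json.JSONDecodeError:
    parsed_whole = False

# Once the header is split off, the body is the original JSON again.
header, body = split_syslog(datagram)
record = json.loads(body)
print(parsed_whole, header["tag"], record["level"])
```

In other words, the data itself is not corrupted; the receiver just needs to understand the syslog framing before it looks at the JSON.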
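I made it work using the configurations below. The rsyslog configuration is essentially unchanged; on the Fluentd side, the source on port 5144 now uses @type syslog instead of @type udp with a JSON parser, so the syslog header added by rsyslog is parsed rather than rejected, and a match block forwards the events to Elasticsearch.

rsyslog.conf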
$ModLoad imfile
$ModLoad immark
$ModLoad imtcp
$ModLoad imudp
$ModLoad imuxsock
#Template for logs
template(name="elastic"
type="list") {
property(name="hostname")
constant(value=" ") property(name="syslogtag")
property(name="msg")
constant(value="\n")
}
#Provides UDP syslog reception
$UDPServerRun 514
$InputFilePollInterval 1
$InputFileName /var/log/kafka/server.log
$InputFileTag kafkalogs:
$InputFileStateFile kafkalogs
$InputFileFacility local0
$InputRunFileMonitor
:syslogtag, isequal, "kafkalogs:" {
:msg, contains, "ERROR" {
local0.* /var/log/kafkalog_error.log
local0.* @fluentdvmip:5144
}
stop
}
td-agent.conf
<system>
worker 2
</system>
<source>
@type syslog
port 5142
tag system
</source>
<match system.**>
@type stdout
</match>
<match kafka.**>
@type stdout
</match>
<source>
@type syslog
port 5144
bind 0.0.0.0
tag kafka_error
</source>
<match kafka_error**>
@type elasticsearch
host ElasticVMIP or HOSTNAME
port 9200
index_name kafka_error_write
include_timestamp true
# connection configs
reconnect_on_error true
reload_on_failure true
slow_flush_log_threshold 90
# buffer configs
<buffer>
@type file
path /data/opt/fluentd/buffer/kafka_error_write
chunk_limit_size 32MB
total_limit_size 20GB
flush_thread_count 8
flush_mode interval
retry_type exponential_backoff
retry_timeout 10s
retry_max_interval 30
overflow_action drop_oldest_chunk
flush_interval 5s
</buffer>
</match>
The result was the complete log output in Elasticsearch.
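To exercise this pipeline without waiting for Kafka to log an error, a test datagram can be sent to the syslog source on port 5144. The Python sketch below is only an illustration under stated assumptions: the helper names are hypothetical, `fluentdvmip` is the placeholder host from the rsyslog config, and the PRI arithmetic (facility * 8 + severity) follows the standard syslog convention, which is how local0 (16) with notice (5) yields the `<133>` seen in the earlier warning.

```python
import json
import socket

LOCAL0 = 16   # syslog facility code for local0
NOTICE = 5    # syslog severity code for notice

def build_syslog_line(facility, severity, timestamp, host, tag, message):
    """Build a BSD-style syslog line: <PRI>TIMESTAMP HOST TAG: MESSAGE."""
    pri = facility * 8 + severity
    return f"<{pri}>{timestamp} {host} {tag}: {message}"

def decode_pri(pri):
    """Split a PRI value back into (facility, severity)."""
    return divmod(pri, 8)

def send_udp(line, host, port):
    """Send one datagram; UDP gives no delivery confirmation."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("utf-8"), (host, port))

# Illustrative JSON payload, shaped like a (much shorter) Kafka log entry.
payload = json.dumps({"timestamp": "2022-07-29 07:56:23", "level": "ERROR",
                      "message": "test entry"})
line = build_syslog_line(LOCAL0, NOTICE, "Jul 29 07:56:24", "techsrv01",
                         "kafkalogs", payload)
print(line)

# Uncomment to send to the Fluentd VM (placeholder host from the config above):
# send_udp(line, "fluentdvmip", 5144)
```

With a syslog source, the JSON typically arrives as a string in the record's message field, so whether it needs a further parsing step before indexing depends on how the documents are meant to be queried.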