I have been trying to transform application log messages into searchable fields in OpenSearch Dashboards (Kibana).
For example, this log entry:
2023-06-20T05:59:59.568967474Z stdout F {"level":"INFO","timestamp":"2023-06-20 05:59:59,%3N","thread":"http-nio-8090-exec-5","file":"AccountService.java","line":"51","message":"A sample info log","trace":""}
Would become this:
"log": {
  "level": "INFO",
  "timestamp": "2023-06-20 17:36:25,%3N",
  "thread": "http-nio-8090-exec-5",
  "file": "AccountService.java",
  "line": "51",
  "message": "A sample info log",
  "trace": ""
},
I have tried this ClusterFilter configuration:
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFilter
metadata:
  name: filters
  labels:
    filter.fluentd.fluent.io/enabled: "true"
    filter.fluentd.fluent.io/tenant: "core"
spec:
  filters:
    - customPlugin:
        config: |
          <filter **>
            @type dedot
            de_dot_separator _
            de_dot_nested ${FLUENTD_DEDOT_NESTED:=true}
          </filter>
          <filter **>
            @type grep
            <exclude>
              key $.kubernetes.container_name
              pattern ^(fluentd|fluent-bit|istio-proxy)$
            </exclude>
          </filter>
          <filter **>
            @type parser
            key_name log
            reserve_data true
            remove_key_name_field true
            <parse>
              @type multi_format
              <pattern>
                format regexp
                expression /^.* (?<log>{.*})$/
                time_format %Y-%m-%dT%H:%M:%S.%L%Z
              </pattern>
              <pattern>
                format json
              </pattern>
            </parse>
          </filter>
It strips the leading prefix, which is good, but it keeps the log message as a single string:
"log": "{\"level\":\"INFO\",\"timestamp\":\"2023-06-20 18:00:45,%3N\",\"thread\":\"http-nio-8090-exec-5\",\"file\":\"AccountService.java\",\"line\":\"51\",\"message\":\"A sample info log\",\"trace\":\"\"}"
How can the JSON string be parsed into a proper JSON object?
The ideal solution would also be able to handle nested JSON strings in the log message. So, if there is a log entry like this:
"{\"log\":{\"pod_name\":\"json-log-generator-7f6dd7c675-sfbjk\",\"namespace_name\":\"fluent-system\",\"labels\":{\"app\":\"json-log-generator\",\"pod-template-hash\":\"XXXX\",\"security_istio_io/tlsMode\":\"istio\",\"service_istio_io/canonical-name\":\"json-log-generator\",\"service_istio_io/canonical-revision\":\"latest\"},\"annotations\":{\"kubectl_kubernetes_io/default-container\":\"json-log-generator\",\"kubectl_kubernetes_io/default-logs-container\":\"json-log-generator\",\"kubernetes_io/psp\":\"eks.XXX\",\"prometheus_io/path\":\"/XXX/XXX\",\"prometheus_io/port\":\"XXX\",\"prometheus_io/scrape\":\"true\",\"sidecar_istio_io/status\":\"{\\\"XXXX\\\":[\\\"XX-init\\\"],\\\"XX\\\":[\\\"istio-proxy\\\"],\\\"volumes\\\":[\\\"XXX-XXX\\\",\\\"XXX-XX\\\",\\\"XXX-XXX\\\",\\\"istio-envoy\\\",\\\"XX-X\\\",\\\"XX-podinfo\\\",\\\"istio-token\\\",\\\"istiod-ca-cert\\\"],\\\"XXX\\\":XXX,\\\"XXX\\\":\\\"default\\\"}\"}}}"
Then, it would become this:
  "log": {
    "pod_name": "json-log-generator-7f6dd7c675-sfbjk",
    "namespace_name": "fluent-system",
    "labels": {
      "app": "json-log-generator",
      "pod-template-hash": "XXXX",
      "security_istio_io/tlsMode": "istio",
      "service_istio_io/canonical-name": "json-log-generator",
      "service_istio_io/canonical-revision": "latest"
    },
    "annotations": {
      "kubectl_kubernetes_io/default-container": "json-log-generator",
      "kubectl_kubernetes_io/default-logs-container": "json-log-generator",
      "kubernetes_io/psp": "eks.XXX",
      "prometheus_io/path": "/XXX/XXX",
      "prometheus_io/port": "XXX",
      "prometheus_io/scrape": "true",
      "sidecar_istio_io/status": "{\"XXXX\":[\"XX-init\"],\"XX\":[\"istio-proxy\"],\"volumes\":[\"XXX-XXX\",\"XXX-XX\",\"XXX-XXX\",\"istio-envoy\",\"XX-X\",\"XX-podinfo\",\"istio-token\",\"istiod-ca-cert\"],\"XXX\":XXX,\"XXX\":\"default\"}"
    }
  }
}
What follows is not an ideal solution, because it floods the Fluentd log with harmless warnings like this one:
2023-06-22 02:43:20 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::Parser::ParserError error="pattern not matched with data '2023-06-22T02:43:20.213055109Z stderr F I0622 02:43:20.212880 1 reflector.go:536] k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes/listers.go:347: Watch close - *v1.ReplicaSet total 8 items received'" location=nil tag="kube.var.log.containers.cluster-autoscaler-aws-cluster-autoscaler-6d8dc5669d-gbt2k_kube-system_aws-cluster-autoscaler-43ac14d2df7cf4d3cb5326d0c15a068200ef2cfece9ed96714bafca5c29ca50f.log" time=2023-06-22 02:43:20.213162944 +0000 record={"log"=>"2023-06-22T02:43:20.213055109Z stderr F I0622 02:43:20.212880 1 reflector.go:536] k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes/listers.go:347: Watch close - *v1.ReplicaSet total 8 items received", "kubernetes"=>{"pod_name"=>"cluster-autoscaler-aws-cluster-autoscaler-6d8dc5669d-gbt2k", "namespace_name"=>"kube-system", "labels"=>{"app_kubernetes_io/instance"=>"cluster-autoscaler", "app_kubernetes_io/name"=>"aws-cluster-autoscaler", "pod-template-hash"=>"6d8dc5669d"}, "annotations"=>{"kubernetes_io/psp"=>"eks.privileged"}, "container_name"=>"aws-cluster-autoscaler", "docker_id"=>"43ac14d2df7cf4d3cb5326d0c15a068200ef2cfece9ed96714bafca5c29ca50f", "container_image"=>"registry.k8s.io/autoscaling/cluster-autoscaler:v1.24.0"}}
Still, I managed to turn log messages into searchable fields using this configuration:
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFilter
metadata:
  name: cluster-filters
  labels:
    filter.fluentd.fluent.io/enabled: "true"
    filter.fluentd.fluent.io/tenant: "raas-core"
spec:
  filters:
    - customPlugin:
        config: |
          <filter **>
            @type dedot
            de_dot_separator _
            de_dot_nested ${FLUENTD_DEDOT_NESTED:=true}
          </filter>
          <filter kube.var.log.containers.**>
            @type grep
            <exclude>
              key $.kubernetes.container_name
              pattern ^(fluent-bit|fluentd)$
            </exclude>
          </filter>
          <filter kube.var.log.containers.**>
            @type parser
            key_name log
            reserve_data true
            <parse>
              @type regexp
              expression ^(?:[^\{]*)(?<log>\{.*\})$
            </parse>
          </filter>
          <filter kube.var.log.containers.**>
            @type parser
            key_name log
            reserve_data true
            remove_key_name_field false
            hash_value_field app_log
            <parse>
              @type multi_format
              <pattern>
                format json
              </pattern>
              <pattern>
                format none
              </pattern>
            </parse>
          </filter>
The first filter is of type 'dedot' and replaces dots in field names with underscores. This is a necessary step for OpenSearch (see the discussion at https://github.com/fluent/fluent-operator/issues/702 ). It also replaces dots in nested field names, which is controlled by the FLUENTD_DEDOT_NESTED environment variable.
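To make the nested replacement concrete, here is a rough Python sketch of the idea. It is only an illustration, not the plugin's actual code, and the function name `dedot` is made up for this example.

```python
def dedot(value, separator="_"):
    """Recursively replace dots in dictionary keys with the separator."""
    if isinstance(value, dict):
        return {
            key.replace(".", separator): dedot(child, separator)
            for key, child in value.items()
        }
    if isinstance(value, list):
        return [dedot(item, separator) for item in value]
    # Plain values (strings, numbers, ...) are left untouched.
    return value


record = {
    "kubernetes": {
        "labels": {"app.kubernetes.io/name": "aws-cluster-autoscaler"}
    }
}
print(dedot(record))
# {'kubernetes': {'labels': {'app_kubernetes_io/name': 'aws-cluster-autoscaler'}}}
```

Only the keys change; values that happen to contain dots stay as they are.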
The second filter is of type 'grep' and drops every record whose kubernetes.container_name matches the pattern, here the fluent-bit and fluentd containers.
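The exclude rule behaves roughly like the following Python sketch. This is an illustration of the logic only; the helper name `keep` is hypothetical, and a record without a container name is assumed to be kept.

```python
import re

# Same pattern as in the <exclude> section of the grep filter.
EXCLUDE = re.compile(r"^(fluent-bit|fluentd)$")


def keep(record):
    """Return False for records whose container name matches the pattern."""
    name = record.get("kubernetes", {}).get("container_name", "")
    return EXCLUDE.search(str(name)) is None


records = [
    {"kubernetes": {"container_name": "fluentd"}},
    {"kubernetes": {"container_name": "aws-cluster-autoscaler"}},
]
print([r["kubernetes"]["container_name"] for r in records if keep(r)])
# ['aws-cluster-autoscaler']
```

Because the pattern is anchored with ^ and $, a container called, say, fluentd-sidecar would not be excluded.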
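The third filter, of type 'parser', handles the logs from the Kubernetes containers. It uses a regular expression to extract just the JSON part of the 'log' field, dropping everything before the first opening brace. Because 'reserve_data' is set to true, the other fields of the record are kept, and if parsing fails the original record is passed on unchanged. Lines that contain no JSON object do not match the expression, which is where the "pattern not matched" warnings shown above come from.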
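The effect of the regular expression can be tried out with a small Python sketch. It is an approximation of what the filter does, not Fluentd code: Python spells the named group as (?P<log>...) where Ruby uses (?<log>...), the function name `extract_json_part` is made up, and the sample line is a shortened version of the log entry from the question.

```python
import re

# Python spelling of the expression used in the filter.
EXPRESSION = re.compile(r"^(?:[^\{]*)(?P<log>\{.*\})$")


def extract_json_part(record):
    """Replace 'log' with its JSON part, or return the record unchanged."""
    match = EXPRESSION.search(record.get("log", ""))
    if match is None:
        # No JSON object in the line: keep the record as it is.
        return record
    return {**record, "log": match.group("log")}


line = (
    "2023-06-20T05:59:59.568967474Z stdout F "
    '{"level":"INFO","message":"A sample info log","trace":""}'
)
record = extract_json_part({"log": line})
print(record["log"])
# {"level":"INFO","message":"A sample info log","trace":""}
```

At this point 'log' is still a string; turning it into an object is the job of the next filter.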
The last 'parser' filter also handles the logs from the Kubernetes containers. The key difference is that it tries several formats (JSON and none), which gives some flexibility for different logging formats. It takes the JSON string produced by the previous filter and attempts to parse it into a JSON object.
The 'hash_value_field' option puts the parsed result into the 'app_log' field, giving app_log.level, app_log.message, and so on. Because of the 'reserve_data' option, the original data is kept as well, including when parsing fails.
The patterns are tried in order, like an if-else chain: if the first pattern fails, the second one is used, and so on.
The 'format none' pattern is a fallback that prevents log loss and keeps the parser from failing when the log is not in JSON format. More patterns can be added if needed. The possible formats are: regexp, apache2, apache_error, nginx, syslog, json, multiline, csv, tsv, ltsv, msgpack, and none.
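The if-else behaviour of the patterns, together with 'hash_value_field' and 'reserve_data', can be sketched in Python like this. It is an illustration under assumptions, not the plugin's implementation: the function names are hypothetical, and the fallback is assumed to store the raw text under a 'message' key, which is the default of Fluentd's 'none' parser.

```python
import json


def parse_json(text):
    """First pattern: succeed only if the text is a JSON object."""
    try:
        value = json.loads(text)
    except ValueError:
        return None
    return value if isinstance(value, dict) else None


def parse_none(text):
    """Fallback pattern: always succeeds and wraps the raw text."""
    return {"message": text}


PATTERNS = [parse_json, parse_none]


def parse_log(record):
    """Try each pattern in order and store the first result in 'app_log'."""
    raw = record.get("log")
    if raw is None:
        return record
    for parse in PATTERNS:
        parsed = parse(raw)
        if parsed is not None:
            # The original fields, including 'log', are kept next to 'app_log'.
            return {**record, "app_log": parsed}
    return record


print(parse_log({"log": '{"level":"INFO","message":"A sample info log"}'})["app_log"])
# {'level': 'INFO', 'message': 'A sample info log'}
print(parse_log({"log": "I0622 02:43:20 Watch close"})["app_log"])
# {'message': 'I0622 02:43:20 Watch close'}
```

With the fallback in place every record ends up with an 'app_log' field, so non-JSON lines remain searchable through app_log.message.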
For example, we could create another filter for server logs and use the 'nginx' or 'apache2' format to parse them. More information about Fluentd parsers can be found here: https://docs.fluentd.org/parser