I'm new to SysFlow, and I want to send the events to Splunk.
The problem is that the events arrive in Splunk with a syslog metadata prefix, so Splunk can't automatically interpret the event as JSON.
Here is the raw event:
Mar 9 21:57:26 10.0.0.158 1 2023-03-09T23:57:26+02:00 RHEL-Server /usr/bin/sfprocessor 60594 sysflow - {"version":5,"endts":0,"opflags":["EXEC"],"ret":0,"ts":1678399046064475870,"type":"PE","meta":{"schema":4,"tracename":"."},"node":{"id":"RHEL-Server","ip":"10.0.0.158"},"pproc":{"args":"--switched-root --system --deserialize 30","cmdline":"/usr/lib/systemd/systemd --switched-root --system --deserialize 30","createts":1678395155396715155,"entry":true,"exe":"/usr/lib/systemd/systemd","gid":0,"group":"root","name":"systemd","oid":"d2e72642fc1d9df2","pid":1,"tty":false,"uid":0,"user":"root"},"proc":{"acmdline":["/usr/lib/systemd/systemd --switched-root --system --deserialize 30","/usr/lib/systemd/systemd --switched-root --system --deserialize 30"],"aexe":["/usr/lib/systemd/systemd","/usr/lib/systemd/systemd"],"aname":["systemd","systemd"],"apid":[73232,1],"args":"/usr/bin/dnf makecache --timer","cmdline":"/usr/bin/dnf /usr/bin/dnf makecache --timer","createts":1678399046062638027,"entry":false,"exe":"/usr/bin/dnf","gid":0,"group":"root","name":"dnf","oid":"6a55582e3bbddc8a","pid":73232,"tid":73232,"tty":false,"uid":0,"user":"root"},"policies":[{"id":"Unauthorized installer detected","desc":"Use of package installer detected in container","priority":1},{"id":"Unauthorized installer detected","desc":"Use of package installer detected in container","priority":1}],"tags":["actionable-offense","suspicious-process","mitre:T1072"]}
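For readability, here is the JSON payload from that event, pretty-printed and truncated to the fields I use later:
{
  "type": "PE",
  "opflags": ["EXEC"],
  "node": {"id": "RHEL-Server", "ip": "10.0.0.158"},
  "pproc": {"name": "systemd", "exe": "/usr/lib/systemd/systemd", "cmdline": "/usr/lib/systemd/systemd --switched-root --system --deserialize 30"},
  "proc": {"name": "dnf", "user": "root", "cmdline": "/usr/bin/dnf /usr/bin/dnf makecache --timer"},
  "policies": [{"id": "Unauthorized installer detected", ...}],
  "tags": ["actionable-offense", "suspicious-process", "mitre:T1072"]
}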
Here is my configuration (/etc/sysflow/pipelines/pipeline.local.json):
{
  "pipeline": [
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "policyengine",
      "in": "flat flattenerchan",
      "out": "evt eventchan",
      "policies": "/etc/sysflow/policies/runtimeintegrity",
      "mode": "alert"
    },
    {
      "processor": "exporter",
      "in": "evt eventchan",
      "export": "syslog",
      "format": "json",
      "syslog.proto": "udp",
      "syslog.tag": "sysflow",
      "syslog.host": "splunk-server",
      "syslog.port": "514"
    }
  ]
}
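On the Splunk side I receive this over a plain UDP input. For completeness, here's a sketch of the inputs.conf I'd pair with it (the index and sourcetype names match the search below; no_appending_timestamp keeps Splunk from prepending its own timestamp/host header to the UDP data):
[udp://514]
sourcetype = sysflow:syslog
index = sysflow
no_appending_timestamp = true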
I've read the SysFlow documentation on pipeline configuration (here).
I know how to parse the event with SPL; I just need to build an add-on that will do that for me (I need the data in data models). Here is the SPL:
index="sysflow" sourcetype="sysflow:syslog"
| rex field=_raw "^(?:[^ \n]* ){7}(?P<json>.+)"
| spath input=json
Not the most elegant solution... but it works
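An index-time alternative I'm considering (untested sketch, and it assumes the prefix itself never contains a "{") is to strip everything before the first brace in props.conf and let Splunk's native JSON handling take over:
[sysflow:syslog]
# Index time: drop the syslog prefix so _raw is pure JSON
SEDCMD-strip_syslog_prefix = s/^[^{]+//
# Search time: automatic JSON field extraction
KV_MODE = json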
I created a field extraction in props.conf:
EXTRACT-json = ^(?:[^ \n]* ){7}(?P<json>.+)
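With that extraction in place, the search-time query simplifies to:
index="sysflow" sourcetype="sysflow:syslog"
| spath input=json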
Then I extracted each field individually using the spath() function in EVAL statements, for example:
EVAL-file_path = spath(json, "file.path")
EVAL-parent_process = spath(json, "pproc.cmdline")
EVAL-process = spath(json, "proc.cmdline")
EVAL-user = spath(json, "proc.user")
EVAL-type = spath(json, "type")
EVAL-file_name = spath(json, "file.name")
EVAL-process_name = spath(json, "proc.name")
EVAL-parent_process_name = spath(json, "pproc.name")
EVAL-parent_process_path = spath(json, "pproc.exe")
EVAL-dest = spath(json, "node.id")
EVAL-signature = spath(json, "policies{}.id")
EVAL-file_acl = spath(json, "file.openflags{}")
EVAL-action = spath(json, "opflags{}")
I'm now working on a CIM-compliant add-on for Splunkbase.
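For the data model mapping, the plan is the usual eventtype + tag pairing; here's a sketch with stanza names of my own (assuming the Endpoint data model for the process fields):
eventtypes.conf:
[sysflow_events]
search = index=sysflow sourcetype=sysflow:syslog
tags.conf:
[eventtype=sysflow_events]
process = enabled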