kql azure-sentinel

Trying to parse non-uniform JSON arrays with KQL in Sentinel


I have Sentinel alerts with an Entities column. The column always holds a JSON array, but the entity objects in it do not appear in a fixed order, so a given field can sit at a different array index from one alert to the next. To get the data I want, I have to use a kludge query and hope it catches the fields I need. For example, the sender data might be in the first element and the receiver data in another, but the positions can be completely different for the next alert.

Example JSON data of one alert:

[{
        "$id": "2",
        "MailboxPrimaryAddress": "user.one@org.com",
        "DisplayName": "User One",
        "Upn": "user.one@org.com",
        "AadId": "redacted",
        "CreatedTimeUtc": "2024-02-21T15:10:24.6466667",
        "ThreatAnalysisSummary": [{
                "AnalyzersResult": [],
                "Verdict": "Unknown",
                "AnalysisDate": "2024-02-21T15:10:24.6466667"
            }
        ],
        "LastVerdict": "Unknown",
        "Type": "mailbox"
    }, {
        "$id": "3",
        "MailboxPrimaryAddress": "user.two@org.com",
        "DisplayName": "User Two",
        "Upn": "user.two@org.com",
        "AadId": "redacted",
        "CreatedTimeUtc": "2024-02-21T15:10:24.6466667",
        "ThreatAnalysisSummary": [{
                "AnalyzersResult": [],
                "Verdict": "Unknown",
                "AnalysisDate": "2024-02-21T15:10:24.6466667"
            }
        ],
        "LastVerdict": "Unknown",
        "Type": "mailbox"
    }, {
        "$id": "4",
        "Recipient": "user.one@org.com",
        "Sender": "sender@domain.com",
        "P1Sender": "",
        "P1SenderDisplayName": "",
        "P1SenderDomain": "",
        "SenderIP": "redacted",
        "P2Sender": "sender@domain.com",
        "P2SenderDisplayName": "",
        "P2SenderDomain": "",
        "ReceivedDate": "2024-02-21T15:08:29",
        "NetworkMessageId": "redacted",
        "InternetMessageId": "redacted",
        "Subject": "redacted",
        "AntispamDirection": "Unknown",
        "DeliveryAction": "Unknown",
        "DeliveryLocation": "Unknown",
        "CreatedTimeUtc": "2024-02-21T15:10:24.6466667",
        "ThreatAnalysisSummary": [{
                "AnalyzersResult": [],
                "Verdict": "Unknown",
                "AnalysisDate": "2024-02-21T15:10:24.6466667"
            }
        ],
        "LastVerdict": "Unknown",
        "Type": "mailMessage"
    }
]

The issue is that a different alert will have the fields mixed up so a static parse does not pull all the data.

This is the KQL I'm currently using. It gets me the columns I'm after, but I feel like there's a better way to do this. mv-expand and parse_json() seem to expect uniform structure of all the JSON fields so lots of the results end up getting missed.

SecurityAlert
| where AlertName startswith "Phish"
| extend sendIP = coalesce(
tostring(parse_json(Entities)[1].SenderIP), 
tostring(parse_json(Entities)[2].SenderIP), 
tostring(parse_json(Entities)[3].SenderIP), 
tostring(parse_json(Entities)[4].SenderIP),
tostring(parse_json(Entities)[5].SenderIP),
tostring(parse_json(Entities)[6].SenderIP),
tostring(parse_json(Entities)[7].SenderIP)
)
| extend P1Sender = coalesce(
tostring(parse_json(Entities)[1].Sender),
tostring(parse_json(Entities)[2].Sender),
tostring(parse_json(Entities)[3].Sender),
tostring(parse_json(Entities)[4].Sender),
tostring(parse_json(Entities)[5].Sender),
tostring(parse_json(Entities)[6].Sender),
tostring(parse_json(Entities)[7].Sender)
)
| extend NetworkMessageID = coalesce(
tostring(parse_json(Entities)[1].NetworkMessageId),
tostring(parse_json(Entities)[2].NetworkMessageId),
tostring(parse_json(Entities)[3].NetworkMessageId),
tostring(parse_json(Entities)[4].NetworkMessageId),
tostring(parse_json(Entities)[5].NetworkMessageId),
tostring(parse_json(Entities)[6].NetworkMessageId)
)
| extend Subject = coalesce(
tostring(parse_json(Entities)[1].Subject),
tostring(parse_json(Entities)[2].Subject),
tostring(parse_json(Entities)[3].Subject),
tostring(parse_json(Entities)[4].Subject),
tostring(parse_json(Entities)[5].Subject),
tostring(parse_json(Entities)[6].Subject)
)

Thanks to @Aswin for the tip with mv-apply, this is what I used:

SecurityAlert
| where AlertName startswith "Phish"
| mv-apply entity = todynamic(Entities) on (
    project sendIP = entity.SenderIP, P1Sender = entity.Sender, NetworkMessageID = entity.NetworkMessageId, Subject = entity.Subject
)
| where isnotempty(sendIP)
| summarize count() by tostring(sendIP), tostring(P1Sender), tostring(NetworkMessageID), tostring(Subject)

Solution

  • You can use the mv-apply operator to extract the fields you need. The mv-apply operator applies a subquery to each element of an array, which can be useful for extracting fields from JSON data. Below is the query that uses mv-apply to extract the SenderIP, Sender, NetworkMessageId and Subject fields from the JSON data.

    SecurityAlert
    | where AlertName startswith "Phish"
    // Entities is stored as a string, so convert it to dynamic and expand one entity per row
    | mv-apply entity = todynamic(Entities) on (
        project sendIP = tostring(entity.SenderIP), P1Sender = tostring(entity.Sender), NetworkMessageID = tostring(entity.NetworkMessageId), Subject = tostring(entity.Subject)
    )
    // Collapse the expanded rows back to one row per alert, keeping a non-empty value per column
    | summarize sendIP=minif(sendIP,isnotempty(sendIP)),
    P1Sender=minif(P1Sender,isnotempty(P1Sender)),
    NetworkMessageID=minif(NetworkMessageID,isnotempty(NetworkMessageID)),
    Subject=minif(Subject,isnotempty(Subject)) by SystemAlertId

    In this query, mv-apply converts the Entities column to a dynamic array with todynamic() and applies the subquery to each element of that array. In the subquery, project selects the fields we want to extract and names the resulting columns. Because each entity carries only some of these fields, most of the expanded rows are empty for any given column. The summarize step then collapses the rows back into one row per alert (grouped by SystemAlertId). Here minif() combined with isnotempty() keeps a non-empty value for each column; if several entities supply the same field, the smallest value is returned.
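
    To try the approach without access to a Sentinel workspace, here is a minimal self-contained sketch. The SampleAlerts table, the alert IDs, the IP addresses, and the subjects are all made-up test data, and the entities are trimmed to a few fields. The sketch also assumes that grouping by SystemAlertId (a standard SecurityAlert column) is the right way to keep one row per alert. The two sample alerts deliberately place the mailMessage entity at different array positions.

```kql
// Hypothetical sample data: the mailMessage entity is second in alert-1 and first in alert-2.
let SampleAlerts = datatable(SystemAlertId: string, AlertName: string, Entities: string)
[
    "alert-1", "Phish delivered", '[{"$id":"2","Upn":"user.one@org.com","Type":"mailbox"},{"$id":"3","Sender":"sender@domain.com","SenderIP":"203.0.113.10","NetworkMessageId":"msg-1","Subject":"Invoice","Type":"mailMessage"}]',
    "alert-2", "Phish delivered", '[{"$id":"2","Sender":"other@domain.com","SenderIP":"198.51.100.7","NetworkMessageId":"msg-2","Subject":"Reset your password","Type":"mailMessage"},{"$id":"3","Upn":"user.two@org.com","Type":"mailbox"}]'
];
SampleAlerts
| where AlertName startswith "Phish"
// Expand the JSON array: the subquery runs once per entity, with "entity" bound to that element.
| mv-apply entity = todynamic(Entities) on (
    project sendIP = tostring(entity.SenderIP),
            P1Sender = tostring(entity.Sender),
            NetworkMessageID = tostring(entity.NetworkMessageId),
            Subject = tostring(entity.Subject)
)
// Entities without a given field produce empty strings; keep a non-empty value per alert.
| summarize sendIP = minif(sendIP, isnotempty(sendIP)),
            P1Sender = minif(P1Sender, isnotempty(P1Sender)),
            NetworkMessageID = minif(NetworkMessageID, isnotempty(NetworkMessageID)),
            Subject = minif(Subject, isnotempty(Subject))
    by SystemAlertId
```

    Each sample alert should come back as a single row, regardless of where the mailMessage entity sits in the array. If an alert can contain several mailMessage entities, minif() will mix the smallest value of each field across them, so in that case it may be safer to skip the summarize step and keep one row per entity.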
