attributes, apache-nifi, cloudera, data-ingestion

NiFi: using RouteOnAttribute to filter JSON data on a missing attribute or attribute value


I am using NiFi to consume the Tealium event stream and load it into HDFS. I need help filtering the data when the source fails to send a value for an attribute.

{"account":"newtv","twitter:description":"Discover when your favorite NewTV shows and hosts are being shown. ","og:locale":"en_US","dcterms:publisher":"NewTV","original-source":"www.newtv.com/","og:url":"www.newtv.com/show/program-guide"}},"post_time":"2019-10-09 11:27:46","useragent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36","event_id":"12345"}

The above is a sample message. I am stuck on filtering the data when the source does not send the event_id attribute, or sends it without a value.

Current NiFi flow:

ConsumeKafka -> EvaluateJsonPath -> JoltTransformJSON -> EvaluateJsonPath -> RouteOnAttribute -> MergeContent -> EvaluateJsonPath -> UpdateAttribute -> PutHDFS -> MoveHDFS

I need help using RouteOnAttribute to split the data into two flows: flowfiles that have the event_id attribute with a value should continue through the normal flow, while flowfiles where the attribute or its value is missing should go to an error flow and be loaded into a different output path.


Solution

  • In the EvaluateJsonPath processor, add a new property to extract the event_id value from the flowfile.

    If the flowfile does not have event_id, NiFi still adds the attribute, with an empty value.

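    EvaluateJsonPath Configs: set Destination to flowfile-attribute and add a property named event_id whose value is the JsonPath to the field (likely $.event_id, assuming event_id sits at the top level of the JSON as it appears to in the sample).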

    Then, using the RouteOnAttribute processor, we can check the attribute value and route the flowfile accordingly.

    RouteOnAttribute Configs (add the two properties below; each property name becomes a relationship):

    not null value

    ${event_id:isEmpty():not()}
    

    null value

    ${event_id:isEmpty()}
    


    Then use the null value and not null value relationships for further processing: connect null value to the error path and not null value to the regular flow.
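
To make the routing logic concrete, here is a rough Python sketch of what the two expressions decide. It is not NiFi code and nothing from it goes into the flow; the function names is_empty and route are made up for illustration. It assumes event_id is a top-level key, and it assumes that isEmpty() treats a null, empty, or whitespace-only value as empty, which is how the NiFi Expression Language guide describes it.

```python
import json


def is_empty(value):
    """Approximate NiFi's isEmpty(): true for null, empty, or whitespace-only."""
    return value is None or value.strip() == ""


def route(message):
    """Return the relationship name a message would be routed to (illustrative)."""
    record = json.loads(message)
    # Stand-in for the EvaluateJsonPath step: a missing key or a JSON null
    # ends up as an empty attribute value.
    raw = record.get("event_id")
    event_id = "" if raw is None else str(raw)
    # Stand-in for RouteOnAttribute:
    #   null value     -> ${event_id:isEmpty()}
    #   not null value -> ${event_id:isEmpty():not()}
    return "null value" if is_empty(event_id) else "not null value"


if __name__ == "__main__":
    samples = [
        '{"account": "newtv", "event_id": "12345"}',
        '{"account": "newtv"}',
        '{"account": "newtv", "event_id": ""}',
    ]
    for sample in samples:
        print(route(sample), "<-", sample)
```

Only the first sample carries a usable event_id, so it is the only one that would follow the not null value relationship; the other two would go to null value and on to the error output path.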