
JSON Transformation with data manipulation using Apache NiFi


I want to do some basic transformations on the sample JSON below using NiFi: change the value of the timeStamp tag to a date format, and add a new tag, created_ts, whose value is the current timestamp, so that the output matches the expected JSON.

Sample JSON:

{"name": "SAMPLE_NAME","timeStamp": "1477307252000","value": "-0.06279052","quality": "1090"}

Expected JSON:

{"name": "SAMPLE_NAME","timeStamp": "2016-11-08 14:46:13.674","value": "-0.06279052","quality": "1090","created_ts":"2016-11-08 14:46:13.674"}

Can you please help with the detailed steps to follow in Apache NiFi/HDF?


Solution

  • Jolt does not implement data manipulation (changing field values).

    Check the official documentation:

    https://github.com/bazaarvoice/jolt#stock-transforms

    Stock Transforms

    The Stock transforms are:

    shift       : copy data from the input tree and put it in the output tree
    default     : apply default values to the tree
    remove      : remove data from the tree
    sort        : sort the Map key values alphabetically (for debugging and human readability)
    cardinality : "fix" the cardinality of input data. e.g., the "urls" element is usually a List,
                        but if there is only one, then it is a String
    

    Currently, all the Stock transforms just affect the "structure" of the data.

    To do data manipulation, you will need to write Java code.

    If you write your Java "data manipulation" code to implement the Transform interface, then you can insert your code in the transform chain.
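    A minimal sketch of such a custom transform follows. It assumes Jolt's `com.bazaarvoice.jolt.Transform` interface has a single `Object transform(Object input)` method that receives already-parsed JSON, where objects arrive as `Map`s. To keep the example runnable without the Jolt jar, it declares a local stand-in interface with that shape. The class name `TimestampTransform`, the injected clock and the caller-chosen time zone are illustrative assumptions, not part of Jolt. In a real project you would implement Jolt's own interface and reference the class from the chain spec. Later NiFi releases also let JoltTransformJSON load custom transforms, but check your version's documentation.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical local stand-in with the same shape as Jolt's
// com.bazaarvoice.jolt.Transform, so this sketch compiles without Jolt.
interface Transform {
    Object transform(Object input);
}

// Hypothetical custom transform: rewrites "timeStamp" (epoch millis stored
// as a string) as a formatted date and adds "created_ts" with the current time.
class TimestampTransform implements Transform {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private final ZoneId zone;              // zone used to render both dates
    private final Supplier<Instant> clock;  // injectable "now" for testing

    TimestampTransform(ZoneId zone, Supplier<Instant> clock) {
        this.zone = zone;
        this.clock = clock;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Object transform(Object input) {
        // Parsed JSON objects are Maps; copy so the input is not mutated
        // and the original key order is kept.
        Map<String, Object> in = (Map<String, Object>) input;
        Map<String, Object> out = new LinkedHashMap<>(in);

        Object ts = in.get("timeStamp");
        if (ts != null) {
            long millis = Long.parseLong(ts.toString());
            out.put("timeStamp", FMT.format(Instant.ofEpochMilli(millis).atZone(zone)));
        }
        out.put("created_ts", FMT.format(clock.get().atZone(zone)));
        return out;
    }
}
```

    Rendering in UTC, the sample value `1477307252000` becomes `2016-10-24 11:07:32.000`. Injecting the clock keeps `created_ts` deterministic in tests while production code can pass `Instant::now`.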


    So, to complete your task, I see two main options:


    V1:

    Use the sequence of following processors:

    EvaluateJsonPath -> UpdateAttribute -> AttributesToJSON

    In EvaluateJsonPath, set Destination to flowfile-attribute and define one attribute per field, with expressions like $.name, $.timeStamp, ...

    In UpdateAttribute, convert the format of timeStamp and define the new attribute:

    attribute  |   value/expression
    -----------------------------------------------------------
    timeStamp  |   ${timeStamp:format('yyyy-MM-dd HH:mm:ss.SSS')}
    created_ts |   ${now():format('yyyy-MM-dd HH:mm:ss.SSS')}
    

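    In AttributesToJSON, set Attributes List to name,timeStamp,value,quality,created_ts and set Destination to flowfile-content, so that the JSON object is written to the FlowFile content instead of to an attribute.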
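    To make the attribute-based flow concrete, here is a plain-Java sketch of what the three processors do to a single FlowFile. It is not NiFi code. FlowFile attributes are modelled as a `Map<String, String>`, the content is assumed to be already parsed into a map (the JDK has no JSON parser), and the class and method names are hypothetical. The sketch shows one consequence of routing everything through attributes: attribute values are always strings, so by default every field AttributesToJSON writes comes out as a JSON string, which matches the expected output in the question.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of EvaluateJsonPath -> UpdateAttribute -> AttributesToJSON.
class AttributeFlowSketch {
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Roughly EvaluateJsonPath with Destination = flowfile-attribute and
    // expressions $.name, $.timeStamp, $.value, $.quality: one string attribute per field.
    static Map<String, String> evaluate(Map<String, Object> json) {
        Map<String, String> attrs = new LinkedHashMap<>();
        for (String field : List.of("name", "timeStamp", "value", "quality")) {
            Object v = json.get(field);
            if (v != null) attrs.put(field, v.toString());
        }
        return attrs;
    }

    // Roughly what ${timeStamp:format(...)} and ${now():format(...)} compute:
    // epoch millis and the current instant, rendered in the given zone.
    static void update(Map<String, String> attrs, Instant now, ZoneId zone) {
        long millis = Long.parseLong(attrs.get("timeStamp"));
        attrs.put("timeStamp", FMT.format(Instant.ofEpochMilli(millis).atZone(zone)));
        attrs.put("created_ts", FMT.format(now.atZone(zone)));
    }

    // Roughly AttributesToJSON with Destination = flowfile-content: the listed
    // attributes as string values. This sketch skips attributes that are not set.
    static String toJson(Map<String, String> attrs, List<String> attributesList) {
        StringBuilder sb = new StringBuilder("{");
        for (String key : attributesList) {
            String v = attrs.get(key);
            if (v == null) continue;
            if (sb.length() > 1) sb.append(',');
            sb.append(quote(key)).append(':').append(quote(v));
        }
        return sb.append('}').toString();
    }

    // Minimal escaping (backslashes and double quotes only), enough for this sketch.
    static String quote(String s) {
        return '"' + s.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
    }
}
```

    With `ZoneId.of("UTC")`, the sample timeStamp renders as `2016-10-24 11:07:32.000`. With a single argument, the Expression Language `format` function uses the NiFi JVM's default time zone, so the value you see depends on the server's zone. The function also accepts a time zone as an optional second argument, for example `'GMT'`, if you need a fixed one.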


    V2: use an ExecuteScript processor (Script Engine: Groovy) with the following script:

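    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper
    import java.text.SimpleDateFormat
    import org.apache.nifi.processor.io.StreamCallback

    // output pattern shared by timeStamp and created_ts
    def fmt = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss.SSS')

    def ff = session.get()
    if (!ff) return
    ff = session.write(ff, { rawIn, rawOut ->
        // wrap the raw streams in a UTF-8 reader and writer
        rawIn.withReader('UTF-8') { reader ->
            rawOut.withWriter('UTF-8') { writer ->
                // parse the FlowFile content into a Map
                def json = new JsonSlurper().parse(reader)
                // timeStamp holds epoch milliseconds as a string, e.g. "1477307252000"
                json.timeStamp = fmt.format(new Date(json.timeStamp as Long))
                // add the time at which this FlowFile was processed
                json.created_ts = fmt.format(new Date())
                // serialize the modified Map back into the FlowFile content
                new JsonBuilder(json).writeTo(writer)
            }
        }
    } as StreamCallback)
    session.transfer(ff, REL_SUCCESS)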