I want to do some basic transformation to my below sample JSON, I want to change the value of the timeStamp tag to date format and want to add a new tag created_ts
with value of current_timestamp
to the my expected JSON output using NiFi.
Sample JSON:
{"name": "SAMPLE_NAME","timeStamp": "1477307252000","value": "-0.06279052","quality": "1090"}
Expected JSON:
{"name": "SAMPLE_NAME","timeStamp": "2016-11-08 14:46:13.674","value": "-0.06279052","quality": "1090","created_ts":"2016-11-08 14:46:13.674"}
Can you please help with the detail steps to follow in Apache NiFi/HDF.
data transformation not implemented.
check the official doc:
https://github.com/bazaarvoice/jolt#stock-transforms
Stock Transforms
The Stock transforms are:
shift : copy data from the input tree and put it the output tree
default : apply default values to the tree
remove : remove data from the tree
sort : sort the Map key values alphabetically ( for debugging and human readability )
cardinality : "fix" the cardinality of input data. Eg, the "urls" element is usually a List,
but if there is only one, then it is a String
Currently, all the Stock transforms just effect the "structure" of the data.
To do data manipulation, you will need to write Java code.
If you write your Java "data manipulation" code to implement the Transform interface, then you can insert your code in the transform chain.
So, to complete your task I see two main variants:
V1:
Use the sequence of following processors:
EvaluateJsonPath -> UpdateAttributes -> AttributesToJSON
in EvaluateJsonPath
define for each field attributes with expressions like $.name
, $.timeStamp
, ...
in UpdateAttributes
convert the format of timeStamp
and define new attributes:
attribute | value/expression
-----------------------------------------------------------
timeStamp | timeStamp:format('yyyy-MM-dd HH:mm:ss.SSS')
created_ts | now():format('yyyy-MM-dd HH:mm:ss.SSS')
in AttributesToJSON
define Attributes List
to be stored as json object into file content
V2: use ExecuteScript
processor with following code:
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
// transform streams into reader and writer
rawIn.withReader("UTF-8"){reader->
rawOut.withWriter("UTF-8"){writer->
//parse reader into Map
def json = new JsonSlurper().parse(reader)
//change/set values
json.timeStamp = new Date(json.timeStamp as Long).format('yyyy-MM-dd HH:mm:ss.SSS')
json.created_ts = new Date().format('yyyy-MM-dd HH:mm:ss.SSS')
//write changed object to writer
new JsonBuilder(json).writeTo(writer)
}
}
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)