
NiFi, flow with KafkaConsumer to write as json


I am currently stuck on the following problem: I am reading messages from a Kafka topic using KafkaConsumer. The messages are strings in the following format: { "a" : "b", "a1" : "b1", "c2" : "c3" }. They are stored in the payload of the FlowFile.

I want to convert that string into JSON, or ideally into CSV, but I can't figure out how to do it.

I am new to NiFi and researched as much as possible, but the answers I found were regarding conversions from json to avro or similar, but never string to json or avro. I also found out that the Kafka message is in the payload of the FlowFile, not in the attributes, so I have no clue how to get my hands on it, since the examples are always involving the attributes.

So in short: can I convert the payload of a FlowFile, which is a string, to JSON/CSV with a built-in processor?


Solution

  • If your message is in the FlowFile, the following sequence could help:

    1) Use AttributesToJSON to convert the payload message to JSON.
    2) Use EvaluateJsonPath to extract the payload message (in your case, the Kafka message).

    You can then pass the extracted messages on for CSV generation.

    This post may help with converting JSON to CSV: Convert Json To CSV
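
    To illustrate the transformation itself, outside of NiFi: the sample message in the question is already valid JSON text, so it can be parsed as-is and written out as CSV. The following is only a minimal Python sketch of that idea, not a NiFi processor configuration. The function name json_payload_to_csv is hypothetical, and the sketch assumes each message is a single flat JSON object with string values.

```python
import csv
import io
import json


def json_payload_to_csv(payload: str) -> str:
    """Convert one flat JSON object (given as text) into a header row plus one data row.

    Hypothetical helper for illustration; assumes no nested objects or arrays.
    """
    record = json.loads(payload)  # raises ValueError if the payload is not valid JSON
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer, fieldnames=list(record.keys()), lineterminator="\n"
    )
    writer.writeheader()      # keys become the CSV header
    writer.writerow(record)   # values become the single data row
    return buffer.getvalue()


# Sample message taken from the question
payload = '{ "a" : "b", "a1" : "b1", "c2" : "c3" }'
csv_text = json_payload_to_csv(payload)
print(csv_text)
```

    Inside a flow, the same conversion would be done by processors acting on the FlowFile content rather than by a script like this; the sketch only shows that no separate "string to JSON" step is needed when the string is already JSON.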