My use case is like this. I have some X tables to be pulled from MySQL. I am splitting them using SplitText
to put each table in a individual flow file and pull using GenerateTableFetch
and ExecuteSQL
.
And I want to be notified or put some other action when import is done for all the tables. At SplitText
text processor I have routed original
relationship to Wait
on ${filename}
with target count ${fragment.count}
. This will track how many tables are done.
But now I am not able to figure out how to know when a particular table is done. GenerateTableFetch
forks flow file into multiple based on Partition Size. But it does not write attributes like fragment.count which I can use to wait on for each table.
Is there a way I can achieve this? Or maybe is there a way to know at the end of the entire flow if all flow files in the flow have been processed and nothing is in queue or being processed?
Till NiFi add's support for this, I managed to make it work using MergeContent
. Use table_name as Correlation attribute name
and then use merged
relation to Wait
processor using ${merge.count}
as target. Refer screenshots if someone is looking to do the same.