I'm using NiFi to orchestrate the processing of large binary files using a proprietary processing tool (which runs external to NiFi).
NiFi drops the source files on disk, I call the external tool (using an ExecuteScript processor), and the tool loads the binary file and generates many smaller files.
When the external tool is completely finished, I need to "pick up" the directory of smaller (generated) files and continue processing them in NiFi. I need to wait because the [output directory], [number of files], and [time required to process] are dynamic.
The problem:
... so what processor(s) can I use to, upon completion of the binary processing, grab the directory of new files and bring them into NiFi land?
Somewhat in line with @Bryan Bende's answer, I ended up using an ExecuteScript processor to build a "ListFile"-style processor that accepts an upstream connection (the stock ListFile processor does not allow incoming connections):
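import groovy.io.FileType

// The incoming FlowFile names the directory to list in its 'fetchDirectory' attribute.
def flowFile = session.get()
if (!flowFile) return

def fetchDirectory = flowFile.getAttribute('fetchDirectory')
def dir = fetchDirectory ? new File(fetchDirectory) : null
def listOfFiles = []

// Collect every regular file under the directory, recursing into subdirectories.
if (dir?.isDirectory()) {
    dir.eachFileRecurse(FileType.FILES) { file ->
        listOfFiles << file
    }
}

// Emit one empty FlowFile per file found; the content is fetched downstream.
def flowFiles = [] as List<FlowFile>
listOfFiles.each { file ->
    def newFlowFile = session.create()
    // putAttribute returns the updated FlowFile, so keep the returned reference.
    newFlowFile = session.putAttribute(newFlowFile, 'path', file.path)  // full path, including the file name
    newFlowFile = session.putAttribute(newFlowFile, 'filename', file.name)
    flowFiles << newFlowFile
}

// The triggering FlowFile is no longer needed; drop it and pass the listing on.
session.remove(flowFile)
session.transfer(flowFiles, REL_SUCCESS)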
So, when the external tool completes, I route the block's FlowFile (which carries the fetchDirectory attribute) to the above processor, whose output I then pipe to a FetchFile processor.
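One detail about the FetchFile step, as a sketch rather than my exact configuration: the script above stores the full file path (directory plus file name) in the path attribute and never sets absolute.path. FetchFile's "File to Fetch" property defaults to ${absolute.path}/${filename}, so it would presumably need to point at path instead. The Completion Strategy value shown is just the default and is an assumption about what you want done with the source files:

```text
FetchFile
  File to Fetch        ${path}
  Completion Strategy  None
```

If you would rather keep FetchFile's default, the alternative is to have the script set absolute.path to the file's parent directory instead of putting the full path in path.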