Triggered FetchFolder in NiFi?


I'm using NiFi to orchestrate the processing of large binary files with a proprietary processing tool that runs outside of NiFi.

NiFi writes the source files to disk and I call the external tool (using an ExecuteScript processor). The tool loads the binary file and generates a large number of smaller files.

When the external tool has completely finished, I need to "pick up" the directory of smaller (generated) files and continue processing them in NiFi. I have to wait for the tool to finish because the output directory, the number of files, and the processing time are all dynamic.

The problem:

  1. GetFile (to grab a directory) doesn't accept an upstream connection, so I can't trigger it when processing completes.
  2. A ListFile + FetchFile combo doesn't work because ListFile doesn't accept an upstream connection either, so, again, I can't trigger it when processing completes.

So which processor(s) can I use to grab the directory of new files once the binary processing completes, and bring them into NiFi land?


Solution

  • Somewhat in line with @Bryan Bende's answer, I ended up using an ExecuteScript processor to build a "ListFile" that accepts an upstream connection:

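    import groovy.io.FileType
    import org.apache.nifi.flowfile.FlowFile

    def flowFile = session.get()
    if (!flowFile) return

    // Directory the external tool wrote its output to, set as an attribute upstream
    def fetchDirectory = flowFile.getAttribute('fetchDirectory')
    if (!fetchDirectory) {
       // Nothing to list: route the trigger FlowFile to failure instead of throwing
       session.transfer(flowFile, REL_FAILURE)
       return
    }

    // Collect every regular file under the directory, recursing into subdirectories
    def listOfFiles = []
    def dir = new File(fetchDirectory)
    if (dir.exists()) {
       dir.eachFileRecurse(FileType.FILES) { file ->
          listOfFiles << file
       }
    }

    // Emit one empty FlowFile per file; a downstream FetchFile pulls in the content.
    // putAllAttributes returns a new FlowFile reference, so keep the returned one.
    def flowFiles = [] as List<FlowFile>
    listOfFiles.each { File f ->
       def newFlowFile = session.create()
       newFlowFile = session.putAllAttributes(newFlowFile, [
          'path'         : f.path,    // full path to the file
          'absolute.path': f.parent,  // lets FetchFile's default ${absolute.path}/${filename} work
          'filename'     : f.name
       ])
       flowFiles << newFlowFile
    }

    // The trigger FlowFile has done its job: drop it and release the listing
    session.remove(flowFile)
    session.transfer(flowFiles, REL_SUCCESS)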
    

    So, when the external tool completes, I route the FlowFile for that binary file (carrying a "fetchDirectory" attribute) to the processor above, and pipe its success output to a FetchFile processor.
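The listing script relies on a "fetchDirectory" attribute that the upstream step has to set only after the tool has exited. Below is a minimal sketch of that waiting step in Groovy, assuming the tool is a command-line program that can be launched with ProcessBuilder. The commented ExecuteScript wiring is hypothetical: the tool path, the 'inputPath' attribute, and the output directory layout are placeholders, not part of the original setup.

```groovy
import java.util.concurrent.TimeUnit

// Run an external command and block until it exits, or kill it after the timeout.
// Returns the exit code; throws if the command did not finish in time.
int runAndWait(List<String> command, long timeoutMinutes) {
    def proc = new ProcessBuilder(command).start()
    // Drain stdout/stderr on background threads so a chatty tool cannot stall on a full pipe
    def output = new StringBuffer()
    proc.consumeProcessOutput(output, output)
    if (!proc.waitFor(timeoutMinutes, TimeUnit.MINUTES)) {
        proc.destroyForcibly()
        throw new IllegalStateException("Timed out after ${timeoutMinutes} min: ${command}")
    }
    return proc.exitValue()
}

// Hypothetical ExecuteScript wiring (session, REL_SUCCESS and REL_FAILURE are ExecuteScript
// bindings; the tool path, the 'inputPath' attribute and the output layout are placeholders):
//
// def flowFile = session.get()
// if (!flowFile) return
// String outDir = "/data/tool-output/${flowFile.getAttribute('uuid')}"  // String, not GString
// def rc = runAndWait(['/opt/tool/bin/process', flowFile.getAttribute('inputPath'), outDir], 120)
// if (rc == 0) {
//     flowFile = session.putAttribute(flowFile, 'fetchDirectory', outDir)
//     session.transfer(flowFile, REL_SUCCESS)
// } else {
//     session.transfer(flowFile, REL_FAILURE)
// }
```

Waiting on the process inside the script is what makes the downstream listing safe: the FlowFile only reaches the listing step after the tool has exited, so the directory is complete. The trade-off is that each run holds one of ExecuteScript's concurrent tasks for its full duration.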