pipeline, bioinformatics, nextflow

How can I make sure Nextflow processes don't run simultaneously?


I am running a Nextflow pipeline that sends a set of files through multiple tools and collects the resulting CSVs; however, two of the tools cannot run simultaneously (they use the same software under the hood, and we only have one license). The pipeline looks like this:

workflow MY_WORKFLOW {
    take:
    pdb_ch

    main:
    csv1_ch = TOOL1(pdb_ch)
    bundled_pdb_ch = pdb_ch.collate(20)
    csv2_ch = TOOL2(bundled_pdb_ch)
    csv3_ch = TOOL3(bundled_pdb_ch)
    out_ch = csv1_ch.mix(csv2_ch).mix(csv3_ch)

    emit:
    out_ch
}

The order in which they launch doesn't matter: I'm okay if TOOL1 processes the entire dataset and then TOOL3 starts, or if they run in alternating order, or whatever. What matters is that at any point in time there should be either one of these two running or neither, but never both.

The maxForks directive (with maxForks 1) ensures that only one instance of a given process runs at a time, but is there a way to create a more general lock/semaphore that could be shared between different processes?
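For context, maxForks only limits parallelism within a single process, so it cannot coordinate TOOL1 and TOOL3. A sketch of what it does cover (the tool1 command line is hypothetical):

```nextflow
process TOOL1 {
    maxForks 1   // at most one TOOL1 task at a time; has no effect on TOOL3

    input:
    path pdb

    output:
    path 'tool1.csv'

    script:
    """
    tool1 --input ${pdb} --output tool1.csv
    """
}
```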


Solution

  • I have managed to solve it by adapting the answer found in another SO question. The trick is to create a process that accepts the output of one of the tools and emits some value, like this:

    process BLOCK {
        input:
        path csv_files
    
        output:
        val ready
        // Does nothing except return a value
        
        exec:
        ready = 1
    }
    

    Then this process needs to be inserted into the pipeline by feeding it TOOL3 results and making its outputs an additional input for TOOL1:

    bundled_pdb_ch = pdb_ch.collate(20)
    csv2_ch = TOOL2(bundled_pdb_ch)
    csv3_ch = TOOL3(bundled_pdb_ch)
    semaphore_ch = BLOCK(csv3_ch.collect())
    csv1_ch = TOOL1(pdb_ch, semaphore_ch)
    out_ch = csv1_ch.mix(csv2_ch).mix(csv3_ch)
    
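    For this to work, TOOL1 also has to declare a second input for the semaphore value. A sketch, assuming a hypothetical TOOL1 body:

    ```nextflow
    process TOOL1 {
        input:
        path pdb
        val ready      // semaphore from BLOCK; the value itself is ignored

        output:
        path 'tool1.csv'

        script:
        """
        tool1 --input ${pdb} --output tool1.csv
        """
    }
    ```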

    BLOCK is only launched once csv3_ch can be collected (i.e. all instances of TOOL3 have finished), and TOOL1 will not run without a value from semaphore_ch. This value is ignored by the TOOL1 body, but it is still required for the process to launch. And since semaphore_ch is a value channel, it can be read as many times as necessary without being exhausted.

    Technically, the intermediate process is not necessary, because I could just pass csv3_ch.collect() as a second input to TOOL1. However, BLOCK prevents TOOL1 work directories from being littered with staged TOOL3 outputs.
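    For comparison, the direct variant without BLOCK would look like this (TOOL1 would then declare path csv_files as its second input, so every TOOL1 work directory gets symlinks to all TOOL3 CSVs):

    ```nextflow
    // Without the intermediate BLOCK process: feed the collected
    // TOOL3 outputs straight into TOOL1 as its gating input.
    bundled_pdb_ch = pdb_ch.collate(20)
    csv2_ch = TOOL2(bundled_pdb_ch)
    csv3_ch = TOOL3(bundled_pdb_ch)
    csv1_ch = TOOL1(pdb_ch, csv3_ch.collect())
    out_ch = csv1_ch.mix(csv2_ch).mix(csv3_ch)
    ```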