I am running a Nextflow pipeline that sends a batch of files through multiple tools and collects the resulting CSVs. However, two of the tools I need cannot run simultaneously (they use the same software under the hood, and we only have one license). The pipeline looks like this:
workflow MY_WORKFLOW {
    take:
    pdb_ch

    main:
    csv1_ch = TOOL1(pdb_ch)
    bundled_pdb_ch = pdb_ch.collate(20)
    csv2_ch = TOOL2(bundled_pdb_ch)
    csv3_ch = TOOL3(bundled_pdb_ch)
    out_ch = csv1_ch.mix(csv2_ch).mix(csv3_ch)

    emit:
    out_ch
}
The order in which they launch doesn't matter: I'm okay if TOOL1 processes the entire dataset and then TOOL3 starts, or if they run in alternating order, or whatever. What matters is that at any point in time there should be either one of these two running or neither, but never both.
Using the maxForks directive makes sure that only one instance of a given process runs at a time, but is there a way to create some more general lock/semaphore that could be shared between different processes?
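For context, this is roughly how I limit a single process today; the tool1 command line is just a placeholder for the real invocation:

```nextflow
process TOOL1 {
    maxForks 1  // at most one task of this process runs at a time

    input:
    path pdb_file

    output:
    path 'tool1.csv'

    script:
    """
    tool1 ${pdb_file} > tool1.csv
    """
}
```

This keeps TOOL1 serialized with itself, but does nothing to stop TOOL1 and TOOL3 from running at the same time.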
I have managed to solve it by adapting the answer from another SO question. The trick is to create a process that accepts the TOOL3 output and emits some value, like this:
process BLOCK {
    input:
    path csv_files

    output:
    val ready

    // Does nothing except return a value
    exec:
    ready = 1
}
Then this process needs to be inserted into the pipeline by feeding it TOOL3
results and making its outputs an additional input for TOOL1:
bundled_pdb_ch = pdb_ch.collate(20)
csv2_ch = TOOL2(bundled_pdb_ch)
csv3_ch = TOOL3(bundled_pdb_ch)
semaphore_ch = BLOCK(csv3_ch.collect())
csv1_ch = TOOL1(pdb_ch, semaphore_ch)
out_ch = csv1_ch.mix(csv2_ch).mix(csv3_ch)
BLOCK is only launched when csv3_ch can be collected (i.e. when all instances of TOOL3 have finished), and TOOL1 will not run without a value from semaphore_ch. This value is actually ignored by the TOOL1 body, but it is still required for the process to launch. And since semaphore_ch is a value channel, it can be read as many times as necessary without being exhausted.
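For completeness, the extra input just has to be declared in TOOL1. A minimal sketch, assuming TOOL1 wraps some command tool1 (a hypothetical name):

```nextflow
process TOOL1 {
    input:
    path pdb_file
    val ready    // semaphore value from BLOCK; never referenced in the script

    output:
    path 'tool1.csv'

    script:
    """
    tool1 ${pdb_file} > tool1.csv
    """
}
```

Because ready is a val input, nothing is staged into the work directory; the process simply waits for the value to become available before any task can start.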
Technically, the intermediate process is not necessary, because I could just pass csv3_ch.collect() as a second input channel to TOOL1. However, BLOCK keeps the TOOL1 work directories from being littered with staged TOOL3 outputs.
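For comparison, the variant without the intermediate process would look like this; TOOL1 would then have to declare its second input as path csv_files rather than val ready:

```nextflow
csv3_ch = TOOL3(bundled_pdb_ch)
// No BLOCK: the collected TOOL3 outputs are passed straight to TOOL1,
// so every TOOL1 work directory gets all of those CSVs staged into it
csv1_ch = TOOL1(pdb_ch, csv3_ch.collect())
```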