I am trying to merge x BAM files into a single BAM file in Nextflow. The BAM files come from running several alignments in parallel, each on a batch of y FASTQ files.
So far, I have the following process, which performs the alignment and sorts/indexes the resulting BAM file:
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
publishDir "$params.bamDir"
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
val dirString from dirStr
val runString from stringRun
each file(batchFastq) from fastqBatch.flatMap()
output:
val runString into stringRun1
file("${batchFastq}.bam") into bamFiles
val dirString into dirStrSam
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
Here, ${batchFastq}.bam is a BAM file containing the alignments for one batch of y FASTQ files.
This pipeline completes without errors. However, I then run samtools merge on these BAM files in a separate process (samToolsMerge). That process runs once per alignment task (4 times in this case) instead of once on all the collected BAM files:
//Run samtools merge
process samToolsMerge {
echo true
publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
cache 'deep'
errorStrategy 'retry'
maxRetries 3
maxForks 10
memory { 14.GB * task.attempt }
input:
val runString from stringRun1
file bamFile from bamFiles.collect()
val dirString from dirStrSam
output:
file("**")
script:
"""
samtools merge ${runString}.bam ${bamFile}
"""
}
The output is:
executor > lsf (9)
[49/182ec0] process > catFastqs (1) [100%] 1 of 1 ✔
[- ] process > nanoPlotSummary -
[0e/609a7a] process > miniMap2Bam (1) [100%] 4 of 4 ✔
[42/72469d] process > samToolsMerge (2) [100%] 4 of 4 ✔
Completed at: 04-Mar-2021 14:54:21
Duration : 5m 41s
CPU hours : 0.2
Succeeded : 9
How can I take just the resulting BAM files from miniMap2Bam and run them through samToolsMerge a single time, instead of having the process run multiple times?
Thanks in advance!
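EDIT: Thanks to Pallie in the comments below. The issue was that the runString and dirString values from a prior process were fed into miniMap2Bam and passed on from there to samToolsMerge. miniMap2Bam re-emits those values once per alignment task, so stringRun1 and dirStrSam each carried 4 items. bamFiles.collect(), on the other hand, is a value channel that every task reuses. As a result, samToolsMerge ran once per item (4 times), each time on the full list of BAM files.
The solution was simply to remove those vals from miniMap2Bam: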
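To see why the process ran 4 times, here is a small Python toy model of how a DSL1 process binds its inputs. It is not Nextflow code and not Nextflow's API. A queue channel hands each item to exactly one task, while a value channel (which is what collect() returns) is reused by every task. The names QueueChannel, ValueChannel and schedule are hypothetical and exist only for this illustration.

```python
# Toy model (NOT Nextflow code) of how a DSL1 process binds its inputs:
# queue channels are consumed item by item, value channels are reused.
from dataclasses import dataclass
from typing import Any, List


@dataclass
class QueueChannel:
    items: List[Any]  # each item is consumed by exactly one task


@dataclass
class ValueChannel:
    value: Any  # bound to every task, never consumed


def schedule(inputs):
    """Return the tuple of input values that each task would receive."""
    queues = [ch for ch in inputs if isinstance(ch, QueueChannel)]
    # With only value channels the process fires exactly once;
    # otherwise it fires until the shortest queue channel is exhausted.
    n_tasks = min(len(q.items) for q in queues) if queues else 1
    tasks = []
    for i in range(n_tasks):
        tasks.append(tuple(
            ch.items[i] if isinstance(ch, QueueChannel) else ch.value
            for ch in inputs
        ))
    return tasks


bams = ["b1.bam", "b2.bam", "b3.bam", "b4.bam"]
collected = ValueChannel(bams)  # plays the role of bamFiles.collect()

# Before: runString/dirString re-emitted once per miniMap2Bam task.
before = schedule([QueueChannel(["run"] * 4), collected, QueueChannel(["dir"] * 4)])
# After: the collected BAM list is the only input.
after = schedule([collected])

print(len(before), len(after))  # 4 1
```

With the per-task runString and dirString queues attached, the model fires four times on the same collected list. With only the collected list, it fires once.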
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
each file(batchFastq) from fastqBatch.flatMap()
output:
file("${batchFastq}.bam") into bamFiles
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
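The simplest fix would probably be to stop passing the static dirString and runString values around via channels:
// Instead of a hardcoded path, use a parameter passed via the CLI, as you did with bamDir
dirString = file("/path/to/fastqs/")
// Use only the directory name (e.g. "to"), not the full path, so that
// "${runString}.bam" is created inside the task's work directory
runString = file("/path/to/fastqs/").getParent().getFileName()
// Placeholder: replace with the channel of FASTQ batches your pipeline produces
fastqBatch = Channel.from("/path/to/fastqs/")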
//Run minimap2 on concatenated fastqs
process miniMap2Bam {
publishDir "$params.bamDir"
errorStrategy 'retry'
cache 'deep'
maxRetries 3
maxForks 10
memory { 16.GB * task.attempt }
input:
each file(batchFastq) from fastqBatch.flatMap()
output:
file("${batchFastq}.bam") into bamFiles
script:
"""
minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
samtools index ${batchFastq}.bam
"""
}
//Run samtools merge
process samToolsMerge {
echo true
publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
cache 'deep'
errorStrategy 'retry'
maxRetries 3
maxForks 10
memory { 14.GB * task.attempt }
input:
file bamFile from bamFiles.collect()
output:
file("**")
script:
"""
samtools merge ${runString}.bam ${bamFile}
"""
}
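One more detail concerns runString. If it holds a full path, which is what getParent() returns, then "${runString}.bam" expands to an absolute path such as /path/to.bam. In that case samtools merge writes the merged file outside the task work directory, where the file("**") output declaration will not find it. Python's pathlib behaves analogously and makes the difference easy to see. This is only an illustration; the example path is the placeholder from the answer above.

```python
from pathlib import PurePosixPath

fastq_dir = PurePosixPath("/path/to/fastqs/")
parent = fastq_dir.parent  # PurePosixPath('/path/to')

# Interpolating the whole parent path yields an absolute output path.
full = f"{parent}.bam"  # '/path/to.bam'
# Interpolating only the last path component yields a relative file name.
name_only = f"{parent.name}.bam"  # 'to.bam'
```

Keeping only the final component means the merged BAM lands in the current (task) directory, which is where Nextflow looks for declared outputs.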