One of my Nextflow processes, SORT_BAM, creates three types of files: *_sorted.bam, *_sorted.bam.bai, and *_flagstat.txt.
I want to use the sorted.bam files as input for the last process, ADD_RG, which must not start before the *sorted.bam file(s) have been generated.
I have tried reading up on channel operators but can't figure out how to implement this. Probably something along the lines of grouping the *sorted.bam files and chaining them to the next process? Or emitting *sorted.bam as xyz and then chaining xyz to the ADD_RG process?
The contents of the two modules and the main Nextflow file are below.
#module SORT_BAM
#!/usr/bin/env nextflow

process SORT_BAM {
    debug true
    publishDir 'results/bams/sorted', mode: 'copy', overwrite: false

    input:
    path bam

    output:
    path "*"

    // make sure you are using a recent version of samtools to avoid issues with the -o option
    script:
    """
    samtools sort $bam -o ${bam}_sorted.bam
    samtools index ${bam}_sorted.bam
    samtools flagstat ${bam}_sorted.bam -O tsv >> ${bam}_flagstat.txt
    """
}
#module ADD_RG
#!/usr/bin/env nextflow

process ADD_RG {
    debug true
    publishDir 'results/bams/RG_bams', mode: 'copy', overwrite: false

    input:
    path bam

    output:
    path "*sortedRG.bam", emit: RG_bam_files

    shell:
    '''
    SUFFIX=$(echo !{bam} | cut -d"_" -f 2)
    picard AddOrReplaceReadGroups \
        I=!{bam} \
        O=!{bam}_sortedRG.bam \
        RGID=$(echo !{bam.baseName} | cut -d"_" -f 3-5) \
        RGLB="lib1" \
        RGPL="illumina" \
        RGPU="safi_${SUFFIX}" \
        RGSM="safi_${SUFFIX}"
    '''
}
#nextflow file
#!/usr/bin/env nextflow

nextflow.enable.dsl=2

params.genome_index = file("$baseDir/genome/fEpiCoi_cnag1_curated_primary.no_mt.fa")

// These paths are relative to the workflow .nf file
include { TRIM } from './modules/trim.nf'
include { BWA } from './modules/bwa.nf'
include { SAM_TO_BAM } from './modules/sam2bam.nf'
include { SORT_BAM } from './modules/sort_bam.nf'
include { ADD_RG } from './modules/add_RG.nf'

Channel
    .fromFilePairs("$baseDir/subset_fq/*_{1,2}.fq.gz", checkIfExists:true)
    .set{ paired_reads }

workflow {
    genome = params.genome_index

    TRIM(paired_reads)
    BWA(TRIM.out.trimmed_seqs, genome)
    SAM_TO_BAM(BWA.out.sam_files)
    SORT_BAM(SAM_TO_BAM.out.bam_files)
    ADD_RG(????????)
}
thanks.
Using output: path "*" in the SORT_BAM process will cause every file the process creates to be passed along to the next process. That includes the ${bam}_flagstat.txt file, which would cause that process to throw an error.
If you are more careful about your output declaration, you can solve this. Something like this:
process SORT_BAM {
    debug true
    publishDir 'results/bams/sorted', mode: 'copy', overwrite: false

    input:
    path bam

    output:
    tuple path("${bam}_sorted.bam"), path("${bam}_sorted.bam.bai"), emit: sorted_bams
    path("${bam}_flagstat.txt"), emit: bam_flagstat

    script:
    """
    samtools sort $bam -o ${bam}_sorted.bam
    samtools index ${bam}_sorted.bam
    samtools flagstat ${bam}_sorted.bam -O tsv >> ${bam}_flagstat.txt
    """
}
Notice that I've moved the flagstat output away from the sorted bam channel, as I don't think it's needed for read-group assignment, but that's easy to change.
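If you do still want the flagstat reports downstream, they stay available on their own output channel. A minimal sketch, just to illustrate (the .collect() and .view() calls here are only for inspection, not something your pipeline needs):

SORT_BAM.out.bam_flagstat
    .collect()   // gather all flagstat reports into a single list
    .view()      // print the collected paths for a quick sanity check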
Here is the input declaration for the ADD_RG process:
input:
tuple path(bam), path(bai)
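For context, the top of ADD_RG would then look roughly like this; the shell block can stay as it is, since !{bam} still refers to the sorted BAM and the index file is simply staged alongside it in the work directory (a sketch reusing your existing names, with the picard command elided):

process ADD_RG {
    debug true
    publishDir 'results/bams/RG_bams', mode: 'copy', overwrite: false

    input:
    tuple path(bam), path(bai)

    output:
    path "*sortedRG.bam", emit: RG_bam_files

    shell:
    '''
    # your existing picard AddOrReplaceReadGroups command, unchanged
    '''
}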
Then the workflow declaration would become:
workflow {
    genome = params.genome_index

    TRIM(paired_reads)
    BWA(TRIM.out.trimmed_seqs, genome)
    SAM_TO_BAM(BWA.out.sam_files)
    SORT_BAM(SAM_TO_BAM.out.bam_files)
    ADD_RG(SORT_BAM.out.sorted_bams)
}
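Because sorted_bams emits one (bam, bai) tuple per input BAM, ADD_RG will run once per sample as each tuple becomes available. If you want a quick check that the wiring is correct, you could temporarily add something like this at the end of the workflow block (again, the .view() is only for inspection):

    // prints each *sortedRG.bam path as ADD_RG emits it
    ADD_RG.out.RG_bam_files.view()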