nextflow

group outputs by extension and chain one of these for next process


One of my nextflow processes SORT_BAM creates three types of files:

  1. *sorted.bam
  2. *sorted.bam.bai
  3. *flagstat.txt

I want to use sorted.bam files as an input for the last process ADD_RG, which must not commence prior to *sorted.bam file(s) generation.

I have tried reading channel operator but can't figure out how to implement it. Probably something along the lines of grouping *sorted.bam and chaining to next process? or emit *sorted.bam as xyz and then chain xyz to ADD_RG process?

I have the contents of two modules and nextflow file below.

#module SORT_BAM
#!/usr/bin/env nextflow

process SORT_BAM {
  debug true

  publishDir 'results/bams/sorted', mode: 'copy', overwrite: false

  input:
  path bam

  output:
  path "*"

  
  // make sure you are using recent version of samtools to avoid issues with -o option
 
  script:
  """
  samtools sort $bam -o ${bam}_sorted.bam
  samtools index ${bam}_sorted.bam
  samtools flagstat ${bam}_sorted.bam -O tsv >> ${bam}_flagstat.txt
  """
}


#module ADD_RG
#!/usr/bin/env nextflow

process ADD_RG {
  debug true

  publishDir 'results/bams/RG_bams', mode: 'copy', overwrite: false

  input:
  path bam

  output:
  path "*sortedRG.bam", emit: RG_bam_files

shell:
    '''
    SUFFIX=$(echo !{bam} | cut -d"_" -f 2)
    
    picard AddOrReplaceReadGroups \
      I=!{bam} \
      O=!{bam}_sortedRG.bam \
      RGID=$(echo !{bam.baseName} | cut -d"_" -f 3-5) \
      RGLB="lib1"\
      RGPL="illumina" \
      RGPU="safi_${SUFFIX}" \
      RGSM="safi_${SUFFIX}"
    '''
}


#nextflow file
#!/usr/bin/env nextflow
nextflow.enable.dsl=2


params.genome_index = file("$baseDir/genome/fEpiCoi_cnag1_curated_primary.no_mt.fa")

// These paths are relative to the workflow .nf file
include { TRIM } from './modules/trim.nf'
include { BWA } from './modules/bwa.nf'
include { SAM_TO_BAM } from './modules/sam2bam.nf'
include { SORT_BAM } from './modules/sort_bam.nf'
include { ADD_RG } from './modules/add_RG.nf'

Channel
  .fromFilePairs("$baseDir/subset_fq/*_{1,2}.fq.gz", checkIfExists:true)
  .set{ paired_reads }

workflow {

    genome = params.genome_index
    TRIM(paired_reads)
    BWA(TRIM.out.trimmed_seqs, genome)
    SAM_TO_BAM(BWA.out.sam_files)
    SORT_BAM(SAM_TO_BAM.out.bam_files)
    ADD_RG(????????)  
}

thanks.


Solution

  • Using output: path "*" in the SORT_BAM process will cause each file emitted to be individually sent to the next process. That includes the ${bam}_flagstat.txt, which would cause the the process to throw an error.

    If you are more careful about your output declaration you could solve this. Something like this:

    process SORT_BAM {
      debug true
    
      publishDir 'results/bams/sorted', mode: 'copy', overwrite: false
    
      input:
      path bam
    
      output:
      tuple path("${bam}_sorted.bam"), path("${bam}_sorted.bam.bai"), emit: sorted_bams
      path("${bam}_flagstat.txt"), emit: bam_flagstat 
    
      script:
      """
      samtools sort $bam -o ${bam}_sorted.bam
      samtools index ${bam}_sorted.bam
      samtools flagstat ${bam}_sorted.bam -O tsv >> ${bam}_flagstat.txt
      """
    }
    

    Notice that I've moved the flagstat away from the sorted bam channel as I don't think it's needed for RG assignment, but easay to change.

    Here is the input declaration for the ADD_RG:

      input:
      tuple path(bam), path(bai)
    

    Then the workflow declaration would become:

    workflow {
        genome = params.genome_index
        TRIM(paired_reads)
        BWA(TRIM.out.trimmed_seqs, genome)
        SAM_TO_BAM(BWA.out.sam_files)
        SORT_BAM(SAM_TO_BAM.out.bam_files)
        ADD_RG(SORT_BAM.out.sorted_bams)  
    }