nextflow

Creating a proper channel for a single process generating more than one fasta


I created a pipeline that starts from fastq.gz files. The process CONSENSUS_BUILDING creates one or more consensus fasta sequences per file, and the process BLAST runs blastn on each. When CONSENSUS_BUILDING outputs a single consensus sequence for a given fastq.gz file everything works, but when it outputs more than one, the generated command is wrong. For example, if a given fastq.gz file leads to three consensus sequences I get the following command:

blastn -db references/hAdv_filtered.fasta \ 
  -query consensus_reference_121.fasta consensus_reference_151.fasta consensus_reference_186.fasta \
  -outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' \
  -out [consensus_reference_121, consensus_reference_151, consensus_reference_186].blast 

How can I fix this so that it does separate BLAST processes for each consensus sequence? Below is my current workflow:

process CONSENSUS_BUILDING {

    publishDir "${params.outdir}/consensus", mode: 'copy'

    input:
    path fastq

    output:
    path "${fastq.getSimpleName()}_ID/*.fasta", emit: consensus_fasta

    script:
    """
    gunzip -f ${fastq}
    NGSpeciesID --ont --sample_size 10000 --consensus --racon --racon_iter 3 --fastq ${fastq.getSimpleName()}.fastq --m 650 --s 50 --outfolder ${fastq.getSimpleName()}_ID
    """
}

process BLAST {

    publishDir "${params.outdir}/blast", mode: 'copy'
    container  'docker://ncbi/blast'

    input:
    tuple val(sampleID), path(fasta)
    path db

    output:
    path "${sampleID}.blast"

    script:
    """
    blastn -db $db/$db_name -query ${fasta} -outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' -out ${sampleID}.blast
    """
}


// Overall workflow
workflow {
    ch_reads = Channel.fromPath( "${params.reads}/*.fastq.gz" )
    ch_consensus = CONSENSUS_BUILDING(ch_reads)
    CONSENSUS_BUILDING
      .out
      .consensus_fasta
      .map{ it -> tuple(it.simpleName, it) }
      .set{ blast_in }
    ch_blast = BLAST(blast_in, db_dir)
}


Solution

  • I think the simplest fix is to ensure that only one file at a time is passed from CONSENSUS_BUILDING to BLAST, which you can do with the flatten operator, although given your script I would have expected it to emit only one file per task anyway. Note that flatten must come before the map operator if this is how you choose to move forward.
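    With flatten, the channel wiring in your workflow would become (same channel and process names as your snippet):

    ```nextflow
    // Emit each consensus fasta as its own channel element,
    // then pair it with a sample ID derived from the file name.
    CONSENSUS_BUILDING
      .out
      .consensus_fasta
      .flatten()                              // split any per-sample file list into single files
      .map{ it -> tuple(it.simpleName, it) }  // e.g. [consensus_reference_121, consensus_reference_121.fasta]
      .set{ blast_in }
    ```

    This way BLAST receives one [sampleID, fasta] tuple per consensus sequence and runs as a separate task for each.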

    Otherwise you can define a function for elastic input handling. Here is an example for BLAST:

    process BLAST {
    
        publishDir "${params.outdir}/blast", mode: 'copy'
        container  'docker://ncbi/blast'
    
        input:
        tuple val(sampleID), path(fasta)
        path db
    
        output:
        path "*.blast"
    
        script:
        def elastic_blast = fasta.collect{ "newname=\$(basename $it .fasta); blastn -db $db/$db_name -query $it -outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' -out \${newname}.blast" }.join("\n")
        """
        ${elastic_blast}
        """
    }
    

    Code not tested, so it may need some debugging. I'm also unsure what your input tuples would look like for BLAST when CONSENSUS_BUILDING emits multiple fastas in one object. Is it [consensus_121, [consensus_reference_121.fasta, consensus_reference_151.fasta, consensus_reference_186.fasta]]? However, since you're using bash to generate the sample ID variable called newname, you could remove the map operator from the workflow and change the input declaration for BLAST to just path(fasta), and this would still work.
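    If you take that route, the workflow would be roughly this (untested sketch; db_dir as in your original snippet, and BLAST's first input declaration changed to path(fasta)):

    ```nextflow
    // Simplified wiring: BLAST receives the fasta files directly,
    // so no map{} is needed to build [sampleID, fasta] tuples.
    workflow {
        ch_reads = Channel.fromPath( "${params.reads}/*.fastq.gz" )
        CONSENSUS_BUILDING(ch_reads)
        BLAST(CONSENSUS_BUILDING.out.consensus_fasta, db_dir)
    }
    ```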

    Finally, I'm also unsure if you need the collect in the def block, but it's how I've always used it.