I created a pipeline that starts from fastq.gz files: the process CONSENSUS_BUILDING creates one or more consensus FASTA sequences, and the process BLAST runs BLAST on each of them. For a given fastq.gz file, when CONSENSUS_BUILDING outputs a single consensus sequence everything works fine, but when it outputs more than one I get an error because the wrong command is built. For example, if a fastq.gz file leads to three consensus sequences, I get the following command:
blastn -db references/hAdv_filtered.fasta \
-query consensus_reference_121.fasta consensus_reference_151.fasta consensus_reference_186.fasta \
-outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' \
-out [consensus_reference_121, consensus_reference_151, consensus_reference_186].blast
How can I fix this so that it does separate BLAST processes for each consensus sequence? Below is my current workflow:
process CONSENSUS_BUILDING {
    publishDir "${params.outdir}/consensus", mode: 'copy'

    input:
    path fastq

    output:
    path "${fastq.getSimpleName()}_ID/*.fasta", emit: consensus_fasta

    script:
    """
    gunzip -f ${fastq}
    NGSpeciesID --ont --sample_size 10000 --consensus --racon --racon_iter 3 \
        --fastq ${fastq.getSimpleName()}.fastq --m 650 --s 50 \
        --outfolder ${fastq.getSimpleName()}_ID
    """
}
process BLAST {
    publishDir "${params.outdir}/blast", mode: 'copy'
    container 'docker://ncbi/blast'

    input:
    tuple val(sampleID), path(fasta)
    path db

    output:
    path "${sampleID}.blast"

    script:
    """
    blastn -db $db/$db_name -query ${fasta} \
        -outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' \
        -out ${sampleID}.blast
    """
}
// Overall workflow
workflow {
    ch_reads = Channel.fromPath( "${params.reads}/*.fastq.gz" )
    ch_consensus = CONSENSUS_BUILDING(ch_reads)
    CONSENSUS_BUILDING
        .out
        .consensus_fasta
        .map{ it -> tuple(it.simpleName, it) }
        .set{ blast_in }
    ch_blast = BLAST(blast_in, db_dir)
}
I think it would be easier to ensure that only one file at a time is passed from CONSENSUS_BUILDING to BLAST, which you can do with the flatten operator, although given the script I would have expected it to be emitting only one file per task anyway. Note that you should put flatten before the map operator if this is how you choose to move forward.
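For example, the workflow block would become something like this (untested sketch, keeping your channel names and assuming db_dir is defined as in your snippet):

```nextflow
workflow {
    ch_reads = Channel.fromPath( "${params.reads}/*.fastq.gz" )
    CONSENSUS_BUILDING(ch_reads)
    CONSENSUS_BUILDING
        .out
        .consensus_fasta
        .flatten()                              // emit each fasta as its own channel item
        .map{ it -> tuple(it.simpleName, it) }  // pair each fasta with its sample ID
        .set{ blast_in }
    BLAST(blast_in, db_dir)
}
```

With flatten in place, BLAST is invoked once per fasta file rather than once per fastq.gz, which is why it has to come before the map.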
Otherwise you can define a function for elastic input handling. Here is an example for BLAST:
process BLAST {
    publishDir "${params.outdir}/blast", mode: 'copy'
    container 'docker://ncbi/blast'

    input:
    tuple val(sampleID), path(fasta)
    path db

    output:
    path "*.blast"  // the files are named in bash, so match them with a glob

    script:
    def elastic_blast = fasta.collect{ fa ->
        "newname=\$(basename ${fa} .fasta); blastn -db ${db}/${db_name} -query ${fa} -outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' -out \${newname}.blast"
    }.join("\n")
    """
    ${elastic_blast}
    """
}
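Note that basename with a suffix argument strips the extension on its own, which avoids nesting double quotes around a sed call inside the Groovy string (the filename below is just for illustration):

```shell
# basename's second argument removes the matching suffix, so no sed
# (and no nested quotes) is needed to build the sample ID
newname=$(basename consensus_reference_121.fasta .fasta)
echo "$newname"   # prints consensus_reference_121
```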
Code not tested, so it may need some debugging. I'm also unsure what your input tuples would look like for BLAST if CONSENSUS_BUILDING is outputting multiple fastas in one object. Is it [consensus_121, [consensus_reference_121.fasta, consensus_reference_151.fasta, consensus_reference_186.fasta]]? However, since you're using bash to generate the sample ID variable called newname, you could remove the map operator from the workflow, change the input declaration for BLAST to just path(fasta), and this would still work.
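That simplification would look roughly like this (untested, same caveats as above; the input declaration in BLAST changes to a bare path):

```nextflow
workflow {
    ch_reads = Channel.fromPath( "${params.reads}/*.fastq.gz" )
    CONSENSUS_BUILDING(ch_reads)
    // no map needed: the sample ID is derived in bash inside BLAST,
    // so the fasta paths can be passed straight through
    BLAST( CONSENSUS_BUILDING.out.consensus_fasta, db_dir )
}
```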
Finally, I'm also unsure if you need the collect in the def block, but it's how I've always used it.