I'm setting up a Nextflow pipeline that can process both single and multiple sets of input files. When processing a single set of files, I want to use command-line arguments like:
nextflow run main.nf --fasta sample.fasta --hmmdb database.hmm
For multiple sets of files, I prefer to provide them via a CSV file:
nextflow run main.nf --input samples.csv
The samples.csv file looks like this:
fasta,hmmdb
sample1.fasta,database1.hmm
sample2.fasta,database2.hmm
Current Workflow:
Here's the relevant part of my main.nf script:
workflow {
    main:
    if (params.input != null) {
        // Read input CSV file
        input_ch = Channel
            .fromPath(params.input)
            .splitCsv(header: true)
            .map { row ->
                tuple(
                    file(row.fasta),
                    file(row.hmmdb)
                )
            }
        input_ch.view()
    } else {
        // Use conventional arguments
        input_ch = Channel.of(
            tuple(
                file(params.fasta),
                file(params.hmmdb)
            )
        )
    }
    ch_versions = Channel.empty()
    // Launch the main pipeline workflow
    ACTUAL_PIPELINE(
        input_ch,
        ch_versions
    )
    ch_versions = ch_versions.mix(ACTUAL_PIPELINE.out.versions)
    //...
}
And the ACTUAL_PIPELINE workflow:
workflow ACTUAL_PIPELINE {
    take:
    ch_params   // Channel containing tuples of [fasta_file, hmmdb_file]
    ch_versions // Channel for version information
    main:
    // Attempting to access the files from the channel
    collected = ch_params.collect()
    fasta = collected[0]
    hmmdb = collected[1]
    // Rest of the pipeline
    //...
}
When I try to collect the contents of ch_params using collect(), and then access the files with collected[0] and collected[1], I encounter the following error:
ERROR ~ Unexpected error [StackOverflowError]
How can I properly access or iterate over the files from ch_params within the ACTUAL_PIPELINE workflow? Thank you for your assistance!
Note that ch_params is a channel, so calling the collect operator will also return a channel (specifically a value channel). It cannot be sliced like a List, which I think is the issue here. One solution might be to pass in a closure to transform each item before it is collected (assuming that is what is needed), for example:
workflow ACTUAL_PIPELINE {
    take:
    ch_params
    ch_versions
    main:
    fasta_ch = ch_params.collect { fasta, hmmdb -> fasta }
    hmmdb_ch = ch_params.collect { fasta, hmmdb -> hmmdb }
    ...
}
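If each fasta file should stay paired with its own hmmdb file, another option is to skip collect entirely and pass the tuple channel straight into a process, which then runs once per tuple. The following is only a minimal sketch: PROCESS_PAIR is a hypothetical process with a placeholder command (the real command is not shown in the question), and ch_versions is left out for brevity.

```nextflow
// Hypothetical process, for illustration only.
// The echo command is a placeholder for whatever tool the pipeline really runs.
process PROCESS_PAIR {
    input:
    tuple path(fasta), path(hmmdb)   // one [fasta, hmmdb] pair per task

    output:
    path "${fasta.baseName}.txt"

    script:
    """
    echo "fasta: ${fasta}, hmmdb: ${hmmdb}" > ${fasta.baseName}.txt
    """
}

workflow ACTUAL_PIPELINE {
    take:
    ch_params   // channel of [fasta_file, hmmdb_file] tuples

    main:
    // No indexing or collect needed: the process is invoked once per tuple
    PROCESS_PAIR(ch_params)

    emit:
    results = PROCESS_PAIR.out
}
```

With this layout the same workflow handles one pair (from --fasta and --hmmdb) or many pairs (from the CSV), because the channel simply emits one or several tuples.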
Is there a Nextflow-specific way to handle both single and multiple inputs efficiently without running into errors?
Consider using the nf-schema plugin instead. It supports sample sheets in CSV, TSV, JSON and YAML formats. You would still need to handle single and multiple inputs yourself (an if/else statement like the one you already have is fine), but the plugin lets you validate your inputs, which reduces errors. Specifically, it validates your input parameters against a pipeline schema and the contents of your sample sheet against a sample sheet schema. From the docs:
include { validateParameters; paramsSummaryLog; samplesheetToList } from 'plugin/nf-schema'
// Validate input parameters
validateParameters()
// Print summary of supplied parameters
log.info paramsSummaryLog(workflow)
// Create a new channel of metadata from a sample sheet passed to the pipeline through the --input parameter
ch_input = Channel.fromList(samplesheetToList(params.input, "assets/schema_input.json"))
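Combined with the single-input case from the question, this could look roughly like the sketch below. It assumes that nf-schema is enabled in the plugins block of nextflow.config, and that assets/schema_input.json declares the fasta and hmmdb columns (in that order) as file paths so that each row comes back as a [fasta, hmmdb] list. Neither assumption is confirmed by the question.

```nextflow
include { samplesheetToList } from 'plugin/nf-schema'

workflow {
    // Assumption: the sample sheet schema lists fasta first and hmmdb second,
    // so each validated row has the same shape as the single-input tuple below.
    input_ch = params.input
        ? Channel.fromList(samplesheetToList(params.input, "assets/schema_input.json"))
        : Channel.of([
            file(params.fasta, checkIfExists: true),
            file(params.hmmdb, checkIfExists: true)
        ])

    // Both branches emit [fasta, hmmdb] items, so downstream code does not
    // need to know which input mode was used.
    input_ch.view()
}
```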
There's really no way to avoid errors, but the Nextflow extension for VS Code should help with syntax highlighting etc: