Tags: pipeline, nextflow

Nextflow Pipeline Stops After Seqtk Process with No Error for Pad Read Process


I am working on a pipeline in Nextflow 24.10.3 that includes two processes, seqtk and pad_read. The pipeline is supposed to process reads using seqtk and then pad them using a Python script. However, I am encountering the following error:

ERROR ~ Unexpected error while finalizing task 'seqtk' - cause: No such property: reference_based_assembly for class: sun.nio.fs.UnixPath

-- Check '.nextflow.log' file for details

NOTE: I was able to get past this particular error by extracting the specific files from the seqtk output and defining them as the forward and reverse reads. With that change, the pipeline successfully executes the seqtk process, which converts the FASTQ files to FASTA files, but execution stops there: the pad_read process never runs, and no error message is shown. After resuming the run, it shows the following:

Launching `/home/ganga/nextflow_dir/dnrgb9.nf` [cheesy_stone] DSL2 - revision: 78fccb0ec6

[88/a309a0] fastqc        | 1 of 1, cached: 1 ✔
[d0/ced8a8] multiqc       | 1 of 1, cached: 1 ✔
[18/c16dde] trimmomatic   | 1 of 1, cached: 1 ✔
[3d/2fefec] flash         | 1 of 1, cached: 1 ✔
[d2/e0f28f] unicycler     | 1 of 1, cached: 1 ✔
[19/ff6f3a] quast         | 1 of 1, cached: 1 ✔
[82/0da2fe] plentyofbugs  | 1 of 1, cached: 1 ✔
[1f/5b384d] bowtie2_build | 1 of 1, cached: 1 ✔
[0b/da69f7] bowtie2       | 1 of 1, cached: 1 ✔
[08/7a1ef0] seqtk         | 1 of 1, cached: 1 ✔
[-        ] pad_read      -
[-        ] AlignGraph    -
[-        ] quast2        -
[-        ] busco         -

This is the actual code that I used.

// Define the third process (Trimmomatic)
process trimmomatic {

    input:
    path forward_read
    path reverse_read
    val thread
    val phred
    path PATH_TO_ADAPTER_CONTAM_FILE
    val leading
    val trailing
    val slidingwindow
    val minlength

    output:
    path "${params.outputDir3}"

    script:
    """
    mkdir -p ${params.outputDir3}
    trimmomatic PE -threads "$thread" -phred"$phred" "$forward_read" "$reverse_read" \
        "${params.outputDir3}/output_1P.fq" "${params.outputDir3}/output_1U.fq" \
        "${params.outputDir3}/output_2P.fq" "${params.outputDir3}/output_2U.fq" \
        ILLUMINACLIP:"$PATH_TO_ADAPTER_CONTAM_FILE":2:30:10 LEADING:"$leading" \
        TRAILING:"$trailing" SLIDINGWINDOW:"$slidingwindow" MINLEN:"$minlength"
    """
}
// Define the process for seqtk
process seqtk {
    input:
    path bowtie2_out
    path trimmomatic_out

    output:
    path "${params.outputDir9}/*", emit: seqtk_out

    script:
    """
    mkdir -p "${params.outputDir9}"
    seqtk seq -A  "${trimmomatic_out}/output_1P.fq" > "${params.outputDir9}/output_1P.fa"
    seqtk seq -A  "${trimmomatic_out}/output_2P.fq" > "${params.outputDir9}/output_2P.fa"
    """
}

// Define process for pad read
process pad_read {
    input:
    path pad_read_path
    path forward_read
    path reverse_read

    output:
    path "${params.outputDir9}/padded_out1.fa", emit: padded_file1
    path "${params.outputDir9}/padded_out2.fa", emit: padded_file2

    script:
    """
    python "${params.pad_read_path}" "${forward_read}" "${params.outputDir9}/padded_out1.fa" 150
    python "${params.pad_read_path}" "${reverse_read}" "${params.outputDir9}/padded_out2.fa" 150
    """
}

The workflow block is as follows:

// Workflow block
workflow {
    // Run trimmomatic on multiqc output dir
    def trimmomatic_results = trimmomatic(params.forward_read, params.reverse_read, params.thread, params.phred, params.PATH_TO_ADAPTER_CONTAM_FILE, params.leading, params.trailing, params.slidingwindow, params.minlength)

    // Run seqtk
    def seqtk_results = seqtk(bowtie2_align_result, trimmomatic_results)

    // Extract specific files from seqtk results
    def forward_read = seqtk_results.seqtk_out.filter { it.name == "output_1P.fa" }
    def reverse_read = seqtk_results.seqtk_out.filter { it.name == "output_2P.fa" }

    // Run the pad_read process
    def pad_read_results = pad_read(params.pad_read_path, forward_read, reverse_read)
}

What I was expecting is that it should run the pad_read process, taking its inputs from the seqtk process along with the path to my Python file for padding reads.


Solution

  • I think the problem is simply that your pad_read process is waiting for input from the forward_read and reverse_read channels, but none ever arrives. The channel filter is a likely cause: as far as I can tell, an output glob like "${params.outputDir9}/*" that matches several files is emitted as a single list item, so a test like it.name == "output_1P.fa" is applied to the whole list rather than to each file and probably never matches.

    You may also run into problems like these if you write to files outside of Nextflow's process working directory (i.e. under ./work). Nextflow processes are designed to run independently and in isolation from each other, but that isolation is circumvented when you specify absolute paths to files (e.g. by using params variables like params.outputDir3 and params.outputDir9 in your script blocks). The solution is to ensure that your outputs are written to the process working directory (and that the process inputs are also localized within this directory).

    If your process input and output definitions declare tuples, you will find it much easier to work with channels (e.g. by joining two channels using the first element as a key). The code below uses a subworkflow to process each read. It might not be exactly what you want, but it does DRY up your code and should be more performant, especially if the input files are large.

    If you also make your Python script executable (e.g. chmod +x pad_reads.py), give it a shebang line for Python, and place it in a directory called 'bin' in the root of your project repository, you can do away with your params.pad_read_path variable. For example:

    $ tree
    .
    ├── bin
    │   └── pad_reads.py
    ├── data
    │   ├── sampleA_R1.fastq.gz
    │   ├── sampleA_R2.fastq.gz
    │   ├── sampleB_R1.fastq.gz
    │   ├── sampleB_R2.fastq.gz
    │   ├── sampleC_R1.fastq.gz
    │   └── sampleC_R2.fastq.gz
    ├── main.nf
    ├── modules
    │   ├── pad_reads
    │   │   └── main.nf
    │   ├── seqtk
    │   │   └── main.nf
    │   └── trimmomatic
    │       └── main.nf
    ├── nextflow.config
    ├── subworkflows
    │   └── process_reads
    │       └── main.nf
    └── TruSeq-PE.fa
    
    9 directories, 14 files
    

    Contents of main.nf:

    include { TRIMMOMATIC_PE } from './modules/trimmomatic'
    include { PROCESS_READS as PROCESS_READ1 } from './subworkflows/process_reads'
    include { PROCESS_READS as PROCESS_READ2 } from './subworkflows/process_reads'
    
    
    workflow {
    
        reads = Channel.fromFilePairs( params.reads )
    
        adapter_contam_file = file( params.adapter_contam_file )
    
        TRIMMOMATIC_PE(
            reads,
            adapter_contam_file,
            params.leading,
            params.trailing,
            params.sliding_window,
            params.min_length,
        )
    
        PROCESS_READ1(
            TRIMMOMATIC_PE.out.trimmed_reads.map { sample, reads ->
                tuple( sample, reads[0] )
            },
            params.padding,
        )
        PROCESS_READ2(
            TRIMMOMATIC_PE.out.trimmed_reads.map { sample, reads ->
                tuple( sample, reads[1] )
            },
            params.padding,
        )
    
        PROCESS_READ1.out
            .join( PROCESS_READ2.out )
            .view()
    }
    

    Contents of nextflow.config:

    params {
    
        reads = null
    
        outdir = 'results'
    
        adapter_contam_file = 'TruSeq-PE.fa'
        leading = '3'
        trailing = '3'
        sliding_window = '4:15'
        min_length = '36'
    
        padding = 150
    }
    
    process {
    
        cpus = 1
        memory = { 1.GB * task.attempt }
        time = { 1.h * task.attempt }
    
        withName: 'TRIMMOMATIC_PE' {
            cpus = 2
            memory = { 6.GB * task.attempt }
            time = { 12.h * task.attempt }
    
            publishDir = [
                path: { "${params.outdir}/trimmomatic_pe" },
                mode: 'copy',
                enabled: true,
            ]
        }
    
        withName: 'SEQTK_SEQ' {
            cpus = 1
            memory = { 3.GB * task.attempt }
            time = { 1.h * task.attempt }
    
            publishDir = [
                path: { "${params.outdir}/seqtk_seq" },
                mode: 'copy',
                enabled: true,
            ]
        }
        
        withName: 'PAD_READS' {
            cpus = 1
            memory = { 4.GB * task.attempt }
            time = { 1.h * task.attempt }
    
            publishDir = [
                path: { "${params.outdir}/pad_reads" },
                mode: 'copy',
                enabled: true,
            ]
        }
    }
    

    Contents of modules/trimmomatic/main.nf:

    process TRIMMOMATIC_PE {
    
        tag "${sample}"
    
        conda 'trimmomatic=0.39'
    
        input:
        tuple val(sample), path(reads)
        path adapter_contam_file
        val leading
        val trailing
        val slidingwindow
        val minlength
    
        output:
        tuple val(sample), path("${sample}_{1,2}P.fq"), emit: trimmed_reads
        tuple val(sample), path("${sample}_{1,2}U.fq"), emit: unpaired_reads
        tuple val(sample), path("${sample}_trim.log"), emit: trim_log
        tuple val(sample), path("${sample}.summary"), emit: summary
    
        script:
        """
        trimmomatic PE \\
            -threads "${task.cpus}" \\
            -trimlog "${sample}_trim.log" \\
            -summary "${sample}.summary" \\
            ${reads} \\
            "${sample}_1P.fq" \\
            "${sample}_1U.fq" \\
            "${sample}_2P.fq" \\
            "${sample}_2U.fq" \\
            "ILLUMINACLIP:${adapter_contam_file}:2:30:10" \\
            "LEADING:${leading}" \\
            "TRAILING:${trailing}" \\
            "SLIDINGWINDOW:${slidingwindow}" \\
            "MINLEN:${minlength}"
        """
    }
    

    Contents of modules/seqtk/main.nf:

    process SEQTK_SEQ {
    
        tag "${prefix}"
    
        conda 'seqtk=1.4'
    
        input:
        tuple val(prefix), path(fastq)
    
        output:
        tuple val(prefix), path("${prefix}.fa")
    
        script:
        """
        seqtk seq -A "${fastq}" > "${prefix}.fa"
        """
    }
    

    Contents of modules/pad_reads/main.nf:

    process PAD_READS {
    
        tag "${prefix}"
    
        input:
        tuple val(prefix), path(fasta)
        val padding
    
        output:
        tuple val(prefix), path("${prefix}.padded.fa")
    
        script:
        """
        pad_reads.py "${fasta}" "${prefix}.padded.fa" "${padding}"
        """
    }
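
    The padding script itself is not shown in the question, so here is only a minimal sketch of what bin/pad_reads.py might look like, matching the three arguments PAD_READS passes (input FASTA, output FASTA, target length). The pad character 'N', right-side padding, and the one-sequence-line-per-record layout are all assumptions; adjust them to whatever your real script does.

```python
#!/usr/bin/env python3
"""Hypothetical sketch of bin/pad_reads.py: right-pad FASTA sequences."""
import sys


def pad_fasta(lines, length, pad_char="N"):
    """Yield FASTA lines with each sequence line right-padded to `length`.

    Assumes every record keeps its sequence on a single line. Header
    lines (starting with '>') and blank lines pass through unchanged,
    and sequences already longer than `length` are left as they are.
    """
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith(">") or not line:
            yield line
        else:
            yield line.ljust(length, pad_char)


def main(argv):
    """Command-line entry point: pad_reads.py <in.fa> <out.fa> <length>."""
    if len(argv) != 4:
        print(f"usage: {argv[0]} <in.fa> <out.fa> <length>", file=sys.stderr)
        return 2
    in_path, out_path, length = argv[1], argv[2], int(argv[3])
    # Relative paths resolve inside the task's working directory,
    # which is where Nextflow expects the declared output to appear.
    with open(in_path) as src, open(out_path, "w") as dst:
        for line in pad_fasta(src, length):
            dst.write(line + "\n")
    return 0
```

    To use this as an executable in bin/, the file would end with the usual `if __name__ == "__main__": sys.exit(main(sys.argv))` guard. The shebang on the first line lets the process call it as pad_reads.py without naming the interpreter.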
    

    Contents of subworkflows/process_reads/main.nf:

    include { SEQTK_SEQ } from '../../modules/seqtk'
    include { PAD_READS } from '../../modules/pad_reads'
    
    
    workflow PROCESS_READS {
    
        take:
        reads_ch
        padding
    
        main:
        SEQTK_SEQ(
            reads_ch.map { sample, fastq -> tuple( fastq.baseName, fastq ) }
        )
    
        PAD_READS( SEQTK_SEQ.out, padding )
    
        emit:
        reads_ch
            .map { sample, fastq -> tuple( fastq.baseName, sample ) }
            .join( PAD_READS.out )
            .map { prefix, sample, fasta -> tuple( sample, fasta ) }        
    }
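
    The emit block above re-keys the channel twice, which can be hard to follow. As a rough analogy only, the same three steps can be written in plain Python with lists of tuples standing in for channels. This is not Nextflow code: join_by_key is a hypothetical helper, the file names are made up, and real channels are asynchronous, so the order of items is not guaranteed there.

```python
from pathlib import PurePosixPath


def join_by_key(left, right):
    """Mimic a keyed inner join: pair up items whose first element matches."""
    right_by_key = {item[0]: item[1:] for item in right}
    return [
        (*item, *right_by_key[item[0]])
        for item in left
        if item[0] in right_by_key
    ]


# (sample, fastq) tuples, standing in for reads_ch (hypothetical file names)
reads = [
    ("sampleA", PurePosixPath("sampleA_1P.fq")),
    ("sampleB", PurePosixPath("sampleB_1P.fq")),
]

# Stand-in for PAD_READS.out, keyed by prefix (the FASTQ base name) and
# deliberately listed in a different order than the reads
padded = [
    ("sampleB_1P", PurePosixPath("sampleB_1P.padded.fa")),
    ("sampleA_1P", PurePosixPath("sampleA_1P.padded.fa")),
]

# Step 1: re-key the reads as (prefix, sample); .stem plays the role of baseName
prefix_to_sample = [(fastq.stem, sample) for sample, fastq in reads]

# Step 2: join on the shared prefix, giving (prefix, sample, fasta)
joined = join_by_key(prefix_to_sample, padded)

# Step 3: drop the prefix so downstream steps are keyed by sample again
result = [(sample, fasta) for _prefix, sample, fasta in joined]

for sample, fasta in result:
    print(sample, fasta)
```

    Because each padded file ends up keyed by sample again, the two subworkflow outputs can be joined per sample in main.nf.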
    

    Run using:

    nextflow run main.nf --reads './data/*_R{1,2}.fastq.gz' -with-conda
    

    Results:

    $ nextflow run main.nf --reads './data/*_R{1,2}.fastq.gz' -with-conda
    
     N E X T F L O W   ~  version 24.10.3
    
    Launching `main.nf` [intergalactic_maxwell] DSL2 - revision: 41d7759a70
    
    [b1/61f3db] TRIMMOMATIC_PE (sampleC)             [100%] 3 of 3 ✔
    [96/f6a509] PROCESS_READ1:SEQTK_SEQ (sampleA_1P) [100%] 3 of 3 ✔
    [52/234275] PROCESS_READ1:PAD_READS (sampleA_1P) [100%] 3 of 3 ✔
    [5f/e90e55] PROCESS_READ2:SEQTK_SEQ (sampleB_2P) [100%] 3 of 3 ✔
    [d1/9b0136] PROCESS_READ2:PAD_READS (sampleB_2P) [100%] 3 of 3 ✔
    [sampleC, /path/to/work/ef/03ae84fa1f82c785c6b533d237be8c/sampleC_1P.padded.fa, /path/to/work/80/17be93cae1c5962f2efa154aaf258b/sampleC_2P.padded.fa]
    [sampleB, /path/to/work/91/ca7dd8c756aaced9303847c8b50a18/sampleB_1P.padded.fa, /path/to/work/d1/9b0136f910ebc1f8f6b3f94eeed249/sampleB_2P.padded.fa]
    [sampleA, /path/to/work/52/2342750f71139755028a7f65ec6bc6/sampleA_1P.padded.fa, /path/to/work/92/01a6085ad12ae80e24bbb52d8f16c3/sampleA_2P.padded.fa]
    

    Published files:

    $ tree results/
    results/
    ├── pad_reads
    │   ├── sampleA_1P.padded.fa
    │   ├── sampleA_2P.padded.fa
    │   ├── sampleB_1P.padded.fa
    │   ├── sampleB_2P.padded.fa
    │   ├── sampleC_1P.padded.fa
    │   └── sampleC_2P.padded.fa
    ├── seqtk_seq
    │   ├── sampleA_1P.fa
    │   ├── sampleA_2P.fa
    │   ├── sampleB_1P.fa
    │   ├── sampleB_2P.fa
    │   ├── sampleC_1P.fa
    │   └── sampleC_2P.fa
    └── trimmomatic_pe
        ├── sampleA_1P.fq
        ├── sampleA_1U.fq
        ├── sampleA_2P.fq
        ├── sampleA_2U.fq
        ├── sampleA.summary
        ├── sampleA_trim.log
        ├── sampleB_1P.fq
        ├── sampleB_1U.fq
        ├── sampleB_2P.fq
        ├── sampleB_2U.fq
        ├── sampleB.summary
        ├── sampleB_trim.log
        ├── sampleC_1P.fq
        ├── sampleC_1U.fq
        ├── sampleC_2P.fq
        ├── sampleC_2U.fq
        ├── sampleC.summary
        └── sampleC_trim.log
    
    4 directories, 30 files