r, linux, bash, rscript, doparallel

doParallel does not initialize when run through Rscript


I want to create an Rscript that uses %dopar% to count the lines of many files across many cores, then outputs a TSV of the file names and their read counts (the number of lines divided by 4).

I am using Ubuntu 20.04. If I run the script below interactively, by opening R and copying/pasting it, it works. If I run it from the command line with Rscript, it gives this error: ssh: connect to host 0.0.0.20 port 22: Connection timed out. I assume this is a problem with how the PSOCK cluster is registered. How can I make this script work with Rscript from the command line?

(NB: when running it interactively I load the libraries, then just define raw.path, min.reads, and n.cores before copying and pasting the rest of the script, and it runs without a problem.)

#! /usr/bin/Rscript

###### 
## Counting and excluding low read fastqs

# This script counts the number of reads in a fastq and moves those under a certain
# threshold to a separate file


## Collect arguments
args <- commandArgs(TRUE)

## Parse arguments (we expect the form --arg=value)
parseArgs <- function(x) strsplit(sub("^--", "", x), "=")
argsL <- as.list(as.character(as.data.frame(do.call("rbind", parseArgs(args)))$V2))
names(argsL) <- as.data.frame(do.call("rbind", parseArgs(args)))$V1
args <- argsL
rm(argsL)

if("--help" %in% args | is.null(args$files) | is.null(args$min_reads)) {
  cat("
      Counts the number of reads in the gzipped FASTQs in a folder, then moves
      files below a certain number of reads to a folder called 'less-than-[min_reads]'
  
      --files=string        - Full path to folder with raw files
      --min_reads=numeric   - Minimum number of reads for a file to be included
      --n_cores=numeric     - Number of cores to use
      
      Example:
      Rscript count_and_move_raw_fastqs.R --files=raw/ \\   \n --min_reads=2e6 \\ \n --n_cores=10 \n\n")
  
  q(save="no")
}

## Import arguments
raw.path <- args$files
min.reads <- args$min_reads
n.cores <- args$n_cores

# Import libraries
library(foreach)
library(doParallel)
library(tidyverse)

# set wd to raw.path
setwd(raw.path)


# Set up parallelization with n cores
my.cluster <- makePSOCKcluster(nnodes = n.cores, names = n.cores, outfile="")
registerDoParallel(cl = my.cluster)


# Create folder for low read files to go
output_dir <- paste("files-with-less-than", min.reads,"-reads", sep = "")
dir.create(output_dir)

# Create function to count the lines in the file, and move files < min.reads to a different directory
process_file <- function(file){
  
  # Get the base name (without R1) for each file
  file.name <- stringr::str_split_fixed(file, "_", n = 2)[,1]
  
  # Count the reads
  message(paste("Counting reads in ", file, " as of ", Sys.time(), sep = ""))
  reads <- as.numeric(system(paste("gunzip -c ", file, " | sed -n '$=' | awk '{print $1/4}'", sep = ""), intern = TRUE))
  
  # Move files if <= min.reads
  if(reads <= min.reads){
    # Find all files with the name
    files.to.move <- list.files(pattern = file.name)
    # Move them
    purrr::map(files.to.move, function(x) file.rename(x, paste(output_dir, x, sep = "/")))
  }
  
  # Create final df of reads and names
  output_txt <- file.path(getwd(), "reads_and_names_for_raw_files.txt")
  cat(paste(file.name, reads, "\n", sep = "\t"), file = output_txt, append = TRUE)
}

# Find the files to count
files.to.count <-  list.files(getwd(), pattern = "_R1_001.fastq.gz$")

# Loop over each file in parallel
foreach(file = files.to.count, .combine = c) %dopar% {
  process_file(file)
}


stopCluster(my.cluster)

Solution

  • Since you are running on a Unix-alike, consider just using the parallel package's mclapply() function:

    #! /usr/bin/Rscript
    
    ###### 
    ## Counting and excluding low read fastqs
    
    # This script counts the number of reads in a fastq and moves those under a certain
    # threshold to a separate file
    
    
    ## Collect arguments
    args <- commandArgs(TRUE)
    
    ## Parse arguments (we expect the form --arg=value)
    parseArgs <- function(x) strsplit(sub("^--", "", x), "=")
    argsL <- as.list(as.character(as.data.frame(do.call("rbind", parseArgs(args)))$V2))
    names(argsL) <- as.data.frame(do.call("rbind", parseArgs(args)))$V1
    args <- argsL
    rm(argsL)
    
    if("--help" %in% args | is.null(args$files) | is.null(args$min_reads)) {
      cat("
          Counts the number of reads in the gzipped FASTQs in a folder, then moves
          files below a certain number of reads to a folder called 'less-than-[min_reads]'
      
          --files=string        - Full path to folder with raw files
          --min_reads=numeric   - Minimum number of reads for a file to be included
          --n_cores=numeric     - Number of cores to use
          
          Example:
          Rscript count_and_move_raw_fastqs.R --files=raw/ \\   \n --min_reads=2e6 \\ \n --n_cores=10 \n\n")
      
      q(save="no")
    }
    
    ## Import arguments
    raw.path <- args$files
    min.reads <- as.numeric(args$min_reads)  # command-line values arrive as character strings
    n.cores <- as.numeric(args$n_cores)
    
    library(tidyverse)
    
    # set wd to raw.path
    setwd(raw.path)
    
    # Create folder for low read files to go
    output_dir <- paste("files-with-less-than", min.reads,"-reads", sep = "")
    dir.create(output_dir)
    
    # Create function to count the lines in the file, and move files < min.reads to a different directory
    process_file <- function(file){
      
      # Get the base name (without R1) for each file
      file.name <- stringr::str_split_fixed(file, "_", n = 2)[,1]
      
      # Count the reads
      message(paste("Counting reads in ", file, " as of ", Sys.time(), sep = ""))
      reads <- as.numeric(system(paste("gunzip -c ", file, " | sed -n '$=' | awk '{print $1/4}'", sep = ""), intern = TRUE))
      
      # Move files if <= min.reads
      if(reads <= min.reads){
        # Find all files with the name
        files.to.move <- list.files(pattern = file.name)
        # Move them
        purrr::map(files.to.move, function(x) file.rename(x, paste(output_dir, x, sep = "/")))
      }
      
      # Create final list of reads and names
      paste(file.name, reads, "\n", sep = "\t")
    }
    
    # Find the files to count
    files.to.count <-  list.files(getwd(), pattern = "_R1_001.fastq.gz$")
    
    # Process each file in parallel and output a list of reads and names
    output_txt <- parallel::mclapply(files.to.count, process_file, mc.cores = n.cores)
    
    # Serial write of the list of reads and names (append = TRUE so every line is kept)
    outfile <- file.path(getwd(), "reads_and_names_for_raw_files.txt")
    lapply(output_txt, cat, file = outfile, append = TRUE)
    

    This has less memory overhead, because the forked parallel R sessions share most of their memory with the parent process, whereas a socket-connected collection of R sessions does not.
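
    As a rough illustration of that difference, here is a minimal sketch (with a toy big_lookup object, not part of your script): forked workers started by mclapply() can read objects that already exist in the calling session, whereas the fresh R sessions in a PSOCK cluster need those objects shipped to them with clusterExport().

    library(parallel)

    # Toy object that lives in the main R session
    big_lookup <- data.frame(id = 1:5, label = letters[1:5])

    # Forked workers (Unix-alikes only) inherit the parent's memory, so
    # big_lookup is visible to them without being exported.
    mclapply(1:2, function(i) big_lookup$label[i], mc.cores = 2)

    # PSOCK workers are fresh R sessions reached over sockets: big_lookup
    # has to be serialized and sent to each of them explicitly.
    cl <- makePSOCKcluster(2)
    clusterExport(cl, "big_lookup")
    parLapply(cl, 1:2, function(i) big_lookup$label[i])
    stopCluster(cl)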

    Also, instead of writing to the file with cat() from inside the parallel workers, the code collects the lines of text in a list, which is then written with a plain serial lapply(). Parallel writing into one file is not recommended unless special care is taken that the writers do not run into each other and garble the text.
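
    A minimal sketch of that collect-then-write pattern, with a hypothetical count_stub() standing in for the real read counting:

    library(parallel)

    # Stand-in for process_file(): each worker only *returns* its line of
    # text; nothing writes to the shared output file from inside a worker.
    count_stub <- function(file) paste(file, sample.int(100, 1), "\n", sep = "\t")

    files <- c("sampleA_R1_001.fastq.gz", "sampleB_R1_001.fastq.gz")
    lines <- mclapply(files, count_stub, mc.cores = 2)

    # One serial write from the main session, so lines cannot interleave
    cat(unlist(lines), file = "reads_and_names_for_raw_files.txt", sep = "")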

    I did not run your code as I don't have the input files. Let me know how it works for you.