I want to create an Rscript that uses %dopar% to count the lines of many files using many cores, then outputs a TSV of the file names plus the number of lines (divided by 4). I am using Ubuntu 20.04. If I run the script below interactively, by opening R and copying/pasting it in, it works. If I run it from the command line with Rscript, it gives this error: ssh: connect to host 0.0.0.20 port 22: Connection timed out. I assume this is a problem with how the PSOCK cluster is registered. How can I make this script work when run with Rscript from the command line?
(NB: when entering it interactively I load the libraries, then just define raw.path, min.reads, and n.cores before copying and pasting the rest of the script, without a problem.)
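For reference, this is roughly what I type interactively before pasting the rest of the script (the path and the numbers are just placeholders):

library(foreach)
library(doParallel)
library(tidyverse)
raw.path <- "/path/to/raw/"  # placeholder path
min.reads <- 2e6             # placeholder threshold
n.cores <- 10                # placeholder core count

The full script: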
#! /usr/bin/Rscript
######
## Counting and excluding low read fastqs
# This script counts the number of reads in a fastq and moves those under a certain
# threshold to a separate file
## Collect arguments
args <- commandArgs(TRUE)
## Parse arguments (we expect the form --arg=value)
parseArgs <- function(x) strsplit(sub("^--", "", x), "=")
argsL <- as.list(as.character(as.data.frame(do.call("rbind", parseArgs(args)))$V2))
names(argsL) <- as.data.frame(do.call("rbind", parseArgs(args)))$V1
args <- argsL
rm(argsL)
if("--help" %in% args | is.null(args$files) | is.null(args$min_reads)) {
cat("
Counts the number of reads in the gzipped FASTQs in a folder, then moves
files below a certain number of reads to a folder called 'less-than-[min_reads]'
--files=string - Full path to folder with raw files
--min_reads=numeric - Minimum number of reads for a file to be included
--n_cores=numeric - Number of cores to use
Example:
Rscript count_and_move_raw_fastqs.R --files=raw/ \\ \n --min_reads=2e6 \\ \n --n_cores=10 \n\n")
q(save="no")
}
## Import arguments
raw.path <- args$files
min.reads <- args$min_reads
n.cores <- args$n_cores
# Import libraries
library(foreach)
library(doParallel)
library(tidyverse)
# set wd to raw.path
setwd(raw.path)
# Set up parallelization with n cores
my.cluster <- makePSOCKcluster(nnodes = n.cores, names = n.cores, outfile="")
registerDoParallel(cl = my.cluster)
# Create folder for low read files to go
output_dir <- paste("files-with-less-than", min.reads,"-reads", sep = "")
dir.create(output_dir)
# Create function to count the lines in the file, and move files < min.reads to a different directory
process_file <- function(file){
  # Get the base name (without R1) for each file
  file.name <- stringr::str_split_fixed(file, "_", n = 2)[,1]
  # Count the reads
  message(paste("Counting reads in ", file, " as of ", Sys.time(), sep = ""))
  reads <- as.numeric(system(paste("gunzip -c ", file, " | sed -n '$=' | awk '{print $1/4}'", sep = ""), intern = TRUE))
  # Move files if <= min.reads
  if(reads <= min.reads){
    # Find all files with the same base name
    files.to.move <- list.files(pattern = file.name)
    # Move them
    purrr::map(files.to.move, function(x) file.rename(x, paste(output_dir, x, sep = "/")))
  }
  # Append this file's name and read count to the output file
  output_txt <- file.path(getwd(), "reads_and_names_for_raw_files.txt")
  cat(paste(file.name, reads, "\n", sep = "\t"), file = output_txt, append = TRUE)
}
# Find the files to count
files.to.count <- list.files(getwd(), pattern = "_R1_001.fastq.gz$")
# Loop over each file in parallel
foreach(file = files.to.count, .combine = c) %dopar% {
  process_file(file)
}
stopCluster(my.cluster)
Since you are running on a Unix-alike, consider just using the parallel package's mclapply() function:
#! /usr/bin/Rscript
######
## Counting and excluding low read fastqs
# This script counts the number of reads in a fastq and moves those under a certain
# threshold to a separate file
## Collect arguments
args <- commandArgs(TRUE)
## Parse arguments (we expect the form --arg=value)
parseArgs <- function(x) strsplit(sub("^--", "", x), "=")
argsL <- as.list(as.character(as.data.frame(do.call("rbind", parseArgs(args)))$V2))
names(argsL) <- as.data.frame(do.call("rbind", parseArgs(args)))$V1
args <- argsL
rm(argsL)
if("--help" %in% args | is.null(args$files) | is.null(args$min_reads)) {
cat("
Counts the number of reads in the gzipped FASTQs in a folder, then moves
files below a certain number of reads to a folder called 'less-than-[min_reads]'
--files=string - Full path to folder with raw files
--min_reads=numeric - Minimum number of reads for a file to be included
--n_cores=numeric - Number of cores to use
Example:
Rscript count_and_move_raw_fastqs.R --files=raw/ \\ \n --min_reads=2e6 \\ \n --n_cores=10 \n\n")
q(save="no")
}
## Import arguments
raw.path <- args$files
min.reads <- as.numeric(args$min_reads)  # command-line arguments arrive as character strings
n.cores <- as.numeric(args$n_cores)
library(tidyverse)
# set wd to raw.path
setwd(raw.path)
# Create folder for low read files to go
output_dir <- paste("files-with-less-than", min.reads,"-reads", sep = "")
dir.create(output_dir)
# Create function to count the lines in the file, and move files < min.reads to a different directory
process_file <- function(file){
  # Get the base name (without R1) for each file
  file.name <- stringr::str_split_fixed(file, "_", n = 2)[,1]
  # Count the reads
  message(paste("Counting reads in ", file, " as of ", Sys.time(), sep = ""))
  reads <- as.numeric(system(paste("gunzip -c ", file, " | sed -n '$=' | awk '{print $1/4}'", sep = ""), intern = TRUE))
  # Move files if <= min.reads
  if(reads <= min.reads){
    # Find all files with the same base name
    files.to.move <- list.files(pattern = file.name)
    # Move them
    purrr::map(files.to.move, function(x) file.rename(x, paste(output_dir, x, sep = "/")))
  }
  # Return one tab-separated line of file name and read count
  paste0(file.name, "\t", reads, "\n")
}
# Find the files to count
files.to.count <- list.files(getwd(), pattern = "_R1_001.fastq.gz$")
# Process each file in parallel and output a list of reads and names
output_txt <- parallel::mclapply(files.to.count, process_file, mc.cores = n.cores)
# Serial write of the collected lines of reads and names
outfile <- file.path(getwd(), "reads_and_names_for_raw_files.txt")
lapply(output_txt, cat, file = outfile, append = TRUE)
This has lower memory overhead, since the forked parallel R sessions share most of their memory space, whereas a socket-connected collection of R sessions does not.
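For comparison, here is a rough sketch of what the socket-based route would look like with the base parallel package (only an illustration, assuming the packages process_file relies on are installed where the workers run and that n.cores is numeric); everything the workers need has to be exported to them explicitly:

# Socket-based alternative, shown only for contrast with the forked version above
cl <- parallel::makePSOCKcluster(n.cores)
parallel::clusterExport(cl, c("min.reads", "output_dir"))
output_txt <- parallel::parLapply(cl, files.to.count, process_file)
parallel::stopCluster(cl)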
Also, instead of writing in parallel with cat, the code collects the lines of text in a list, which is then written out with a plain serial lapply(). Writing to a single file in parallel is not recommended unless special care is taken that the writers do not run into each other; otherwise the result is garbled text.
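If you prefer, that serial write can also be collapsed into a single call, since each collected element already ends in a newline:

cat(unlist(output_txt), file = outfile, sep = "")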
I did not run your code as I don't have the input files. Let me know how it works for you.