I'm currently developing an R package that uses parallel computing to solve some tasks, by means of the "parallel" package.
I'm seeing some odd behavior when using clusters defined inside functions of my package: parLapply appears to assign a job to one worker and wait for it to finish before assigning a job to the next worker. At least, that is what seems to be happening, judging by the log file "cluster.log" and the list of running processes in the Unix shell.
Below is a mock-up of the original function declared inside my package:

    .parSolver <- function( varMatrix, var1 ) {
        no_cores <- detectCores()

        # Rows in varMatrix
        rows <- 1:nrow(varMatrix[,])

        # Split rows in n parts
        n <- no_cores
        parts <- split(rows, cut(rows, n))

        # Initiate cluster
        cl <- makePSOCKcluster(no_cores, methods = FALSE, outfile = "/home/cluster.log")
        clusterEvalQ(cl, library(raster))
        clusterExport(cl, "varMatrix", envir=environment())
        clusterExport(cl, "var1", envir=environment())

        rParts <- parLapply(cl = cl, X = 1:n, fun = function(x){
            part <- rasterize(varMatrix[parts[[x]],], raster(var1), .....)
            print(x)
            return(part)
        })

        do.call(merge, rParts)
    }

NOTES:
The weird part to me is that if I execute the exact same code as the body of parSolver in the global environment, everything works smoothly: all workers take one job at the same time and the task completes in no time. However, if I do something like:

    library(myPackage)
    varMatrix <- (...)
    var1 <- (...)
    result <- parSolver(varMatrix, var1)

the described problem appears.
It looks like a load-balancing problem, but that does not explain why it works fine in one situation and not in the other.
Am I missing something here? Thanks in advance.
I don't think parLapply is running sequentially. More likely, it's just running inefficiently, making it appear to run sequentially.
I have a few suggestions to improve it:
- Don't define the worker function inside parSolver
- Don't export all of varMatrix to each worker
- Create the cluster outside of parSolver
The first point is important because, as your example now stands, all of the variables defined in parSolver will be serialized along with the anonymous worker function and sent to the workers by parLapply. By defining the worker function outside of any function, the serialization won't capture any unwanted variables.
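To see the capture effect without starting a cluster, here is a minimal sketch that compares the serialized size of two otherwise identical functions. The names make_inner, inner_fn and top_fn are hypothetical, and the size of big is an arbitrary choice for illustration:

```r
# Hypothetical helper: builds a closure inside a function whose frame
# also holds a large object, mimicking a worker defined inside parSolver.
make_inner <- function() {
  big <- numeric(1e6)   # about 8 MB living in the enclosing frame
  function(x) x + 1     # never touches 'big', but its environment holds it
}
inner_fn <- make_inner()

# The same function defined at top level. The environment is pinned to the
# global environment so the comparison also holds if this snippet is
# sourced into a local environment.
top_fn <- function(x) x + 1
environment(top_fn) <- globalenv()

# serialize() writes a closure's enclosing environment along with the
# function, except for the global environment, which is stored by reference.
inner_size <- length(serialize(inner_fn, NULL))
top_size   <- length(serialize(top_fn, NULL))

cat("defined inside a function:", inner_size, "bytes\n")
cat("defined at top level:     ", top_size, "bytes\n")
```

The first number should come out several orders of magnitude larger than the second, which is roughly the extra payload each task would carry when the worker function is an anonymous function created inside parSolver.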
The second point avoids unnecessary socket I/O and uses less memory, making the code more scalable.
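As a rough illustration of the second point, the sketch below splits a small matrix into row chunks so that each worker would receive only its own rows. The matrix m and the worker count n_workers are stand-ins chosen for the example, and no cluster is started:

```r
library(parallel)

# Hypothetical stand-in for varMatrix: 10 rows, 2 columns.
m <- matrix(1:20, nrow = 10, ncol = 2)
n_workers <- 3  # assumed worker count, for illustration only

# splitIndices() partitions the row indices into n_workers contiguous groups.
idx <- splitIndices(nrow(m), n_workers)

# drop = FALSE keeps every chunk a matrix, even if a chunk has a single row.
chunks <- lapply(idx, function(i) m[i, , drop = FALSE])

# Each chunk holds only a fraction of the rows ...
print(sapply(chunks, nrow))

# ... and binding the chunks back together recovers the original matrix.
reassembled <- do.call(rbind, chunks)
print(identical(reassembled, m))
```

Passing a list like chunks as the iteration argument means each task ships only its own rows over the socket, instead of every worker holding a full copy of the matrix.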
Here's a fake, but self-contained example that is similar to yours that demonstrates my suggestions:

    # Define worker function outside of any function to avoid
    # serialization problems (such as unexpected variable capture)
    workerfn <- function(mat, var1) {
        library(raster)
        mat * var1
    }

    parSolver <- function(cl, varMatrix, var1) {
        parts <- splitIndices(nrow(varMatrix), length(cl))
        varMatrixParts <- lapply(parts, function(i) varMatrix[i,,drop=FALSE])
        rParts <- clusterApply(cl, varMatrixParts, workerfn, var1)
        do.call(rbind, rParts)
    }

    library(parallel)
    cl <- makePSOCKcluster(3)
    r <- parSolver(cl, matrix(1:20, 10, 2), 2)
    print(r)

Note that this takes advantage of the clusterApply function to iterate over a list of row-chunks of varMatrix so that the entire matrix doesn't need to be sent to everyone. It also avoids calls to clusterEvalQ and clusterExport, simplifying the code, as well as making it a bit more efficient.