The simplest way I've found so far to run a parallel lapply in R is the following example code:
library(parallel)
library(pbapply)
cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})
results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)
This has the very useful feature of showing a progress bar for the results, and the same code is easy to reuse when no parallel computation is needed, by setting cl = NULL.
However, one issue I've noticed is that pblapply loops through the list in batches. For example, if one worker is stuck for a long time on a certain task, the remaining workers wait for it to finish before starting a new batch of jobs. For certain tasks this adds a lot of unnecessary time to the workflow.
My question:
Are there any similar parallel frameworks that would allow the workers to run independently? A progress bar and the ability to reuse the code with cl=NULL would be a big plus.
Maybe it is possible to modify the existing code of pbapply to add this option/feature?
(Disclaimer: I'm the author of the future framework and the progressr package)
A close solution that resembles base::lapply(), and your pbapply::pblapply() example, is to use the future.apply package:
library(future.apply)
## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
})
Chunking:
You can control the amount of chunking with the argument future.chunk.size or, alternatively, future.scheduling. To disable chunking so that each element is processed in its own parallel task, use future.chunk.size=1. This way, an element that takes much longer than the others will not hold up any other elements.
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}, future.chunk.size=1)
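To make the effect of chunking concrete, here is a minimal sketch that assumes a made-up workload (`slow_first` is a hypothetical function, not part of any package) in which one element is much slower than the rest. It runs the same call with one element per future and with the default chunking, which, as far as I know, creates one chunk per worker. The timings depend on your machine, so none are shown here; compare `t_unchunked` and `t_chunked` yourself.

```r
library(future.apply)

plan(multisession, workers = 2)

## Hypothetical workload: the first element is slow, the others are fast
slow_first <- function(x) {
  Sys.sleep(if (x == 1) 1 else 0.05)
  sqrt(x)
}

xs <- 1:10

## One future per element: a worker that becomes free picks up the next element
t_unchunked <- system.time(
  res_unchunked <- future_lapply(xs, FUN = slow_first, future.chunk.size = 1)
)

## Default chunking: elements are split up front, so the fast elements that
## share a chunk with the slow one are tied to the same worker
t_chunked <- system.time(
  res_chunked <- future_lapply(xs, FUN = slow_first)
)

## Shut down the background workers
plan(sequential)
```

Both calls return the same values in the same order; only the way the work is distributed across the workers differs.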
Progress updates in parallel: If you want to receive progress updates when doing parallel processing, you can use the progressr package and configure it to use the progress package to report updates as a progress bar (here also with an ETA).
library(future.apply)
plan(multisession, workers=4)
library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))
with_progress({
  p <- progressor(along=xs)
  results <- future_lapply(xs, FUN=function(x) {
    p() ## signal progress
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
})
You can wrap this into a function, e.g.
my_fcn <- function(xs) {
  p <- progressor(along=xs)
  future_lapply(xs, FUN=function(x) {
    p()
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
}
This way you can call it as a regular function:
> result <- my_fcn(xs)
and use plan() to control exactly how you want it to parallelize. Called this way, it will not report on progress. To get progress updates, you'll have to do:
> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------] 9% 1m
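Regarding the wish to reuse the code without parallelization (the cl=NULL case in pbapply): a minimal sketch of the corresponding approach here is to leave the function untouched and switch only the plan. The function below is the one from above without the Sys.sleep() call, and the names `res_seq` and `res_par` are just for illustration.

```r
library(future.apply)
library(progressr)

my_fcn <- function(xs) {
  p <- progressor(along = xs)
  future_lapply(xs, FUN = function(x) {
    p()
    sqrt(x)
  }, future.chunk.size = 1)
}

xs <- 1:20

## No parallelization: everything runs in the current R session
plan(sequential)
res_seq <- my_fcn(xs)

## Same call, now spread over two background R sessions
plan(multisession, workers = 2)
res_par <- my_fcn(xs)

## Shut down the background workers
plan(sequential)
```

The call to my_fcn() is identical in both cases, so the choice of backend stays with whoever runs the code rather than being hard-coded in the function.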
Run everything in the background: If your question was how to run the whole shebang in the background, see the 'Future Topologies' vignette. That's another level of parallelization but it's possible.
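As a rough sketch of that idea, and not a substitute for the vignette, you can wrap the whole computation in a single future so that the main R session stays free while it runs. With a plain multisession plan, the future_lapply() call inside the background session is, by default, processed sequentially there; getting parallel workers at the inner level as well requires a nested plan as described in the vignette. The variable names are just for illustration.

```r
library(future)
library(future.apply)

plan(multisession, workers = 2)

## Launch the whole computation in a background R session
f <- future({
  future.apply::future_lapply(1:10, FUN = function(x) {
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size = 1)
})

## The main session is free here; poll until the background job is done
while (!resolved(f)) Sys.sleep(0.1)

## Collect the result
results <- value(f)

## Shut down the background workers
plan(sequential)
```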