R: asynchronous parallel lapply


The simplest way I've found so far to run a parallel lapply in R is the following:

library(parallel)
library(pbapply)

cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})

results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)

This provides a progress bar for the results, and the same code can be reused when no parallel computation is needed by setting cl = NULL.

However, one issue I've noticed is that pblapply loops through the list in batches. If one worker is stuck for a long time on a certain task, the remaining workers wait for it to finish before starting a new batch of jobs. For certain tasks this adds a lot of unnecessary time to the workflow.

My question: Are there any similar parallel frameworks that would allow for the workers to run independently? Progress bar and the ability to reuse the code with cl=NULL would be a big plus.

Maybe it is possible to modify the existing code of pbapply to add this option/feature?


Solution

  • (Disclaimer: I'm the author of the future framework and the progressr package)

    A close solution that resembles base::lapply(), and your pbapply::pblapply() example, is to use the future.apply package:

    library(future.apply)
    
    ## The following is the same as plan(multisession, workers=4)
    cl <- parallel::makeCluster(4)
    plan(cluster, workers=cl)
    
    xs <- 1:100
    results <- future_lapply(xs, FUN=function(x) {
      Sys.sleep(0.1)
      sqrt(x)
    })
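
    Since the example in the question calls rnorm(), here is a minimal sketch of the same pattern with random numbers. It assumes two local workers, and it sets future.seed=TRUE so that each element gets its own parallel-safe random number stream; without it, the future framework warns that random numbers were generated without being declared.

    ```r
    library(future.apply)
    plan(multisession, workers = 2)

    # future.seed = TRUE declares that FUN uses random numbers and gives
    # each element its own parallel-safe RNG stream
    results <- future_lapply(1:5, FUN = function(x) rnorm(x), future.seed = TRUE)

    # Element i holds i random draws
    lengths(results)

    plan(sequential)  # shut down the background workers
    ```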
    

    Chunking: You can control the amount of chunking with the argument future.chunk.size or, alternatively, future.scheduling. To disable chunking so that each element is processed in its own parallel task, use future.chunk.size=1. This way, an element that takes much longer than the others will not hold up any other elements.

    xs <- 1:100
    results <- future_lapply(xs, FUN=function(x) {
      Sys.sleep(0.1)
      sqrt(x)
    }, future.chunk.size=1)
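
    The future.scheduling argument is the other knob for chunking. A minimal sketch, assuming two local workers and a hypothetical helper slow_sqrt(): future.scheduling=Inf creates one future per element (the same effect as future.chunk.size=1), while a finite value such as 4 asks for roughly four chunks per worker, which trades some load balancing for less overhead. It only takes effect when future.chunk.size is left at its default of NULL.

    ```r
    library(future.apply)
    plan(multisession, workers = 2)

    xs <- 1:20

    # Hypothetical helper used only for this illustration
    slow_sqrt <- function(x) {
      Sys.sleep(0.01)
      sqrt(x)
    }

    # One future per element; like future.chunk.size = 1
    r_inf <- future_lapply(xs, FUN = slow_sqrt, future.scheduling = Inf)

    # About four chunks per worker: fewer futures, coarser load balancing
    r_four <- future_lapply(xs, FUN = slow_sqrt, future.scheduling = 4)

    # Chunking changes how the work is split, not the results
    stopifnot(identical(r_inf, r_four))

    plan(sequential)  # shut down the background workers
    ```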
    

    Progress updates in parallel: If you want to receive progress updates during parallel processing, you can use the progressr package and configure it to use the progress package to report updates as a progress bar (here also with an ETA).

    library(future.apply)
    plan(multisession, workers=4)
    
    library(progressr)
    handlers(handler_progress(format="[:bar] :percent :eta :message"))
    
    with_progress({
      p <- progressor(along=xs)
      results <- future_lapply(xs, FUN=function(x) {
        p()  ## signal progress
        Sys.sleep(0.1)
        sqrt(x)
      }, future.chunk.size=1)
    })
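
    The format string above contains a :message field. As a sketch of how it could be filled, the progressor can be called with a string; this assumes the progress package is installed and uses two local workers and a shorter sleep for illustration.

    ```r
    library(future.apply)
    library(progressr)
    plan(multisession, workers = 2)

    # Assumes the 'progress' package is installed
    handlers(handler_progress(format = "[:bar] :percent :eta :message"))

    xs <- 1:20
    with_progress({
      p <- progressor(along = xs)
      results <- future_lapply(xs, FUN = function(x) {
        Sys.sleep(0.01)
        p(sprintf("x = %d", x))  # the string is shown in the :message field
        sqrt(x)
      }, future.chunk.size = 1)
    })

    plan(sequential)  # shut down the background workers
    ```

    In a non-interactive session progressr does not display progress by default, so the bar may only appear when this is run from an interactive console.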
    

    You can wrap this into a function, e.g.

    my_fcn <- function(xs) {
      p <- progressor(along=xs)
      future_lapply(xs, FUN=function(x) {
        p()
        Sys.sleep(0.1)
        sqrt(x)
      }, future.chunk.size=1)
    }
    

    This way you can call it as a regular function:

    > result <- my_fcn(xs)
    

    and use plan() to control exactly how you want it to parallelize. Called this way, it does not report progress. To get progress updates, wrap the call in with_progress():

    > with_progress(result <- my_fcn(xs))
    [====>-----------------------------------------------------]   9%  1m
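
    Regarding reuse of the same code without parallelization (the cl = NULL case in the question): a minimal sketch, assuming two local workers, in which the only thing that changes between the two runs is the plan() call.

    ```r
    library(future.apply)
    library(progressr)

    my_fcn <- function(xs) {
      p <- progressor(along = xs)
      future_lapply(xs, FUN = function(x) {
        p()  # signal progress
        sqrt(x)
      }, future.chunk.size = 1)
    }

    xs <- 1:20

    # No parallelization, comparable to cl = NULL with pblapply()
    plan(sequential)
    with_progress(r_seq <- my_fcn(xs))

    # Same call, now running on two background R sessions
    plan(multisession, workers = 2)
    with_progress(r_par <- my_fcn(xs))

    plan(sequential)  # shut down the background workers
    stopifnot(identical(r_seq, r_par))
    ```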
    

    Run everything in the background: If your question was how to run the whole shebang in the background, see the 'Future Topologies' vignette. That's another level of parallelization but it's possible.
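
    As a rough sketch of the simplest form of this (not the nested setup described in the vignette), the whole future_lapply() call can be wrapped in a single future so that the main R session stays free. This assumes two local workers; inside that one background future, future_lapply() runs sequentially unless a nested plan is specified with plan(list(...)).

    ```r
    library(future.apply)  # also attaches the future package
    plan(multisession, workers = 2)

    xs <- 1:20

    # Launch the whole computation as one background future;
    # future() returns without waiting for the result
    f <- future({
      future_lapply(xs, FUN = function(x) {
        Sys.sleep(0.01)
        sqrt(x)
      })
    })

    # The main session is free to do other work; here it just polls
    while (!resolved(f)) Sys.sleep(0.1)

    results <- value(f)  # collect the result
    plan(sequential)     # shut down the background workers
    ```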