Tags: r, parallel-processing, doparallel

How to optimize simple parallel processing with tasks taking different amounts of time to complete?


I have some R code that uses foreach and doParallel to parallelize an lapply() call, i.e. parLapply(). There are 100 tasks, split across my laptop's 4 cores. The function automatically partitions the tasks into sets of 25 (1:25, 26:50, 51:75, 76:100), one set per core. (I know because I've saved files using the integer index of the list I iterate over.) Tasks 76:100 are much simpler, so they were completed very quickly. Meanwhile, the remaining tasks in the other sets are still queued.

Whenever I run more involved code like this, I periodically monitor the progress and check Task Manager. Before, I've noticed the processes spin up and down in between executions. This time, I saw that one "core" process is now permanently idle, at 0% CPU in Task Manager.

[Screenshot of Task Manager]

I assume this one was dedicated to the task list that was more quickly completed. There are 3 others that are active as usual processing away.

So my question: is there any way that I can modify the parallel coding so that tasks are more evenly distributed? Or a way to re-incorporate the idle core/process?

Similar to what is discussed in this post.

#not executed so not sure if actually reprex, but the concept is there
#computer is still running my other code :)
library(foreach)
library(doParallel)

cl <- makeCluster(mc <- getOption("cl.cores", parallel::detectCores()))

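reprex <- 1:100 # placeholder object; export whatever your workers actually need
clusterExport(cl=cl, varlist=c("reprex"))
clusterEvalQ(cl, {library(dplyr)}) # load required packages on every worker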

registerDoParallel(cl)

reprex2 <- parLapply(cl, 
                     1:100, 
                     function(x){
                       if(x<=75){
                         Sys.sleep(10)
                       }else{
                         print("Side question: any way for this to be seen? Or maybe some kind of progress bar baked into the code? I've taken to saving along the way to monitor.")
                       }
                     })

stopCluster(cl)

A lo-fi but maybe tedious solution I thought of would be to intuit or benchmark each "type" of task and then organize/partition them somewhat manually. But I'm really looking for something on the code side that hands the remaining work to a less occupied / fatigued processor. :)

Bonus bonus bonus question(s) that are likely googleable: Is each execution in parallel done with a clean environment? Application-wise, should I front-load memory-intensive objects when each core is established, or is it better to load a subset each time? Why do lists of lists take up more memory than a data frame with the same content?


Solution

  • For a load-balanced parallel lapply, the parallel package provides parLapplyLB(), a load-balancing version of parLapply(). It does not preschedule; instead, it assigns a new job to a node whenever that node becomes free. This works better for heterogeneous tasks, but comes with additional overhead and might thus slow down homogeneous tasks.

    For foreach / %dopar% we can explicitly turn off prescheduling by passing .options.multicore / .options.snow in the foreach call. However, it looks like prescheduling is disabled here by default and has to be turned on explicitly if desired.

    The following script demonstrates this:

    library(parallel)
    library(doParallel)
    library(foreach)
    library(ggplot2)
    
    cl <- makeCluster(2) # Create Cluster
    clusterApply(cl, seq_along(cl), function(i) workerID <<- i) # Export global Worker ID for later use
    registerDoParallel(cl)  # register cluster for foreach use
    
    #num_reps <- rep(c(10,2),4)
    num_reps <- rep(c(10,2),each=4)
    
    # The functions are just dummy functions that wait for varying amounts of time, simulating heterogeneous tasks
    times_no_opts <- foreach(i=num_reps) %dopar%
      sapply(seq_len(i), function(x) {
        Sys.sleep(0.1)
        c(id=workerID,time=format(Sys.time(),"%H:%M:%OS2"))
      })
    
    
    times_exp_preschedule <- foreach(i=num_reps,
                                   .options.multicore=list(preschedule=TRUE),
                                   .options.snow=list(preschedule=TRUE)) %dopar%
      sapply(seq_len(i), function(x) {
        Sys.sleep(0.1)
        c(id=workerID,time=format(Sys.time(),"%H:%M:%OS2"))
      })
    
    times_exp_no_preschedule <- foreach(i=num_reps,
                                    .options.multicore=list(preschedule=FALSE),
                                    .options.snow=list(preschedule=FALSE)) %dopar%
      sapply(seq_len(i), function(x) {
        Sys.sleep(0.1)
        c(id=workerID,time=format(Sys.time(),"%H:%M:%OS2"))
      })
    
    times_no_opts <- data.frame(index=as.character(rep(1:length(num_reps),
                                                sapply(times_no_opts,ncol))),
                                      clusterID = unlist(sapply(times_no_opts,`[`,1,)),
                                      time=as.numeric(stringr::str_extract(unlist(sapply(times_no_opts,`[`,2,)),
                                                                           "[0-9\\.]+$")))
    
    
    
    
    times_exp_no_preschedule <- data.frame(index=as.character(rep(1:length(num_reps),
                                                             sapply(times_exp_no_preschedule,ncol))),
                                      clusterID = unlist(sapply(times_exp_no_preschedule,`[`,1,)),
                                      time=as.numeric(stringr::str_extract(unlist(sapply(times_exp_no_preschedule,`[`,2,)),
                                                                           "[0-9\\.]+$")))
    
    times_exp_preschedule <- data.frame(index=as.character(rep(1:length(num_reps),
                                                                  sapply(times_exp_preschedule,ncol))),
                                           clusterID = unlist(sapply(times_exp_preschedule,`[`,1,)),
                                           time=as.numeric(stringr::str_extract(unlist(sapply(times_exp_preschedule,`[`,2,)),
                                                                                "[0-9\\.]+$")))
    
    
    
    ggplot(data=times_no_opts,
           aes(x=time,fill=clusterID,group=clusterID)) + 
      geom_histogram(binwidth=0.1,colour="black") +
      labs(title="No Opts")
    
    ggplot(data=times_exp_no_preschedule,
           aes(x=time,fill=clusterID,group=clusterID)) + 
      geom_histogram(binwidth=0.1,colour="black") +
      labs(title="Explicit No Preschedule")
    
    ggplot(data=times_exp_preschedule,
           aes(x=time,fill=clusterID,group=clusterID)) + 
      geom_histogram(binwidth=0.1,colour="black") + 
      labs(title="Explicit Preschedule")
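
If you would rather compare the two scheduling modes without plotting, the same idea can be sketched by returning each worker's process ID and tabulating the results. This is a minimal sketch, not part of the script above: run_tasks() is a hypothetical helper, the sleep time of 0.02 seconds per unit is an arbitrary choice, and it assumes a cluster backend (makeCluster()), for which .options.snow is the relevant option.

```r
library(parallel)
library(doParallel)
library(foreach)

cl <- makeCluster(2)
registerDoParallel(cl)

# Same task pattern as in the script above: four long tasks, then four short ones.
num_reps <- rep(c(10, 2), each = 4)

# Hypothetical helper: run all tasks and return the PID of the worker
# that handled each one (results come back in task order).
run_tasks <- function(preschedule) {
  foreach(i = num_reps, .combine = c,
          .options.snow = list(preschedule = preschedule)) %dopar% {
    Sys.sleep(0.02 * i)  # simulate a task whose duration depends on i
    Sys.getpid()
  }
}

pids_preschedule    <- run_tasks(preschedule = TRUE)
pids_no_preschedule <- run_tasks(preschedule = FALSE)

stopCluster(cl)

# Total simulated work (sum of num_reps) handled by each worker process
tapply(num_reps, pids_preschedule, sum)
tapply(num_reps, pids_no_preschedule, sum)
```

With prescheduling, one worker should end up with most of the long tasks, while without it the totals should be closer to each other. The exact split depends on timing, so treat the numbers as indicative only.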
    

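
For the parLapplyLB() route mentioned at the top of this answer, a minimal sketch could look like the following. uneven_task() is a hypothetical stand-in for the question's workload (slow early tasks, quick late ones), and chunk.size = 1 assumes R 3.5.0 or later, where that argument is available; without it, parLapplyLB() may still group several tasks into one chunk.

```r
library(parallel)

# Hypothetical stand-in for the question's workload:
# tasks 1-6 are slow, tasks 7-8 finish almost immediately.
uneven_task <- function(x) {
  if (x <= 6) Sys.sleep(0.1)
  x^2
}

cl <- makeCluster(2)

# Static scheduling: the tasks are split into chunks up front,
# so a worker that finishes early sits idle.
time_static <- system.time(
  res_static <- parLapply(cl, 1:8, uneven_task)
)[["elapsed"]]

# Load-balanced scheduling: with chunk.size = 1, a worker is handed
# the next single task as soon as it becomes free.
time_lb <- system.time(
  res_lb <- parLapplyLB(cl, 1:8, uneven_task, chunk.size = 1)
)[["elapsed"]]

stopCluster(cl)

# Both variants return results in the original task order
identical(res_static, res_lb)
c(static = time_static, load_balanced = time_lb)
```

For tasks this small the timing difference is modest and can be hidden by communication overhead. The benefit grows as the individual tasks get longer and more uneven.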