Tags: r, windows, parallel-processing, cluster-computing, snow

R Parallel Processing - Node Choice


I am attempting to process a large amount of data in R on Windows using the parallel package on a computer with 8 cores. I have a large data.frame that I need to process row-by-row. For each row, I can estimate how long it will take for that row to be processed and this can vary wildly from 10 seconds to 4 hours per row.

I don't want to run the entire program at once under the clusterApplyLB function (I know this is probably the most efficient method), because if it hits an error then my entire set of results might be lost. My first attempt involved breaking the data up into blocks, running each block in parallel, saving the output from that parallel run, and then moving on to the next block.

The problem is that as it ran through the rows, rather than running at 7x "real" time (I have 8 cores, but I wanted to keep one spare), it only seems to be running at about 2x. I've guessed that this is because the allocation of rows to each core is inefficient.

For example, take four rows of data and 2 cores: two of the rows could take 4 hours each and the other two 10 seconds each. In theory this could take 4 hours and 10 seconds to run, but if allocated inefficiently it could take 8 hours. (Obviously this is an exaggeration, but a similar situation can happen when estimates are incorrect with more cores and more rows.)

If I estimate these times and submit the rows to clusterApplyLB in what I estimate to be the correct order (so that the estimated times are spread across cores to minimize the total time taken), they might not be sent to the cores that I want them to go to, because they might not finish in the time that I estimate. For example, if I estimate two processes to take 10 minutes and 12 minutes and they actually take 11.6 minutes and 11.4 minutes, then the order in which rows are handed out by clusterApplyLB won't be what I anticipated. This kind of error might seem small, but if I have optimised the placement of multiple long-running rows, then this mix-up of order could cause two 4-hour rows to go to the same node rather than to different nodes (which could almost double my total time).

TL;DR. My question: Is there a way to tell an R parallel processing function (e.g. clusterApplyLB, clusterApply, parApply, or any sapply, lapply or foreach variants) which rows should be sent to which core/node? Even apart from the situation I find myself in, I think this would be a very useful and interesting thing to know.


Solution

  • I would say there are two different possible approaches to your problem.

    The first one is a static optimization of the job-to-node mapping according to the expected per-job computation time. You would assign each job (i.e., row of your dataframe) a node before starting the calculation. Code for a possible implementation of this is given below.

    The second solution is dynamic and you would have to make your own load balancer based on the code given in clusterApplyLB. You would start out the same as in the first approach, but as soon as a job is done, you would have to recalculate the optimal job-to-node mapping. Depending on your problem, this may add significant overhead due to the constant re-optimization that takes place. I think that as long as you do not have a bias in your expected computation times, it's not necessary to go this way.
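
    As an illustration of the re-mapping step only (the dispatch plumbing that clusterApplyLB uses internally is omitted), here is a small sketch that uses a simple greedy rule, longest remaining job to the currently least-loaded node, as a stand-in for re-running the full optimization every time a job finishes. The function remap_jobs() and its arguments are my own placeholders, not part of any package:

    #greedy re-mapping sketch: assign each remaining job (longest first)
    #to the node with the smallest accumulated expected load
    remap_jobs=function(remaining_expected,node_load) {
      assignment=integer(length(remaining_expected))
      for (j in order(remaining_expected,decreasing=TRUE)) {
        k=which.min(node_load)
        assignment[j]=k
        node_load[k]=node_load[k]+remaining_expected[j]
      }
      assignment
    }
    #example: 5 jobs left; node 1 already has 2 time units of work queued
    remap_jobs(c(0.3,0.9,0.1,0.5,0.2),node_load=c(2,0,0,0))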

    Here is the code for the first solution approach:

    library(parallel)
    #set seed for reproducible example
    set.seed(1234)
    #let's say you have 100 calculations (i.e., rows)
    #each of them takes between 0 and 1 second computation time
    expected_job_length=runif(100)
    #this is your data
    #real_job_length is unknown but we use it in the mock-up function below
    df=data.frame(job_id=seq_along(expected_job_length),
                  expected_job_length=expected_job_length,
                  #real_job_length=expected_job_length + some noise
                  real_job_length=expected_job_length+
                    runif(length(expected_job_length),-0.05,0.05))
    #we might have a negative real_job_length; fix that
    df=within(df,real_job_length[real_job_length<0]<-
                real_job_length[real_job_length<0]+0.05)
    #detectCores() gives 4 in my case
    cluster_size=4
    

    Prepare the job-to-node mapping optimization:

    #x will give the node_id (between 1 and cluster_size) for each job
    total_time=function(x,expected_job_length) {
      #in the calculation below, x will be a vector of reals
      #we have to translate it into integers in order to use it as index vector
      x=as.integer(round(x))
      #return max of sum of node-binned expected job lengths
      max(sapply(split(expected_job_length,x),sum))
    }
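
    #(added sketch) sanity-check the objective on a naive round-robin
    #mapping before optimizing; the optimum found below should be no worse
    round_robin=rep_len(1:cluster_size,nrow(df))
    total_time(round_robin,expected_job_length)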
    
    #now optimize the distribution of jobs amongst the nodes
    #Genetic algorithm might be better for the optimization
    #but Differential Evolution is good for now
    library(DEoptim)
    #pick large differential weighting factor (F) ...
    #... to get out of local minima due to rounding
    res=DEoptim(fn=total_time,
                lower=rep(1,nrow(df)),
                upper=rep(cluster_size,nrow(df)),
                expected_job_length=expected_job_length,
                control=DEoptim.control(CR=0.85,F=1.5,trace=FALSE))
    #wait for a minute or two ...
    #inspect optimal solution
    time_per_node=sapply(split(expected_job_length,
                               unname(round(res$optim$bestmem))),sum)
    time_per_node
    #       1        2        3        4 
    #10.91765 10.94893 10.94069 10.94246
    plot(time_per_node,ylim=c(0,15))
    abline(h=max(time_per_node),lty=2)
    
    #add node-mapping to df
    df$node_id=unname(round(res$optim$bestmem))
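    #(added check) how many jobs did each node get? with the rounding in
    #total_time it is worth confirming that no node ended up empty
    table(df$node_id)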
    

    Now it's time for the calculation on the cluster:

    #start cluster
    workers=parallel::makeCluster(cluster_size)
    
    start_time=Sys.time()
    #distribute jobs according to optimal node-mapping
    clusterApply(workers,split(df,df$node_id),function(x) {
      for (i in seq_along(x$job_id)) {
        #use tryCatch to do the error handling for jobs that fail
        tryCatch({Sys.sleep(x[i,"real_job_length"])},
                 error=function(err) {print("Do your error handling")})
      }
    })
    end_time=Sys.time()
    
    #how long did it take
    end_time-start_time
    #Time difference of 11.12532 secs
    
    #add to plot
    abline(h=as.numeric(end_time-start_time),col="red",lty=2)
    
    stopCluster(workers)
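
    In a real run you would also want to get the results back and protect them against a single failing job, which was one of your original concerns. Below is a sketch (my addition, not part of the timing experiment above) of a worker function you could use in place of the Sys.sleep() placeholder, before calling stopCluster(workers). Here process_row() and the output file names are placeholders for your own per-row computation and output locations, and process_row() would need to be defined on (or exported to) the workers:

    #sketch: return per-job results and save them incrementally so that a
    #failure in one job does not lose the rest of the chunk
    results=clusterApply(workers,split(df,df$node_id),function(x) {
      out=vector("list",nrow(x))
      for (i in seq_len(nrow(x))) {
        out[[i]]=tryCatch(process_row(x[i,]),          #your real computation
                          error=function(err) err)     #keep the error, don't stop
        #persist the chunk so far after every job
        saveRDS(out,paste0("results_node_",x$node_id[1],".rds"))
      }
      out
    })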