Tags: r, parallel-processing, apply, rparallel

Does parApply() divide the matrix and then process each piece?


Suppose I have a parApply() call as follows:

cl <- makeCluster(5, type = "FORK")
parApply(cl = cl, X = my.mat, MARGIN = 1, FUN = myFun)

Here nrow(my.mat) is very large but myFun() runs very fast. Note that the cluster cl has 5 workers. I wonder how the parallelization is carried out.

Is my.mat divided into 5 submatrices, each processed by one worker, with the results combined once all workers are done? Or are the rows of my.mat sent to the workers one by one?


Solution

  • Here is the relevant explanation from the R documentation:

    parLapply, parSapply, and parApply are parallel versions of lapply, sapply and apply. Chunks of computation are statically allocated to nodes using clusterApply. By default, the number of chunks is the same as the number of nodes. parLapplyLB, parSapplyLB are load-balancing versions, intended for use when applying FUN to different elements of X takes quite variable amounts of time, and either the function is deterministic or reproducible results are not required. Chunks of computation are allocated dynamically to nodes using clusterApplyLB.

    Note that this behavior changed in R 3.5.0:

    From R 3.5.0, the default number of chunks is twice the number of nodes. Before R 3.5.0, the (fixed) number of chunks was the same as the number of nodes. As for clusterApplyLB, with load balancing the node that executes a particular job is non-deterministic and simulations that assign RNG streams to nodes will not be reproducible.

    clusterApply calls fun on the first node with arguments x[[1]] and ..., on the second node with x[[2]] and ..., and so on, recycling nodes as needed.
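
    To see that round-robin recycling in action, here is a minimal sketch (using a Unix-only FORK cluster, as in the question); with 7 jobs and 5 workers, the last two jobs go back onto the first two workers:

    library(parallel)

    cl <- makeCluster(5, type = "FORK")
    # One job per element; jobs 6 and 7 recycle onto the first two workers,
    # so the first five PIDs are distinct and the last two repeat them.
    unlist(clusterApply(cl, 1:7, function(i) Sys.getpid()))
    stopCluster(cl)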

    There is also clusterApplyLB, which works slightly differently:

    clusterApplyLB is a load balancing version of clusterApply. If the length n of x is not greater than the number of nodes p, then a job is sent to n nodes. Otherwise the first p jobs are placed in order on the p nodes. When the first job completes, the next job is placed on the node that has become free; this continues until all jobs are complete. Using clusterApplyLB can result in better cluster utilization than using clusterApply, but increased communication can reduce performance. Furthermore, the node that executes a particular job is non-deterministic. This means that simulations that assign RNG streams to nodes will not be reproducible.
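
    Here is a small sketch of that dynamic behavior (the timings are made up; a FORK cluster is assumed): one slow job ties up a worker while the other worker drains the remaining fast jobs.

    library(parallel)

    cl <- makeCluster(2, type = "FORK")
    times <- c(2, 0.1, 0.1, 0.1)  # one slow job, three fast ones
    unlist(clusterApplyLB(cl, times, function(t) { Sys.sleep(t); Sys.getpid() }))
    # Typically one PID appears three times: that worker picks up all the
    # fast jobs while the other is still busy with the slow one.
    stopCluster(cl)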

    So when you use parApply with this cluster, your matrix is divided into 5 chunks of consecutive rows, and each chunk is processed by one of the workers. With the par*ApplyLB family of functions, chunks are instead handed to the workers dynamically: as soon as a worker finishes its chunk, it is given the next pending one (with chunk.size = 1, that means one element at a time).
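
    If you want to inspect that static split yourself, clusterSplit() divides a sequence into one consecutive piece per node; this is the same kind of split that parApply's static allocation produces for the 20 rows in the example below:

    library(parallel)

    cl <- makeCluster(5, type = "FORK")
    clusterSplit(cl, 1:20)  # 5 consecutive index chunks: 1:4, 5:8, ..., 17:20
    stopCluster(cl)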

    Here is the output from the following code:

    library(parallel)
    
    my.mat <- matrix(c(1:20,rep(0,20)), ncol=2)
    head(my.mat)
    #      [,1] [,2]
    # [1,]    1    0
    # [2,]    2    0
    # [3,]    3    0
    # [4,]    4    0
    # [5,]    5    0
    # [6,]    6    0
    
    cl <- makeCluster(5, "FORK")
    parApply(cl = cl, X = my.mat, MARGIN = 1, FUN = \(x) sprintf("sum=%d   pid=%d", sum(x), Sys.getpid()))
    # [1] "sum=1   pid=42569" 
    # [2] "sum=2   pid=42569" 
    # [3] "sum=3   pid=42569" 
    # [4] "sum=4   pid=42569" 
    # [5] "sum=5   pid=42570" 
    # [6] "sum=6   pid=42570" 
    # [7] "sum=7   pid=42570" 
    # [8] "sum=8   pid=42570" 
    # [9] "sum=9   pid=42571" 
    # [10] "sum=10   pid=42571"
    # [11] "sum=11   pid=42571"
    # [12] "sum=12   pid=42571"
    # [13] "sum=13   pid=42572"
    # [14] "sum=14   pid=42572"
    # [15] "sum=15   pid=42572"
    # [16] "sum=16   pid=42572"
    # [17] "sum=17   pid=42573"
    # [18] "sum=18   pid=42573"
    # [19] "sum=19   pid=42573"
    # [20] "sum=20   pid=42573"
    
    
    stopCluster(cl)
    

    Note the difference in the following output (see how the pid values are distributed) when I use parLapplyLB with chunk.size = 1:

    mylist <- 1:20
    cl <- makeCluster(5, "FORK")
    parLapplyLB(cl = cl, X = mylist, fun = \(x) sprintf("sum=%d   pid=%d", sum(x), Sys.getpid()), chunk.size = 1)
    # [[1]]
    # [1] "sum=1   pid=64019"
    # 
    # [[2]]
    # [1] "sum=2   pid=64020"
    # 
    # [[3]]
    # [1] "sum=3   pid=64021"
    # 
    # [[4]]
    # [1] "sum=4   pid=64022"
    # 
    # [[5]]
    # [1] "sum=5   pid=64023"
    # 
    # [[6]]
    # [1] "sum=6   pid=64019"
    # 
    # [[7]]
    # [1] "sum=7   pid=64020"
    # 
    # [[8]]
    # [1] "sum=8   pid=64019"
    # 
    # [[9]]
    # [1] "sum=9   pid=64020"
    # 
    # [[10]]
    # [1] "sum=10   pid=64019"
    # . . .
    stopCluster(cl)
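
    Finally, note that parApply() itself has no load-balancing variant. If you need dynamic scheduling over rows, one option (a sketch, assuming R >= 3.6.0 for asplit()) is to split the matrix into a list of its rows and pass that to parLapplyLB():

    library(parallel)

    my.mat <- matrix(c(1:20, rep(0, 20)), ncol = 2)
    cl <- makeCluster(5, type = "FORK")
    # asplit(my.mat, 1) turns the matrix into a list of its rows, which
    # parLapplyLB() then hands out to the workers one at a time
    res <- parLapplyLB(cl, asplit(my.mat, 1),
                       \(x) sprintf("sum=%d   pid=%d", sum(x), Sys.getpid()),
                       chunk.size = 1)
    stopCluster(cl)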