rhpcdoparallel

Utilizing multiple nodes for parallel computing in R


I have access to a HPC system. Let's say I have three nodes/system available. Details of each node is as follows:

scontrol show node

   Arch=x86_64 CoresPerSocket=10
   CPUAlloc=20 CPUTot=20 CPULoad=22.67
   AvailableFeatures=(null)
   ActiveFeatures=(null)
   Gres=(null)
   .
   .
   RealMemory=91000 AllocMem=0 FreeMem=77291 Sockets=2 Boards=1
   State=ALLOCATED ThreadsPerCore=1 TmpDisk=0 Weight=1 Owner=N/A MCS_label=N/A
   Partitions=cpu_normal_q
   BootTime=2023-10-20T12:56:13 SlurmdStartTime=2023-10-20T12:57:43
   CfgTRES=cpu=20,mem=91000M,billing=20
   AllocTRES=cpu=20
   CapWatts=n/a
   CurrentWatts=0 AveWatts=0
   ExtSensorsJoules=n/s ExtSensorsWatts=0 ExtSensorsTemp=n/s

I have a R-code which utilizes doParallel package in R and performs parallel computing.

library(doParallel)
library(Matrix)

# Set the number of cores to be used
num_cores <- detectCores()

# Initialize a parallel backend using doParallel
cl <- makeCluster(num_cores)

# Register the cluster for parallel processing
registerDoParallel(cl)

# Get the number of cores being utilized
cores_utilized <- getDoParWorkers()

# Function to perform matrix multiplication and inversion
matrix_mult_inv <- function() {

  # Generate random matrices
  mat <- matrix(rnorm(10000), nrow = 100)
  
  # Perform matrix multiplication
  result <- mat %*% mat
  
  # Compute the inverse of the result matrix
  inv_result <- solve(result)
  
  return(inv_result)
}

# Record the start time
start_time <- Sys.time()

# Perform the matrix multiplication and inversion in parallel
result <- foreach(i = 1:300, .combine = cbind) %dopar% {
  write.table(matrix_mult_inv(),paste("iteration_", i, ".txt", sep = ""))
}

# Record the end time
end_time <- Sys.time()

# Print the number of cores being utilized
print(paste("Number of cores being utilized:", cores_utilized))

# Print the time taken to run all the iterations:
print(paste("Time taken:", end_time - start_time))

# Stop the parallel backend
stopCluster(cl)

The code is designed to perform 300 iterations and within each iteration, matrix multiplication and inversion is done. The output is the total time taken to run 300 iterations and number of cores utilized.

My goal is to run this code in the HPC environment such that 20 cores from each system is utilized simultaneously so that I have 60 cores in total. Is it possible to do that?

I have looked into parSapply from snow package as well, but I think ultimately it depends on the makeCluster fucntion. I tried with

cl <- makeCluster(num_nodes, type = "SOCK", explicit = TRUE,
                  outfile = "", nodes = c(#3 specific node names input here#),
                  cpus = cores_per_node)

but this just utilized 3 cores in total.


Solution

  • Much of parallel computing development in R is aimed at laptops and at making Windows, Macs, and Unix look the same (by doing different things under the covers). This leads to confusion and often inefficiency when transitioning to a Unix cluster. The package pbdMPI was developed with clusters and their standard MPI practices specifically in mind. Here is a rewrite of your code with pbdMPI. Some explanations are commented in the code.

    The main thing to realize is that clusters are designed to run batch jobs and your script.R is a generalization of a serial code. Several instances of this identical code are started asynchronously by mpirun in your submission shell script. The codes differ only by their assigned rank. Most rank management is automated, even for some complex interactions between the ranks. Rscript R sessions are tasks in Slurm language and ranks in MPI language (usually!).

    library(pbdMPI)
    library(Matrix)
    
    # Function to perform matrix multiplication and inversion
    matrix_mult_inv = function() {
      
      # Generate random matrices
      mat = matrix(rnorm(10000), nrow = 100)
      
      # Perform matrix multiplication
      result = mat %*% mat
      
      # Compute the inverse of the result matrix
      inv_result = solve(result)
      
      return(inv_result)
    }
    
    n_mat = 300
    my_mats = comm.chunk(n_mat, form = "vector") # split the work
    
    size = comm.size()
    rank = comm.rank()
    comm.cat("Rank:", rank, "working on matrices:", my_mats, "\n", all.rank = TRUE)
    
    mat_list = list(length(my_mats))
    for(i in seq_along(my_mats)) {
      mat_list[[i]] = matrix_mult_inv()
      ## write.table(mat, paste("iteration_", my_mats[i], ".txt", sep = ""))
    }
    
    ## It is probably faster to combine to rank 0 in memory (if not too big)
    ## you can also combine to all ranks with allgather() - just as fast
    my_cbind_mats = do.call(cbind, mat_list)
    all_mats_r0 = gather(my_cbind_mats) # only rank 0 gets result, others get NULL
    if(rank == 0) {
      all_mats_r0 = do.call(cbind, all_mats_r0)
      cat("Total dim:", dim(all_mats_r0), "\n")
      ## Here you can do further computation with all_mats_r0
    }
    ## Or if you used allgather(), you'd have all_mats on all ranks (memory permitting)
    
    ## if you want to use parallel::mclapply() within ranks, you'll need
    num_cores = Sys.getenv("SLURM_CPUS_PER_TASK")
    comm.cat("my_cores:", num_cores, "\n", all.rank = TRUE)
    
    finalize() # required! for graceful exit
    

    To run this code on a Slurm cluster, save it in script.R, save the following script in script.sh and submit on a login node with sbatch script.sh.

    #!/bin/bash
    #SBATCH --job-name myjob
    #SBATCH --account=<your-account>
    #SBATCH --partition=<your-cpu-partition>
    #SBATCH --mem=64g
    #SBATCH --nodes=2
    #SBATCH --cpus-per-task=1
    #SBATCH --tasks-per-node=8
    #SBATCH --time 00:20:00
    #SBATCH -e ./myjob.e
    #SBATCH -o ./myjob.o
    
    pwd
    module load r  ## this may differ on your cluster
    module list
    
    time mpirun -np 16 Rscript script.R
    

    All of <your...> tokens above need to be changed to your cluster requirements. I set this up for 2 nodes, 8 R sessions on each node, and 1 core per session. So you are running 16 identical R codes, which decide what to do differently based on their rank. For example, comm.chunk() returns a different result to each rank, so that each rank works on different data.

    I've added a printing of a comm.print() to show how many Rscripts are running and which matrices they are handling. Some overprinting is still possible here as printing is handed off between several processes on a cluster.

    Timing of the code is best done in the shell script (prepending time to mpirun) because the ranks run asynchronously and any internal timing may not reflect the collective performance. The timing goes to error output in file myjob.e Your regular output goes to myjob.o. See the pbdMPI package documentation for more information on, for example, management of printing with comm.print() and comm.cat().

    I commented out your write.table() but it can be used as is because you have each rank writing to different files. On the other hand, you may do further parallel processing with the matrices before writing some results.

    All of these should be configured to your needs. The 1 core per session can be increased if you use some other multithreaded code inside each rank (such as mclapply() for shared-memory advantages). Just be aware of the total cores on a node and do not oversubscribe.

    When using MPI (via pbdMPI) and fork (via mclapply()) on 6 nodes of 20 cores each, you can have 120-way parallelism. MPI allows you to run from 6 ranks (with 20 cores available for mclapply() within each rank) to 120 ranks (with only 1 core available within each rank). This is because MPI instances can run on shared or distributed memory systems, but they never share memory.

    The advantage to letting mclapply() handle some parallelism is that it does share memory, providing a smaller memory footprint by not duplicating its data. There are some more nuances to this because MPI does parallel aggregations (like allreduce() and allgather()), whereas mclapply() does not. So if memory is not an issue, sometimes it is worthwhile to push MPI into the nodes.