rparallel-processingparallel-foreach

Nested for loops in R using foreach function and doParallel library


I am trying to calculate the cosine similarity between columns in a matrix. I am able to get it to work using standard for loops, but when I try to make it run in parallel to make the code run faster it does not give me the same answer. The problem is that I am unable to get the same answer using the foreach loop approach. I suspect that I am not using the correct syntax, because I have had single foreach loops work. I have tried to make the second loop a regular for loop and I used the %:% parameter with the foreach loop, but then the function doesn't even run.

Please see my attached code below. Thanks in advance for any help.

## Function that calculates cosine similarity using paralel functions.

#for calculating parallel processing
library(doParallel)

## Set up cluster on 8 cores

cl = makeCluster(8)

registerDoParallel(cl)

#create an example data
x=array(data=sample(1000*100), dim=c(1000, 100))

## Cosine similarity function using sequential for loops

cosine_seq =function (x) {

  co = array(0, c(ncol(x), ncol(x)))

  for (i in 2:ncol(x)) {
    for (j in 1:(i - 1)) {

      co[i, j] = crossprod(x[, i], x[, j])/sqrt(crossprod(x[, i]) * crossprod(x[, j]))
    }
  }

  co = co + t(co)

  diag(co) = 1

  return(as.matrix(co))

}

## Cosine similarity function using parallel for loops

cosine_par =function (x) {

  co = array(0, c(ncol(x), ncol(x)))

  foreach (i=2:ncol(x)) %dopar% {

    for (j in 1:(i - 1)) {

      co[i, j] = crossprod(x[, i], x[, j])/sqrt(crossprod(x[, i]) * crossprod(x[, j]))
    }
  }

  co = co + t(co)

  diag(co) = 1

  return(as.matrix(co))

}

## Calculate cosine similarity

tm_seq=system.time(
{

  x_cosine_seq=cosine_seq(x)

})

tm_par=system.time(
{

  x_cosine_par=cosine_par(x)

})

## Test equality of cosine similarity functions

all.equal(x_cosine_seq, x_cosine_par)

#stop cluster
stopCluster(cl)

Solution

  • The correct parallelization of nested loop uses %:% (read here).

    library(foreach)
    library(doParallel)
    registerDoParallel(detectCores())    
    cosine_par1 <- function (x) {  
      co <- foreach(i=1:ncol(x)) %:%
        foreach (j=1:ncol(x)) %dopar% {    
          co = crossprod(x[, i], x[, j])/sqrt(crossprod(x[, i]) * crossprod(x[, j]))
        }
      matrix(unlist(co), ncol=ncol(x))
    }
    

    I recommend you to write it in Rcpp, rather than running it in parallel, because foreach(i=2:n, .combine=cbind) will not always bind the columns in the right order. Also, in the above code I removed only the lower triangular condition, but the running time is considerably slower than the unparallelized code time.

    set.seed(186)
    x=array(data=sample(1000*100), dim=c(1000, 100))
    cseq <- cosine_seq(x)
    cpar <- cosine_par1(x)
     all.equal(cpar, cseq)
    #[1] TRUE
    head(cpar[,1])
    #[1] 1.0000000 0.7537411 0.7420011 0.7496145 0.7551984 0.7602620
    head(cseq[,1])
    #[1] 1.0000000 0.7537411 0.7420011 0.7496145 0.7551984 0.7602620
    

    Addendum: For this specific problem, (semi-) vectorization of cosine_seq is possible; cosine_vec is about 40-50 times faster than cosine_seq.

    cosine_vec <- function(x){
      crossprod(x) / sqrt(tcrossprod(apply(x, 2, crossprod)))
    }
    all.equal(cosine_vec(x), cosine_seq(x))
    #[1] TRUE
    library(microbenchmark)
    microbenchmark(cosine_vec(x), cosine_seq(x), times=20L, unit="relative")
    #Unit: relative
    #          expr      min       lq     mean   median       uq      max neval
    # cosine_vec(x)  1.00000  1.00000  1.00000  1.00000  1.00000  1.00000    20
    # cosine_seq(x) 55.81694 52.80404 50.36549 52.17623 49.56412 42.94437    20