Tags: r, doparallel, parallel-foreach, standardization

Efficiently standardize a large matrix in R with foreach/doParallel?


I need to standardize (subtract the mean and divide by the standard deviation) the columns of several large matrices in R (roughly 300,000 rows by 10,000-20,000 columns).

The process has been very slow, so I tried to speed it up with foreach/doParallel, but that doesn't seem to be helping.

Some example code with a smaller matrix:

library('doParallel')
n.cores <- 7
clust <- makeCluster(n.cores,type='FORK')
registerDoParallel(cl = clust)

rows <- 300000
cols <- 1500
big_matrix <- matrix(runif(rows*cols),nrow=rows,ncol=cols,dimnames=list(paste0('r',1:rows),paste0('c',1:cols)))

# Attempt 1: No parallelization. Took 52.3 seconds.
system.time( stand1 <- scale(big_matrix,center=T,scale=T) )

# Attempt 2: Each column separately with foreach. Took 168.9 seconds.
system.time(
  stand2 <- foreach(mchunk=big_matrix,.combine='cbind') %dopar% { # foreach iterates over the matrix one column at a time
              (mchunk - mean(mchunk))/sd(mchunk)
            }
)

# Attempt 3: In chunks with foreach. Took 52.7 seconds.
stand_chunks <- function(mat,n.cores) {
  chunk_indices <- split(1:ncol(mat),cut(1:ncol(mat),n.cores)) # A list of column indices, splitting the matrix into n.cores chunks
  stand_mat <- foreach(ci=chunk_indices,.combine='cbind') %dopar% {
    scale(mat[,ci],center=T,scale=T)
  }
  return(stand_mat)
}
system.time( stand3 <- stand_chunks(big_matrix,n.cores) )

# Attempt 4: In chunks with foreach, without copying whole matrix to each worker. 23 seconds.
stand_chunks_v2 <- function(mat,n.cores) {
  chunk_indices <- split(1:ncol(mat),cut(1:ncol(mat),n.cores)) # A list of column indices, splitting the matrix into n.cores chunks
  mchunks <- lapply(chunk_indices,function(ci) { return(mat[,ci]) })
  stand_mat <- foreach(mc=mchunks,.combine='cbind') %dopar% {
    scale(mc,center=T,scale=T)
  }
  return(stand_mat)
}
system.time( stand4 <- stand_chunks_v2(big_matrix,n.cores) )


parallel::stopCluster(cl=clust)

# Attempt 5: Do matrix algebra and let BLAS handle it. 20.5 seconds.
RhpcBLASctl::blas_set_num_threads(7)
stand_matalg <- function(x) {
  means <- rep(1, nrow(x)) %*% t(colMeans(x))
  sds <- rep(1, nrow(x)) %*% t(apply(x,2,sd))
  return((x - means)/sds)
}
system.time( stand5 <- stand_matalg(big_matrix) )
summary(c(abs(stand1 - stand5))) # Still looks correct, plus or minus a bit of floating-point weirdness

I can understand why Attempt 2 was slow - the overhead of dispatching each column to a worker as its own tiny task outweighs any benefit from doing the work in parallel.

Attempt 3 was very memory-inefficient - it has the issue described here as "giving workers more data than they need". If I watch the worker processes with "top", I can see each one's memory usage climb as the entire big matrix is copied to it. By the time I increase the matrix to 5,000 columns, this copying is wasteful enough to exhaust an HPC compute node with 80 GB of RAM. It also wasn't any faster than the unparallelized version, possibly because of the overhead of copying the large matrix to every worker.

Attempt 4 was, finally, faster. It's also more memory-efficient than Attempt 3 - it still duplicates the large matrix when it builds the mchunks list, but one extra copy is better than a full copy for each of the 7 workers. It can probably still be improved on further, though.
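
A tidier variant of Attempt 4 that I haven't benchmarked at this scale would be to let itertools::isplitCols() generate the column chunks from an iterator instead of building the mchunks list by hand (this assumes the {itertools} package and that the FORK cluster from above is still registered; how much memory it actually saves will depend on how the backend buffers the pending tasks):

stand_chunks_v3 <- function(mat,n.cores) {
  # isplitCols() yields one block of columns per task, so no hand-built chunk list is needed
  foreach(mc=itertools::isplitCols(mat,chunks=n.cores),.combine='cbind') %dopar% {
    scale(mc,center=TRUE,scale=TRUE)
  }
}
system.time( stand4b <- stand_chunks_v3(big_matrix,n.cores) )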

Attempt 5 (adapted from this answer to an old question) is the fastest so far, but it still requires the creation of two large "means" and "sds" matrices that match the dimensions of the original large matrix.
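
One direction I'm considering to avoid those full-size intermediates (a sketch only - I haven't benchmarked it at this scale, and it assumes the {matrixStats} package) is to keep the per-column means and standard deviations as plain vectors and overwrite the matrix one block of columns at a time, so that apart from the modified copy of the matrix only block-sized temporaries should be allocated:

stand_blockwise <- function(x,block_size=1000) {
  mu <- colMeans(x)             # per-column means as a plain vector
  s  <- matrixStats::colSds(x)  # per-column sds, much faster than apply(x,2,sd)
  col_blocks <- split(seq_len(ncol(x)),ceiling(seq_len(ncol(x))/block_size))
  for (ci in col_blocks) {
    # scale() accepts numeric center/scale vectors, so only block-sized temporaries are created here
    x[,ci] <- scale(x[,ci,drop=FALSE],center=mu[ci],scale=s[ci])
  }
  return(x)
}
system.time( stand6 <- stand_blockwise(big_matrix) )
summary(c(abs(stand1 - stand6))) # same floating-point-level check as above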

What's a more efficient way to standardize these large matrices (with foreach or via some other method)?


Solution

  • Try fscale() from {collapse}:

    collapse::fscale(big_matrix)
    

    I expect it to be comparatively fast. I'm not too familiar with its arguments; there may be potential for further speed-ups.
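
    As a quick sanity check against the question's scale() result (I haven't benchmarked this at the full 10,000-20,000 column size), fscale() standardizes matrix columns by default, so the output should match up to floating-point noise:

    system.time( stand_fc <- collapse::fscale(big_matrix) )
    summary(c(abs(stand1 - stand_fc))) # should be near zero, same check as in the question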