rparallel-processinglarge-dataparallel.foreachvegan

Memory efficient parallel repeated rarefaction with subsequent matrix addition of large data set


I am trying to speed up repeatedly rarefying a data frame and the subsequent addition of generated matrices. Some background information: The data set I want to repeatedly rarefy is very large (about 200 rows and >> 100 000 columns). So far, I have tried the following:

i) Repeated rarefaction and storing rarefied data frames in a list for subsequent matrix addition. Unfortunately, memory becomes the limiting factor pretty rapidly if many repeated rarefactions are performed.

ii) I also parallelized the repeated rarefaction through foreach, wrote the rarefied data frames to disk and retrieved them latter for matrix addition. While the first part was pretty fast, the latter negated all speed benefits.

Sadly, the fastest solution is currently the non-parallelized one:

library(vegan)
data <- matrix(sample(1:20, 10000, replace = TRUE), ncol = 100)
min_sample <- min(rowSums(data)[rowSums(data) > 0])
for (i in 1:100) {
  if (i == 1) {
    suppressWarnings(dataRF1 <- rrarefy(data, min_sample))
  }
  else if (i > 2) {
    suppressWarnings(dataRF2 <- rrarefy(data, min_sample))
    dataRF1 <- dataRF1 + dataRF2
  }
}

I also tried foreach with .combine = "+". However it seems that the rarefied data frames are held in memory as well and summed the earliest after all rarefactions have taken place. Due to memory limitations, I need a solution that performs matrix addition immediately after the construction of individual rarefied data frames or a comparable memory-efficient and fast approach.

Thanks for your help.

Update: Thanks @SamR and the other one. In fact, from parallelizing the rarefactioning through foreach, I know that the speed gains through parallel processing are not completely negated by the overhead but by the import of files from disk and downstream matrix addition (see approach ii). I am well aware that through parallelizing through sockets, each worker runs in strictly isolated fashion and hence, the matrix addition with a shared starting matrix is not possible. However, I am thinking about providing such a starting matrix to each of the workers separately.

For instance, the processing is performed through foreach on three workers. Within each worker, the rarefied data frames are summed by matrix addition, resulting in a total of three matrices that are summed themselves after the parallel processing has finished. To do so, I would need to provide this starting matrix (a matrix with the same dimension as the input data frame) to each of the worker separately. Is this possible?


Solution

  • Based on your comments, we could rrarefy R times. Next, we transform the resulting list in a 3D-array, where we easily may apply a function, e.g. mean on MARGINs c(1, 2), referring to rows and columns, i.e. mean is calculated accross the 3rd dimension of Replications.

    > library(tictoc)  ## for timings
    > ncpu <- 7L
    > R <- 100
    > 
    > ## on Linux
    > tic()
    > rr <- parallel::mclapply(seq_len(R), \(i) rrarefy(data, min_sample), mc.cores=ncpu)
    > res <- unlist(rr) |> array(c(dim(data), R)) |> apply(MARGIN=1:2, mean)
    > res[1:5, 1:5]
          [,1]  [,2]  [,3]  [,4]  [,5]
    [1,]  5.90 10.07 16.14 11.56  4.88
    [2,]  7.11  9.66  4.28  4.33 17.40
    [3,]  6.35  4.89  6.49 14.83  2.48
    [4,] 13.01  3.42 11.26  2.46 16.15
    [5,] 16.20  5.19  8.90  3.38  3.38
    > toc()
    0.534 sec elapsed
    > 
    > ## on Windows
    > tic()
    > cl <- parallel::makeForkCluster(ncpu)
    > rr <- parallel::parLapply(cl, seq_len(R), \(i) rrarefy(data, min_sample))
    > res <- unlist(rr) |> array(c(dim(data), R)) |> apply(1:2, mean)
    > parallel::stopCluster(cl)
    > res[1:5, 1:5]
          [,1]  [,2]  [,3]  [,4]  [,5]
    [1,]  5.74 10.49 16.08 11.30  5.00
    [2,]  7.05  9.51  4.72  4.37 16.95
    [3,]  6.54  4.77  6.84 14.72  2.14
    [4,] 13.33  3.72 10.79  2.54 16.32
    [5,] 17.09  4.95  8.60  3.37  3.35
    > toc()
    0.66 sec elapsed
    

    Perhaps, if RAM is too weak, we need parallel::makePSOCKcluster.


    Data:

    > set.seed(42)
    > data <- matrix(sample(1:20, 10000, replace=TRUE), ncol=100)
    > min_sample <- min(rowSums(data)[rowSums(data) > 0])