
Nested parallelism with R future


I'm trying to read multiple large CSV files using nested parallelism with future.

I have a single machine with 32 cores, and I want to set up nested parallelism (5 by 6): 5 outer processes with 6 cores each. I'm trying to utilize the implicit parallelism of data.table::fread(.., nThread = 6).

The R package future provides nested parallelism, and I've tried

library(future)
plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6)))

but the above actually uses only 1 core for each subprocess:

library(doFuture)  # provides registerDoFuture(); also attaches foreach and future
plan(list(tweak(multisession, workers = 5), 
          tweak(multisession, workers = 6)))
registerDoFuture()
foreach(i = 1:5) %dopar%  {
  availableCores()
}

[[1]]
mc.cores 
       1 

[[2]]
mc.cores 
       1 

[[3]]
mc.cores 
       1 

[[4]]
mc.cores 
       1 

[[5]]
mc.cores 
       1 

Is there a way to achieve this?


Solution

  • (Futureverse maintainer here)

    ... but the above actually uses only 1 core for each subprocess:

    I see the misunderstanding here. You want to use nbrOfWorkers() (from future) instead of availableCores() (from parallelly, re-exported as-is by future). This will give you what you expected:

    > foreach(i = 1:5) %dopar% {
      nbrOfWorkers()
    }
    [[1]]
    [1] 6
    ...
    [[5]]
    [1] 6
    

    The reason availableCores() returns one (1) is that the future framework tries to prevent nested parallelization by mistake. It does this by setting options and environment variables that control the number of parallel workers and CPU cores, including options(mc.cores = 1L). This is correctly picked up by availableCores(). This prevents, for instance, a package that uses y <- mclapply(X, FUN), cl <- makeCluster(availableCores()), or plan(multisession) from running in parallel if it is already running in a parallel worker. In contrast, nbrOfWorkers() reflects the number of workers specified by plan(). In your case, we have plan(multisession, workers = 6) set in the parallel workers, from the second level in plan(list(tweak(multisession, workers = 5), tweak(multisession, workers = 6))).

    To convince yourself that you are indeed running in parallel with your setup, you can adapt one of the examples in https://future.futureverse.org/articles/future-3-topologies.html.
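
    As a rough illustration of that kind of check, here is a minimal sketch that records process IDs at both levels. It is scaled down to a 2-by-2 topology, which is my assumption for a quick test, not the 5-by-6 setup from the question. Each outer worker should report its own PID plus PIDs of separate inner workers. Depending on your parallelly version, the inner level may warn that it asks for more workers than availableCores() reports; that is the protection described above at work.

    ```r
    library(future)

    # Two-level topology: 2 outer workers, each with 2 inner workers
    # (scaled down for a quick test; the question uses 5 and 6)
    plan(list(tweak(multisession, workers = 2),
              tweak(multisession, workers = 2)))

    outer <- lapply(1:2, function(i) {
      future({
        # These inner futures are resolved by this outer worker's own workers
        inner <- lapply(1:2, function(j) future(Sys.getpid()))
        list(outer_pid     = Sys.getpid(),
             inner_pids    = unlist(value(inner)),
             inner_workers = nbrOfWorkers())
      })
    })
    res <- value(outer)
    str(res)

    plan(sequential)  # shut down the parallel workers
    ```

    If the nesting works, the inner PIDs differ from the outer PID of the same element, and inner_workers matches the second level of the plan.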

    Now, parallel threads are not the same as parallel processes (aka parallel workers). You can think of threads as a much lower-level parallelization mechanism. Importantly, the future framework does not constrain the number of threads used in parallel workers, including the number of parallel threads that data.table uses. Because of this, you need to explicitly call:

    data <- data.table::fread(.., nThread = 6)
    

    or, if you want to adapt to the current settings,

    data <- data.table::fread(.., nThread = nbrOfWorkers())
    

    to avoid over-parallelization. Alternatively, you can reconfigure data.table as:

    ## Set the number of parallel threads used by 'data.table'
    ## (the default is to use all physical CPU cores)
    data.table::setDTthreads(nbrOfWorkers())
    data <- data.table::fread(..)
    

    BTW, in doFuture (>= 1.0.0), you no longer need registerDoFuture() if you replace %dopar% with %dofuture%. So, the gist of reading lots of CSV files in parallel is:

    library(doFuture)
    plan(list(tweak(multisession, workers = 5), 
              tweak(multisession, workers = 6)))
    
    files <- dir(pattern = "[.]csv$")  # 'pattern' is a regular expression, not a glob
    res <- foreach(file = files) %dofuture% {
      data.table::setDTthreads(nbrOfWorkers())
      data.table::fread(file)
    }
    

    With all that said, note that your bottleneck will probably be the file system rather than the CPU. When you parallelize reading files, you might overwhelm the file system and end up slowing down the file reading rather than speeding it up. Sometimes it gets faster to read two or three files in parallel, but with more it becomes counterproductive. So, you need to benchmark with different numbers of parallel workers.

    Moreover, these days, there are R packages that are highly specialized for reading data files into R efficiently. Some of them support reading multiple files efficiently. The vroom package is one such example.
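
    One way to run such a benchmark is sketched below. The helper time_read() and the small demo files are hypothetical stand-ins of mine; point files at your real CSV files and extend the worker counts as your machine allows. Each worker is limited to one data.table thread so that only the number of processes varies. Keep in mind that repeated reads of the same files may be served from the operating system's file cache, so later runs can look faster than a cold read.

    ```r
    library(future)

    # Hypothetical helper: elapsed seconds to read 'files' using 'workers' processes
    time_read <- function(files, workers) {
      plan(multisession, workers = workers)
      on.exit(plan(sequential), add = TRUE)  # shut the workers down afterwards
      system.time({
        fs <- lapply(files, function(f) future({
          data.table::setDTthreads(1L)       # one thread per worker, for a clean comparison
          data.table::fread(f)
        }))
        value(fs)
      })[["elapsed"]]
    }

    # Demo input (an assumption for this sketch): a few small CSV files in a temp folder
    demo_dir <- tempfile("csv-bench-")
    dir.create(demo_dir)
    for (i in 1:4) {
      write.csv(data.frame(id = 1:1000, x = rnorm(1000)),
                file.path(demo_dir, sprintf("file%d.csv", i)), row.names = FALSE)
    }
    files <- dir(demo_dir, pattern = "[.]csv$", full.names = TRUE)

    workers <- c(1, 2, 3)
    timings <- vapply(workers, function(w) time_read(files, w), numeric(1))
    print(data.frame(workers = workers, elapsed = timings))
    ```

    With real, large files the elapsed times typically stop improving after a few workers; that is the point at which adding more only adds contention.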
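
    For comparison, a minimal vroom sketch could look like the following. It assumes that all files share the same columns; the demo files, the source_file column name, and the thread count are illustrative choices of mine, not settings from the question.

    ```r
    library(vroom)

    # Demo input (an assumption for this sketch): two small CSV files with identical columns
    demo_dir <- tempfile("csv-vroom-")
    dir.create(demo_dir)
    write.csv(data.frame(id = 1:3, x = c(0.1, 0.2, 0.3)),
              file.path(demo_dir, "a.csv"), row.names = FALSE)
    write.csv(data.frame(id = 4:6, x = c(0.4, 0.5, 0.6)),
              file.path(demo_dir, "b.csv"), row.names = FALSE)

    files <- dir(demo_dir, pattern = "[.]csv$", full.names = TRUE)

    # vroom() accepts a vector of paths and row-binds the files;
    # 'id' adds a column recording which file each row came from
    data <- vroom(files, delim = ",", id = "source_file",
                  num_threads = 2, show_col_types = FALSE)
    print(data)
    ```

    Here the parallelism comes from vroom's own threads in a single R process, so no future plan is involved.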