Tags: r, foreach, parallel-processing, brms

Is there a way to export a function used within a brms model when parallelising posterior_epred() with foreach()?


I am predicting from a brms categorical model on a very large set of new data. When I try to predict on the whole dataset at once, R crashes or exhausts the vector memory. To get around that, I want to loop through my new data in chunks and speed the loop up with foreach(). However, I can't seem to export a function I used to scale the data in my original model, even if I redefine the function inside the foreach() loop. I'm not sure I fully understand how the function in my brms formula is stored in the model, which might be why I'm struggling to export it.

Here's basically what my code looks like:

library(brms)
library(foreach)
library(doSNOW)

test_data <- data.frame(category=c(rep("a",10),
                                   rep("b",10),
                                   rep("c",10)),
                        var1 = runif(30,min=0,max=20))

scale01 <- function(x) (x-min(x))/(max(x)-min(x))

test_brm <- brm(category ~ scale01(var1),
                family=categorical(refcat="a"),
                data=test_data)

prediction_data <- data.frame(id=rep(c(1:500),2),
                              var1=runif(1000,min=0,max=20))

cl <- makeCluster(parallel::detectCores())
registerDoSNOW(cl)

preds <- foreach(id = prediction_data$id, .combine=rbind,
                 .packages=c("brms","matrixStats")) %dopar% {
                   
                   scale01 <- function(x) (x-min(x))/(max(x)-min(x))
                   
                   colMeans(posterior_epred(test_brm,
                                            newdata=prediction_data[which(prediction_data$id==id),]))
                   
                 }

And the error I get is Error in { : task 1 failed - "could not find function "scale01""

I'm aware that the scaling function relies on the data having the same max and min each time, but my newdata does span the same range (perhaps I could hard-code those bounds, as sketched below, but I don't think that's the problem here).
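
For illustration, a hard-coded version might look something like this (the 0 and 20 bounds come from the runif() call that generates var1):

# Hypothetical hard-coded variant of scale01(), pinning the range to the
# known bounds of var1 rather than the observed min/max of each dataset
scale01 <- function(x, lo = 0, hi = 20) (x - lo) / (hi - lo)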

Any suggestions? Thanks!


Solution

  • The problem is that scale01() is no longer associated with the formula category ~ scale01(var1) when the formula is exported to the parallel workers. This is because both of them live in the global environment, which is never exported to parallel workers. To work around that, you can make scale01() part of the same local environment as the brmsfit object, as shown below.

    library(foreach)
    library(doParallel)
    library(brms)
    
    test_data <- data.frame(category=c(rep("a",10),
                                       rep("b",10),
                                       rep("c",10)),
                            var1 = runif(30,min=0,max=20))
    
    test_brm <- local({
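      # Defining scale01() here, inside local(), puts it in the same
      # environment as the formula and the returned brmsfit object, so it
      # gets exported to the parallel workers together with the model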
      scale01 <- function(x) (x-min(x))/(max(x)-min(x))
      
      brm(category ~ scale01(var1),
          family = categorical(refcat="a"),
          data = test_data)
    })
    
    prediction_data <- data.frame(
      id = rep(c(1:500), times = 2),
      var1 = runif(1000, min = 0, max = 20)
    )
    
    cl <- makeCluster(parallelly::availableCores())
    registerDoParallel(cl)
    
    preds <- foreach(id = prediction_data$id, .combine = rbind, .packages = c("brms")) %dopar% {
      data <- prediction_data[which(prediction_data$id == id), ]
      colMeans(posterior_epred(test_brm, newdata=data))
    }
    
    parallel::stopCluster(cl)
    

    Here I replaced doSNOW with doParallel. You do not want to rely on doSNOW, because it depends on the snow package, which is considered a legacy package these days. The parallel package, which is part of all R installations, has replaced the role of snow since around 2014.

    Also, importantly, do not default to detectCores(). It can wreak havoc on shared computers. Instead, use parallelly::availableCores(). For details, see https://www.jottr.org/2022/12/05/avoid-detectcores/.

    Now, as the author of the Futureverse, I should also point out the following alternative, which achieves the same thing while automating many of the details for you. It also supports relaying of output printed on parallel workers, proper error handling, and so on, all of which will help if you run into other problems.

    library(foreach)
    library(doFuture)
    library(brms)
    
    test_data <- data.frame(category=c(rep("a",10),
                                       rep("b",10),
                                       rep("c",10)),
                            var1 = runif(30,min=0,max=20))
    
    test_brm <- local({
      scale01 <- function(x) (x-min(x))/(max(x)-min(x))
      
      brm(category ~ scale01(var1),
          family = categorical(refcat="a"),
          data = test_data)
    })
    
    prediction_data <- data.frame(
      id = rep(c(1:500), times = 2),
      var1 = runif(1000, min = 0, max = 20)
    )
    
    plan(multisession)
    
    preds <- foreach(id = prediction_data$id, .combine = rbind) %dofuture% {
      data <- prediction_data[which(prediction_data$id == id), ]
      colMeans(posterior_epred(test_brm, newdata=data))
    }
    
    plan(sequential)
    

    BTW, instead of subsetting the data within each iteration, you probably want to set up an "iterator" (see the iterators package) that does the subsetting in the main R session. That avoids sending the whole prediction_data object to each parallel worker, which can be expensive, particularly for large data sets.
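
    For example, here is a minimal sketch of that idea using isplit() from the iterators package, assuming the same test_brm model as above and plan(multisession) set as in the previous example; each worker then only receives its own chunk of prediction_data:

    library(iterators)

    preds <- foreach(d = isplit(prediction_data, factor(prediction_data$id)),
                     .combine = rbind) %dofuture% {
      # d$value holds the rows of prediction_data for one id;
      # d$key holds the id itself
      colMeans(posterior_epred(test_brm, newdata = d$value))
    }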

    UPDATE: As an alternative to foreach and iterators, you can achieve something similar automatically by using future.apply::future_by(). If you try that, I recommend first figuring out how to do it sequentially using by(), which is part of base R. When that works, future_by() should work the same way.
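
    Here is a minimal sketch of that approach, again assuming the same test_brm model as above; the by() call is the sequential base-R version, and future_by() is the parallel drop-in replacement:

    library(future.apply)

    plan(multisession)

    ## Sequential base-R version: split prediction_data by id and
    ## predict on each group
    preds_list <- by(prediction_data, prediction_data$id, function(d) {
      colMeans(posterior_epred(test_brm, newdata = d))
    })

    ## Parallel version: identical call with future_by() swapped in
    preds_list <- future_by(prediction_data, prediction_data$id, function(d) {
      colMeans(posterior_epred(test_brm, newdata = d))
    })

    preds <- do.call(rbind, preds_list)

    plan(sequential)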