I am currently fitting a set of models on a subset of the data for each level of a factor variable. As the models take a long time to run, I use the foreach and doParallel packages to estimate the set of models for each level of the variable in parallel using %dopar%. To avoid memory issues, I pass only the relevant subset of the data to each worker, using the isplit() function from the iterators package.
Now, my question is how to extend my code so that in the first iteration the models are estimated on the whole dataset, by passing the full dataset to one of the workers. In the subsequent iterations, I want to pass only a subset of the data to each worker and estimate the models, as before.
I illustrate my problem below using the mtcars example dataset. Suppose I want to calculate, in parallel, the average number of forward gears a car has (gear column) by the number of cylinders (cyl column).
First, load the packages and the data:
library(doParallel)
library(foreach)
library(iterators)
library(dplyr)
#get sample data to illustrate problem
data("mtcars")
df <- mtcars
df$cyl <- as.factor(df$cyl) #make cyl categorical
Next, iterate over each level of the cyl column and do the necessary calculations:
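mycluster <- makeCluster(3)    # start 3 worker processes
registerDoParallel(mycluster)  # register them as the %dopar% backend
# isplit() yields one element per level of cyl: a list holding the subset (value)
# and the level (key), so each worker only receives its own subset
result <- foreach(subset = isplit(df, df$cyl), .combine = "c", .packages = "dplyr") %dopar% {
  x <- summarise(subset$value, mean(gear, na.rm = T))
  return(x)
}
stopCluster(mycluster)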
The result is a list containing the average number of gears for each number of cylinders:
> result
$`mean(gear, na.rm = T)`
[1] 4.090909
$`mean(gear, na.rm = T)`
[1] 3.857143
$`mean(gear, na.rm = T)`
[1] 3.285714
Now, I want to extend this code so that there are four iterations. In the first iteration, I want to pass the full dataset to the first worker and calculate the average number of gears across all cars in the dataset. Next, I want to pass the subset of data for each level of cyl to the other workers and calculate the average number of gears, as shown above. In other words, the only new thing is one additional iteration, ahead of those produced by isplit(), in which the full dataset is passed.
Expected output:
> result
$`mean(gear, na.rm = T)` #average number of gears across all cars in dataset
[1] 3.6875
$`mean(gear, na.rm = T)`
[1] 4.090909
$`mean(gear, na.rm = T)`
[1] 3.857143
$`mean(gear, na.rm = T)`
[1] 3.285714
I know the example is silly, but it illustrates what I am trying to achieve. In reality, I use a very large dataset and estimate a couple of models that each take a long time to run. However, the data come from a census, so I cannot share even a few lines of it.
If you weren't using iterators, this would be as simple as passing subset = c(list(df), split(df, df$cyl)). However, this syntax will not work with isplit(). More importantly, a data frame iterator is essentially a list with a $nextElem() function that, when called, evaluates the relevant subset of the data. As you want this to be thread safe, you should ensure the subsets are produced in the expected order, by using just one iterator.
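For comparison, here is a minimal sketch of the list-based version mentioned above, run sequentially with %do% so it works without a registered cluster. It produces the four expected means. Note that c(list(df), split(df, df$cyl)) builds all subsets as one list before the loop starts, whereas an iterator creates each subset when $nextElem() is called.

```r
library(foreach)

df <- mtcars
df$cyl <- as.factor(df$cyl)

# Element 1 is the full data set, followed by one data frame per level of cyl
chunks <- c(list(all_data = df), split(df, df$cyl))

# %do% runs sequentially; with a registered backend, %dopar% would send
# each element of `chunks` to a worker instead
means <- foreach(d = chunks, .combine = "c") %do% mean(d$gear, na.rm = TRUE)
```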
Let's write a function that creates an iterator for a new class, extra. It is based on iterators::isplit.data.frame(). The difference is that on the first iteration it yields the entire data frame:
isplit.extra <- function(x, f, drop = FALSE, ...) {
  first_iter <- TRUE
  it <- isplit(seq_len(nrow(x)), f, drop = drop, ...)
  nextEl <- function() {
    # On first iteration return the entire data frame
    if (first_iter) {
      first_iter <<- FALSE
      return(list(value = x, key = "all_data"))
    }
    # Otherwise split the data frame in the normal way
    i <- nextElem(it)
    list(value = x[i$value, , drop = FALSE], key = i$key)
  }
  structure(list(nextElem = nextEl), class = c("abstractiter", "iter"))
}
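The same idea can be generalized into a wrapper that prepends one element to any existing iterator. In the sketch below, iprepend() is a hypothetical helper and is not part of the iterators package. The sketch runs sequentially with %do%, so it can be checked without a cluster.

```r
library(iterators)
library(foreach)

# Hypothetical helper (not in the iterators package): yields `first` once,
# then every element of the wrapped iterator `it`
iprepend <- function(first, it) {
  first_pending <- TRUE
  nextEl <- function() {
    if (first_pending) {
      first_pending <<- FALSE
      return(first)
    }
    # Delegate to the wrapped iterator; it signals StopIteration when exhausted
    nextElem(it)
  }
  # Same structure that isplit() uses for its iterators
  structure(list(nextElem = nextEl), class = c("abstractiter", "iter"))
}

df <- mtcars
df$cyl <- as.factor(df$cyl)

it <- iprepend(list(value = df, key = "all_data"), isplit(df, df$cyl))

# The first element is the full data set, then one subset per cyl level
means <- foreach(s = it, .combine = "c") %do% mean(s$value$gear, na.rm = TRUE)
```

Compared with defining a new class, this keeps df unchanged, at the cost of wrapping the isplit() call explicitly.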
We can assign the class extra to your df. No further changes are then needed, because isplit() will dispatch the appropriate method.
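As a toy illustration of that S3 dispatch, the sketch below uses a class demo and a method isplit.demo(), both hypothetical and defined only for this example. Prepending the class leaves df a data frame for every other function, while isplit() picks the method for the first class that has one.

```r
library(iterators)

# Hypothetical method for a hypothetical class "demo", for illustration only
isplit.demo <- function(x, f, drop = FALSE, ...) {
  "isplit.demo() was dispatched"
}

df <- mtcars
class(df) <- c("demo", class(df))

is_df <- inherits(df, "data.frame")  # still a data frame
msg <- isplit(df, df$cyl)            # "demo" is tried before "data.frame"
```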
I have also changed your summarise() call to use the walrus operator (:=) to create a more useful column name. I've marked the lines I've changed.
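mycluster <- makeCluster(3)    # as in the question: the cluster was stopped above
registerDoParallel(mycluster)
class(df) <- c("extra", class(df)) # new
result <- foreach(
  subset = isplit(df, df$cyl), # no need to change this
  .combine = "c", .packages = "dplyr"
) %dopar% {
  summarise(
    subset$value,
    "mean_gear_cyl_{subset$key}" := mean(gear, na.rm = TRUE) # changed
  )
}
stopCluster(mycluster)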
The output is:
$mean_gear_cyl_all_data
[1] 3.6875
$mean_gear_cyl_4
[1] 4.090909
$mean_gear_cyl_6
[1] 3.857143
$mean_gear_cyl_8
[1] 3.285714
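The glue-style name on the left of := can be tried on its own. In this minimal sketch, the variable key is a hypothetical stand-in for subset$key inside the loop:

```r
library(dplyr)

key <- "4"  # hypothetical stand-in for subset$key

out <- mtcars %>%
  filter(cyl == 4) %>%
  summarise("mean_gear_cyl_{key}" := mean(gear, na.rm = TRUE))
```

The {key} part is replaced by the value of key, so the resulting column is named mean_gear_cyl_4.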