I am trying to perform a grid search to find coefficients that maximize the correlation between a linear combination of x's and y. The function below is applied to each column of a data.frame, where each column holds the thetas to test for that iteration.
corr_grid_search <- function(thetas, modeling_df) {
  # thetas = as.vector(thetas)
  # Scale each penalty column by its theta
  coeff1 = modeling_df$penalty1 / thetas[1]
  coeff2 = modeling_df$penalty2 / thetas[2]
  coeff3 = modeling_df$penalty3 / thetas[3]
  coeff4 = modeling_df$penalty4 / thetas[4]
  coeff5 = modeling_df$penalty5 / thetas[5]
  coeff6 = modeling_df$penalty6 / thetas[6]
  coeff7 = modeling_df$penalty7 / thetas[7]
  coeff8 = modeling_df$penalty8 / thetas[8]
  coeff9 = modeling_df$penalty9 / thetas[9]
  coeff10 = modeling_df$penalty10 / thetas[10]
  df = data.frame(coeff1, coeff2, coeff3, coeff4, coeff5,
                  coeff6, coeff7, coeff8, coeff9, coeff10)

  # Divide each x by its coefficient
  pp_1 = modeling_df$x1 / df$coeff1
  pp_2 = modeling_df$x2 / df$coeff2
  pp_3 = modeling_df$x3 / df$coeff3
  pp_4 = modeling_df$x4 / df$coeff4
  pp_5 = modeling_df$x5 / df$coeff5
  pp_6 = modeling_df$x6 / df$coeff6
  pp_7 = modeling_df$x7 / df$coeff7
  pp_8 = modeling_df$x8 / df$coeff8
  pp_9 = modeling_df$x9 / df$coeff9
  pp_10 = modeling_df$x10 / df$coeff10

  # Reciprocals of the coefficients act as the weights; drop infinities
  recip = 1 / df[, c('coeff1', 'coeff2', 'coeff3',
                     'coeff4', 'coeff5', 'coeff6',
                     'coeff7', 'coeff8', 'coeff9', 'coeff10')]
  recip = as.data.frame(lapply(recip, function(x) replace(x, is.infinite(x), NA)))

  df = data.frame(pp_1, pp_2, pp_3, pp_4, pp_5,
                  pp_6, pp_7, pp_8, pp_9, pp_10)

  # Weighted combination of the x's, then its correlation with y
  weighted_x = rowSums(df, na.rm = TRUE) / rowSums(recip, na.rm = TRUE)
  cor(weighted_x[!is.na(weighted_x)],
      modeling_df[!is.na(weighted_x), ]$y)
}
I have it running with lapply() like so:
lapply(blah, corr_grid_search, modeling_df)
But I am trying to parallelize it and am having trouble. The two methods I have tried use the parallel and future.apply libraries, but neither has worked:
library(future.apply)
plan(multisession)
cors = future_lapply(blah, corr_grid_search, modeling_df)
library(parallel)
cl = makeCluster(32)
clusterExport(cl=cl, varlist=c("modeling_df"))
cors = parLapply(cl, blah, corr_grid_search, modeling_df)
Something is going wrong with both of them: they take horrendously long, 2-3 orders of magnitude slower than the plain lapply(). What am I doing wrong here?
There are two frequent situations in my experience where parallel processing ends up slower than using a single thread: when large amounts of data have to be copied to and from the workers, and when each individual task is so quick that the overhead of setting up the workers dominates.
As the furrr docs note:
It’s important to remember that data has to be passed back and forth between the workers. This means that whatever performance gain you might have gotten from your parallelization can be crushed by moving large amounts of data around. For example, if you are moving large data frames to the workers, running models in parallel, and returning large model objects back, the shuffling of data can take a large chunk of that time.
We can see an example of this if we define an inefficient function. This function calculates the mean of a data frame column, but then instead of returning this single value, creates a new data frame column with the value recycled. It then returns the entire data frame with this new column appended:
silly_fun <- function(dat, col_name) {
  mean_col_name <- paste0(col_name, "_mean")
  dat[[mean_col_name]] <- mean(dat[[col_name]])
  return(dat)
}
It would be a bad idea to run this function over every column of a data frame, whether in parallel or not.
So, let's try it with only 100 rows and columns and see what happens:
library(future.apply)
plan(multisession)

nrows <- 100
ncols <- 100
dat <- data.frame(
  matrix(rnorm(nrows * ncols), nrows, ncols)
)

res <- microbenchmark::microbenchmark(
  single_thread = lapply(names(dat), \(col_name) silly_fun(dat, col_name)),
  parallel = future_lapply(names(dat), \(col_name) silly_fun(dat, col_name)),
  times = 100
)
Output:
Unit: milliseconds
          expr      min        lq       mean    median        uq      max neval cld
 single_thread   2.9771   3.26725   3.876938   3.43705   3.91215   9.6273   100  a 
      parallel 103.5295 114.23415 126.105709 123.41755 132.39925 235.1055   100   b
As you can see, the median for the parallel job is about 35 times that of the single-threaded.
If we try it with 1000 rows and columns, these are the results:
Unit: milliseconds
          expr        min         lq       mean     median         uq        max neval
 single_thread   168.5477   168.5477   168.5477   168.5477   168.5477   168.5477     1
      parallel 29440.3962 29440.3962 29440.3962 29440.3962 29440.3962 29440.3962     1
Here it takes 175 times as long to run in parallel. You can see that I've only done one iteration here - a single-threaded iteration took 0.168 seconds but doing it in parallel took 29.4 seconds!
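To get a feel for how much data that involves, here is a rough back-of-the-envelope estimate (my own, not something measured by the benchmark itself): every one of the 1000 tasks returns a modified copy of the full 1000 x 1000 data frame, and all of those copies have to be serialised back to the main session.
# Rough estimate of the data being shuffled back from the workers.
dat <- data.frame(matrix(rnorm(1e3 * 1e3), 1e3, 1e3))

one_copy_mb <- as.numeric(object.size(dat)) / 1024^2
one_copy_mb                # roughly 8 MB for a single copy of dat
one_copy_mb * 1000 / 1024  # roughly 8 GB returned across the 1000 tasks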
Let's take a more sensible example and just return the actual mean, rather than the entire data frame, again with 1000 rows and columns. You might think that now that we're not passing large amounts of data around, parallel processing will be much quicker. Let's see:
nrows <- 1e3
ncols <- 1e3
dat <- data.frame(
  matrix(rnorm(nrows * ncols), nrows, ncols)
)

sensible <- microbenchmark::microbenchmark(
  single_thread = lapply(dat, mean),
  parallel = future_lapply(dat, mean),
  times = 100
)
Unit: milliseconds
          expr      min        lq       mean    median        uq      max neval cld
 single_thread   4.3159   4.65055   5.245647   4.88995   5.37955  10.3636   100  a 
      parallel 157.9709 163.17605 177.565840 169.55155 180.03720 513.5421   100   b
Both methods perform a lot better in absolute terms: with 1000 rows and columns, each method's timing is roughly what it was with 100 rows and columns using the silly function.
But it's still much faster to use a single thread, because of the high overhead of setting up the workers compared with the relatively quick operation of calculating a mean.
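You can see how much of that parallel timing is fixed cost with a quick sketch of my own (reusing the plan(multisession) set up above): give the workers essentially nothing to do and the parallel call is still slow, while the single-threaded version finishes almost instantly.
# Near-empty tasks: identity() does essentially no work, so almost all of the
# parallel timing is the cost of talking to the worker processes.
microbenchmark::microbenchmark(
  single_thread = lapply(1:100, identity),
  parallel      = future_lapply(1:100, identity),
  times = 10
)
# On a typical machine the single-threaded run is measured in microseconds,
# while the parallel run is still comparable to the parallel mean() above.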
Overall, the fact that a parallel job may not be quicker does not mean that the code is failing to create the sub-processes as intended. There are costs and benefits to parallel processing. It is a lot faster in the right situations, but if you're copying large datasets around or spinning up processes to do very simple calculations, it can be slower than a single-threaded approach.
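To show the flip side, here is a small sketch of the kind of situation where parallelism does pay off: each task is expensive relative to the data that has to be shipped around. Sys.sleep() stands in here for a genuinely expensive computation such as a model fit.
# Reuses the plan(multisession) from above. Each task takes ~0.5 s but the
# inputs and outputs are tiny, so the worker overhead is negligible by comparison.
slow_fun <- function(x) {
  Sys.sleep(0.5)  # stand-in for an expensive computation
  mean(x)
}

xs <- replicate(8, rnorm(100), simplify = FALSE)

system.time(lapply(xs, slow_fun))        # about 4 s: eight 0.5 s tasks in sequence
system.time(future_lapply(xs, slow_fun)) # much closer to 0.5 s with 8+ workers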