(see working solution below)
I want to use multidplyr to parallelize a function:
calculs.R
f <- function(x){
return(x+1)
}
main.R
library(dplyr)
library(multidplyr)
source("calculs.R")
d <- data.frame(a=1:1000,b=sample(1:2,1000,replace=TRUE))
result <- d %>%
partition(b) %>%
do(f(.)) %>%
collect()
I then get:
Initialising 3 core cluster.
Error in checkForRemoteErrors(lapply(cl, recvResult)) :
2 nodes produced errors; first error: could not find function "f"
In addition: Warning message:
group_indices_.grouped_df ignores extra arguments
How can I assign sourced functions to each core?
==================
Here is the working script:
The function must extract the column to update and return the result as a data frame:
calculs.R
f <- function(x){
return(data.frame(x$a+1))
}
The cluster must be created explicitly and the sourced function copied to each worker:
main.R
library(dplyr)
library(multidplyr)
source("calculs.R")
cl <- create_cluster(3)
set_default_cluster(cl)
cluster_copy(cl, f)
d <- data.frame(a=1:10,b=c(rep(1,5),rep(2,5)))
result <- d %>%
partition(b) %>%
do(f(.)) %>%
collect()
It looks like you initialized a cluster (though you don't show this part). You need to export variables and functions from your global environment to each worker. Assuming you created your cluster with
cl <- create_cluster(3)
set_default_cluster(cl)
Can you try
cluster_copy(cl, f)
This will copy and export f to each worker (I think...)
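For what it's worth, the same mechanism can be seen with base R's parallel package, which is where the checkForRemoteErrors message in your output comes from. This is only a sketch using parallel directly rather than multidplyr, on the assumption that each worker is a fresh R session that knows nothing about f until it is exported:

```r
library(parallel)

f <- function(x) {
  x + 1
}

cl <- makeCluster(2)  # two worker processes, each a fresh R session

# Before exporting, calling f on the workers fails; keep the error message
before <- tryCatch(
  clusterEvalQ(cl, f(1)),
  error = function(e) conditionMessage(e)
)

# Copy f from the current environment to every worker
clusterExport(cl, "f", envir = environment())

# Now every worker can evaluate f
after <- clusterEvalQ(cl, f(1))

stopCluster(cl)

print(before)
print(after)
```

The first call should fail with a "could not find function" message like yours, and the second should succeed once f has been exported.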
Extra
You'll likely run into another problem: your function accepts x as an argument, to which you add 1
f <- function(x){
return(x+1)
}
Since you're passing a data frame to f, you are asking for data.frame+1, which adds 1 to every column of the group rather than just to a. You might want to change your function to something like
f <- function(x){
return(x$a+1)
}
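To make the difference concrete, here is a small sketch in plain R (no cluster involved) of what each version returns for a data frame shaped like yours. The names f_all, f_vec, f_df and the column name a_plus_1 are made up for illustration; the last variant wraps the result in a data frame, which is what the updated script in the question ends up doing:

```r
d <- data.frame(a = 1:10, b = c(rep(1, 5), rep(2, 5)))

# Original version: arithmetic on the whole data frame touches every column
f_all <- function(x) {
  x + 1
}

# Version suggested above: returns a bare numeric vector
f_vec <- function(x) {
  x$a + 1
}

# Hypothetical variant: same values, wrapped in a data frame
f_df <- function(x) {
  data.frame(a_plus_1 = x$a + 1)
}

res_all <- f_all(d)  # data frame with both a and b shifted by 1
res_vec <- f_vec(d)  # numeric vector of length 10
res_df  <- f_df(d)   # one-column data frame with 10 rows

str(res_all)
str(res_vec)
str(res_df)
```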