In Windows (have yet to test in Linux), I'm trying to (embarrassingly) parallelize bayesian sampling on large populations. I'm running some tests and found some pretty concerning behavior in the treatment of a list where the list's objects are of different lengths. Libraries used are parallel
, snow
, doSNOW
, foreach
, and rlecuyer
.
## Set parameters
cores<-4; N<-10004; Mean<-rnorm(N,sd=0.7); SD<-rnorm(N,mean=1,sd=0.1)
## Split the population
lst<-suppressWarnings(split(1:N,f=1:cores))
## Initialize cluster
cl <<- parallel::makePSOCKcluster(cores)
parallel::clusterSetRNGStream(cl, iseed = round(runif(cores)*1001))
## Export and run test
clusterExport(cl,c("lst"))
system.time(
theta<-as.vector(parSapply(cl,1:cores,function(x) rnorm(length(lst[[x]]),mean=Mean[lst[[x]]],sd=SD)))
)
## validate length
system.time(
n.lst<-as.vector(parSapply(cl,1:cores,function(x) lst[[x]]))
)
## Stop the cluster and check data
parallel::stopCluster(cl)
length(theta) # 10004
length(n.lst) # 10004
Now I change the population to a number NOT divisible by 4
## Set parameters
cores<-4; N<-10001; Mean<-rnorm(N,sd=0.7); SD<-rnorm(N,mean=1,sd=0.1)
## Run the same code above... And check the output arrays:
length(theta) # 25010000
length(n.lst) # 25010000
So yes, the list has exponentially grown... to a length of an array that is NOT 2500+2500+2500+2501, but instead is 2500*2501*4... which makes no sense to me.
It turns out the problem must reside with the use of parSapply
; I had to convert the problem to use parLapply
and I finally got some acceptable results. When recoding the problem the following way, everything worked out. Note: I had to get away from split
as there was no obvious way to prevent it from running a modulus on the indices it splits. Instead, I created a function that splits a vector sequentially.
vsplit<-function(x,f) {
setNames(lapply(f,function (y) x[ceiling((1:length(x))/(length(x)/max(f)))==y]),f)
}
PNorm<-function(x) {
rnorm(length(lt[[x]]),lt[[x]],1)
}
settings<-list(); cores<-settings$cores<-4; N<-10000001
Mean<-rnorm(N,sd=0.7)
SD<-1
lt<-suppressWarnings(vsplit(Mean,1:cores))
cl <<- parallel::makePSOCKcluster(settings$cores)
parallel::clusterSetRNGStream(cl, iseed = round(runif(settings$cores)*1001))
clusterExport(cl,c("lt"))
system.time(
that<-unlist(parLapply(cl,1:cores,PNorm))
)
length(that)
parallel::stopCluster(cl)