rdata.tabledoparallelparallel-foreach

results from foreach loop in R


I have a function that I need to run on 2000 data frames. Each iteration is taking a very long time i.e almost 40 minutes and hence I'm using the 'foreach' package in R. I have generated the data in the following way:

library(foreach)
library(doParallel)
library(data.table)
library(matrixStats)
# DATA
datalist <- list()
for (i in 1:2000){
  set.seed(i)
  x1 <- rnorm(600,0.05,0.3)
  x2 <- rnorm(600,-2,0.25)
  data_2 <- data.frame(x1,x2)
  lin_pred <- 1+(0.2 * data_2[1]) + (1.2*data_2[2]) 
  prob <- 1/(1+exp(-lin_pred))
  index <- rep(1:100, each = 6)
  data <- data.frame(index,prob)
  data_split <- split(data,f=data$index)
  create_y <- function(p){
    
    y <- c()
    y_1 <- rbinom(1,1,p[1,2])
    y <- append(y,y_1)
    for(i in 2:6){
      pr <- p[i,2] + 0.05*(y[i-1]-p[i-1,2])
      u <- rbinom(1,1,pr)
      y <- append(y,u)
    }
    return(y)
  }
  
  res <- lapply(data_split,create_y)
  y <- data.frame(x=unlist(res))
  final_data <- data.frame(index,y,x1,x2)
  datalist[[i]] <- final_data
}

Now I have defined my objective function which will be optimized for each of the data set in the list defined above:

## Objective function:
dpd_tdependent <- function(x, fixed = c(rep(FALSE,5))){
  params <- fixed
  dpd <- function(p){
    params[!fixed] <- p
    alpha <- params[1]
    beta_0 <- params[2]
    beta_1 <- params[3]
    beta_2 <- params[4]
    rho <- params[5]
    add_pi <- function(d){
      k <- beta_0+(d[3]*beta_1)+(d[4]*beta_2)
      k1 <- exp(k)/(1+exp(k))
      p <- c()
      p <- append(p,k1[1,1])
      for(i in 2:6){
        u <- k1[i,1] + rho*(d[i-1,2]-k1[i-1,1])
        p <- append(p,u)
      }
      d <- cbind(d,p)
    }
    dat_split <- split(x , f  = x$index)
    result <- lapply(dat_split, add_pi)
    
    result <- rbindlist(result)
    result <- as.data.frame(result)
    colnames(result) <- c('index','y','x1','x2','exp_prob')
    result_split <- split(result, f = result$index)
    ## First expression
    full_prob <- function(d){
      k <- as.data.frame(expand.grid(c(0,1),c(0,1),c(0,1),c(0,1),c(0,1),c(0,1)))
      k <- as.data.frame(t(k))
      val <- c()
      for(j in 1:ncol(k)){
        d[2]<- k[j]
        m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
        for(i in 2:nrow(d)){
          c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
          m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c 
        }
        val <- append(val,m)
      }
      val <- val^(1+alpha)
      return(sum(val))
    }
    first_exp <- lapply(result_split,full_prob)
    first_exp <- as.vector(unlist(first_exp))
    
    ## Second expression:
    compute_prob <- function(d){
      m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
      for(i in 2:nrow(d)){
        c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
        m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c 
      }
      
      return(m^alpha)
    }
    second_exp <- lapply(result_split,compute_prob)
    second_exp <- as.vector(unlist(second_exp))
    
    final_res <- first_exp - ((1+1/alpha)*(second_exp))
    final_result <- sum(final_res)
    
  }
}

Lastly, I'm using the foreach package to hasten the process:

cl = makeCluster(6)
registerDoParallel(cl)




mse <- matrix(,nrow=2000,ncol=5)
foreach(i = 1:2000, .packages = c('data.table','matrixStats'), .export = c('mse','datalist'))%dopar%{
  beta <- rbind(1,0.2,1.2,0.05)
  val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
  b_s <- as.vector(optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)$par)
  conv <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho = 0.001),val)$convergence
  k <- (b_s -beta)
  k <- append(k,conv)
  mse[i,] <- rbind(k)
  if(colCounts(mse , value = 0, na.rm = TRUE)[5] == 500) break
}
stopCluster(cl)

Now, my problem is that the matrix mse is not being populated with the desired values after running the optim function inside the foreach package. I am aware that I can use the .combine feature inside the foreach function to return a matrix, but how will I assign a name to such a matrix? I mean if I cannot assign a name then, how will I check the last condition(inside if) and break?

I truly appreciate any help in this regard. Thank you.


Solution

  • I don't believe you should be trying to modify a global variable from within each worker. See my comment above and link. You shouldn't be checking within iteration process if 500 iterations have convergence=0, because that information is not available to each iteration. The below is one option to return what you want

    cl = makeCluster(6)
    registerDoParallel(cl)
    
    mse = foreach(i = 1:2000, .packages = c('data.table','matrixStats')) %dopar%{
      beta <- rbind(1,0.2,1.2,0.05)
      val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
      optim_sol <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)
      b_s <- optim_sol$par
      conv <- optim_sol$convergence
      c(b_s-beta,conv,i)
    }
    mse <- matrix(unlist(m),nrow=2000, byrow=T)
    
    stopCluster(cl)