
R: using foreach to read CSV data, apply functions to the data, and export back to CSV


I have 3 csv files, namely file1.csv, file2.csv and file3.csv.

For each file, I would like to import the CSV, apply some functions to it, and export a transformed CSV: 3 CSVs in and 3 transformed CSVs out. These are 3 independent tasks, so I thought I could use foreach with %dopar%. Please note that I am using a Windows machine.

However, I cannot get this to work.

library(foreach)
library(doParallel)
library(xts)
library(zoo)
numCores <- detectCores()
cl <- parallel::makeCluster(numCores)
doParallel::registerDoParallel(cl)

filenames <- c("file1.csv", "file2.csv", "file3.csv")
foreach(i = 1:3, .packages = c("xts", "zoo")) %dopar% {
  df_xts <- data_processing_IMPORT(filenames[i])
  ddates <- unique(date(df_xts))
}

If I comment out the last line, ddates <- unique(date(df_xts)), the code runs with no error.

However, with that line included I get the error below, which I don't know how to get around. I also tried adding .export = c("df_xts").

Error in { : task 1 failed - "unused argument (df_xts)"

It still doesn't work. I want to understand what is wrong with my logic and how to get around this. I am only trying to apply simple functions to the data; I haven't even transformed the data and exported it to separate CSV files yet, and I am already stuck.

The funny thing is that the simple code below works fine. Within the foreach, a plays the same role as df_xts above: it is stored in a variable and passed to Fun2 for processing. This version works, but the one above doesn't, and I don't understand why.

numCores <- detectCores()
cl <- parallel::makeCluster(numCores)
doParallel::registerDoParallel(cl)


# Define the function
Fun1 <- function(x) {
  a <- 2 * x
  b <- 3 * x
  c <- a + b
  return(c)
}

Fun2 <- function(x) {
  a <- 2 * x
  b <- 3 * x
  c <- a + b
  return(c)
}

foreach(i = 1:10) %dopar% {
  x <- rnorm(5)
  a <- Fun1(x)
  tst <- Fun2(a)
  return(tst)
}
### Output: No error

parallel::stopCluster(cl)

Update: I have found that the issue is with the date() function, which I used to extract the dates in the CSV file, but I am not sure how to get around it.
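
The error can be reproduced without foreach. The following is a minimal sketch in base R only. It assumes that an unqualified date() call on the workers resolves to base::date(), which is presumably the case when only xts and zoo are loaded through .packages. The variable names are illustrative.

```r
# Base R's date() takes no arguments and returns the current
# date and time as a single character string.
stamp <- base::date()
print(class(stamp))

# Passing any argument to it triggers the same kind of
# "unused argument" message that the foreach task reported.
x <- Sys.Date()
msg <- tryCatch(
  base::date(x),
  error = function(e) conditionMessage(e)
)
print(msg)
```

If the interactive session has a package attached that provides its own date(x), such as lubridate, the same line can work there and still fail on the workers, because each worker only loads what is listed in .packages.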


Solution

  • The use of foreach() is correct. The problem is date() in ddates <- unique(date(df_xts)): base R's date() returns the current system date and time as a character string and does not accept any arguments. The "unused argument" error therefore comes from the date() call, not from foreach.

    So I guess you want to use as.Date() or something similar. Assuming df_xts is an xts object, its timestamps are stored in its index, so convert that:

    ddates <- unique(as.Date(index(df_xts)))
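
For the full task described in the question (3 CSVs in, 3 transformed CSVs out), the pieces could fit together roughly as follows. This is only a sketch under stated assumptions: data_processing_IMPORT() is not shown in the question, so plain read.csv() stands in for it, and the timestamp and value columns, the daily sum, and the _transformed.csv suffix are all hypothetical. The input files are generated in a temporary directory so the example is self-contained.

```r
library(foreach)
library(doParallel)

# Hypothetical input: three small CSV files with a `timestamp`
# and a `value` column, written to a temporary directory.
in_dir <- tempfile("csv_in_")
dir.create(in_dir)
filenames <- file.path(in_dir, paste0("file", 1:3, ".csv"))
for (f in filenames) {
  write.csv(
    data.frame(
      timestamp = c("2020-01-01 09:00:00",
                    "2020-01-01 15:00:00",
                    "2020-01-02 09:00:00"),
      value = c(1, 2, 3)
    ),
    f,
    row.names = FALSE
  )
}

# Two workers are enough for the sketch; this kind of cluster
# also works on Windows.
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)

out_files <- foreach(f = filenames, .combine = c) %dopar% {
  df <- read.csv(f, stringsAsFactors = FALSE)
  stamps <- as.POSIXct(df$timestamp, tz = "UTC")
  # as.Date() converts the timestamps; base date() would fail here.
  df$day <- as.Date(stamps, tz = "UTC")
  # Hypothetical transformation: sum `value` per distinct day.
  daily <- aggregate(value ~ day, data = df, FUN = sum)
  out <- sub("\\.csv$", "_transformed.csv", f)
  write.csv(daily, out, row.names = FALSE)
  out  # the last expression is what foreach collects
}

parallel::stopCluster(cl)
print(out_files)
```

Each iteration reads, transforms, and writes its own file, so the tasks stay independent and foreach only has to collect the output paths. If date() in the original code was meant to be a package function that accepts an argument (lubridate provides one), adding that package to .packages would be another way to make it available on the workers.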