Tags: r, parallel-processing, snow, snowfall

When do I need to use sfExport (R Snowfall package)


I am using snowfall for parallel computing. I always work on a single machine with multiple CPUs (>20 cores), and I am processing a large amount of data (>20 GB). sfExport() takes a very long time.

When I run my test code on my laptop and check the CPU usage, it sometimes also works without sfExport().

Some parts of my code are nested sfLapply() calls, like this:

func2 <- function(c, d, ...) {

  result <-
    list(x = c + d,
         y = ..,
         ...
         )

  return(result)
}

func1 <- function(x, a, b, c, ...) {

  library(snowfall)
  d <- a + b

  result <- sfLapply(as.list(b$row), func2, c, d, ...)

  return(result)
}

result <- sfLapply(as.list(data.table$row), func1, a, b, c, ...)

When do I really need to export the data to all CPUs?

Thanks and best regards, Nico


Solution

  • If you're exporting a 20 GB object to all of the cluster workers, that will take a lot of time and use a lot of memory. Each worker receives its own copy of that 20 GB object, so you may have to reduce the number of workers to keep the total memory usage down; otherwise your machine may start thrashing and your program may eventually die. In that case, using fewer workers may run much faster. Of course, if your machine has 512 GB of RAM, 20 workers may be fine, although it will still take a long time to send that object to all of the workers.
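
    For example, here is a minimal sketch of that trade-off (the names big_obj and worker are made up for illustration): start fewer workers and export the object once before the parallel call.

    library(snowfall)
    sfInit(parallel = TRUE, cpus = 4)   # fewer workers means fewer copies of big_obj

    big_obj <- matrix(rnorm(1e6), ncol = 100)

    sfExport("big_obj")                 # every worker receives its own copy

    worker <- function(i) mean(big_obj[, i])
    res <- sfLapply(seq_len(ncol(big_obj)), worker)

    sfStop()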

    If each worker needs a particular data frame or matrix in order to execute the worker function, then exporting it is probably the right thing to do. If each worker only needs part of the object, then you should split it up and only send the portion needed by each of the workers. The key is to determine exactly what data is needed by the worker function and only send what is needed.
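
    For instance, instead of exporting a huge table you can split it and pass one chunk per task, so each worker only receives the rows it actually works on. A sketch, with made-up names big_df and process_chunk:

    library(snowfall)
    sfInit(parallel = TRUE, cpus = 4)

    big_df <- data.frame(row = seq_len(1e6), value = rnorm(1e6))

    ## one chunk per worker; each chunk is serialized with its task,
    ## so nothing has to be exported with sfExport()
    chunks <- split(big_df, cut(seq_len(nrow(big_df)), 4, labels = FALSE))

    process_chunk <- function(chunk) sum(chunk$value)

    partial <- sfLapply(chunks, process_chunk)
    total <- Reduce(`+`, partial)

    sfStop()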

    If it appears that an object is magically showing up on the workers even though you're not exporting it, you may be capturing that object in a function closure. Here's an example:

    library(snowfall)
    sfInit(parallel = TRUE, cpus = 4)
    fun <- function() {
      x <- 100
      worker <- function(n) x * n
      sfLapply(1:1000, worker)
    }
    r <- fun()
    

    This works fine, but it's not obvious how the variable "x" is sent to the cluster workers. The answer is that "x" is serialized along with the "worker" function when sfLapply sends the tasks to the workers, because "worker" is defined inside the function "fun". Exporting "x" to the workers via sfExport is a waste of time in this case. Also note that while this technique works well with sfLapply, it doesn't work well with functions such as sfClusterApply and sfClusterApplyLB, which don't perform task chunking the way sfLapply does; that only matters if "x" is very large, though.

    I won't go into any more detail on this subject except to say that you should be very careful when your worker function is defined inside another function.
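
    By contrast, if the same worker function is defined at the top level, nothing captures "x" for you, and that is exactly the situation where you really do need sfExport(). A minimal sketch using the same toy numbers as above:

    library(snowfall)
    sfInit(parallel = TRUE, cpus = 4)

    x <- 100
    worker <- function(n) x * n   # "x" is a free variable here, not captured

    sfExport("x")                 # without this, the workers cannot find "x"

    r <- sfLapply(1:1000, worker)
    sfStop()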