4

I'm trying to figure out how to pass functions and packages to the boot() function when running parallel computations. It seems very expensive to load a package or define functions inside a loop. The foreach() function that I often use for other parallel tasks has a .packages and .export arguments that handles this (see this SO question) in a nice way but I can't figure out how to do this with the boot package.

Below is a meaningless example that shows what happens when switching to parallel:

library(boot)
myMean <- function(x) mean(x)
meaninglessTest <- function(x, i){
  return(myMean(x[i]))
}

x <- runif(1000)

bootTest <- function(){
  out <- boot(data=x, statistic=meaninglessTest, R=10000, parallel="snow", ncpus=4)
  return(boot.ci(out, type="perc"))
}

bootTest()

Complains (as expected) about that it can't find myMean.

Sidenote: When running this example it runs slower than one-core, probably because splitting this simple task over multiple cores is more time consuming than the actual task. Why isn't the default to split into even job batches of R/ncpus - is there a reason why this isn't default behavior?

Update on the sidenote: As Steve Weston noted, the parLapply that boot() uses actually splits the job into even batches/chunks. The function is a neat wrapper for clusterApply:

docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, 
    fun, ...))

I'm a little surprised that this doesn't have a better performance when I scale up the the number of repetitions:

> library(boot)
> set.seed(10)
> x <- runif(1000)
> 
> Reps <- 10^4
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="no")
> Sys.time()-start_time
Time difference of 0.52335 secs
> 
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="snow", ncpus=4)
> Sys.time()-start_time
Time difference of 3.539357 secs
> 
> Reps <- 10^5
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="no")
> Sys.time()-start_time
Time difference of 5.749831 secs
> 
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="snow", ncpus=4)
> Sys.time()-start_time
Time difference of 23.06837 secs

I hope that this is only due to the very simple mean function and that more complex cases behave better. I must admit that I find it a little disturbing as the cluster initialization time should be the same in the 10.000 & 100.000 case, yet the absolute time difference increases and the 4-core version takes 5 times longer. I guess this must be an effect of the list merging, as I can't find any other explanation for it.

Community
  • 1
  • 1
Max Gordon
  • 5,367
  • 2
  • 44
  • 70
  • `boot` also allows you to specify a cluster with the `cl` argument. Creating a cluster and passing it in this way works for me. Would that be an option for you? – BenBarnes Jul 26 '13 at 11:51

1 Answers1

3

If the function to be executed in parallel (meaninglessTest in this case) has extra dependencies (such as myMean), the standard solution is to export those dependencies to the cluster via the clusterExport function. That requires creating a cluster object and passing it to boot via the "cl" argument:

library(boot)
library(parallel)
myMean <- function(x) mean(x)
meaninglessTest <- function(x, i){
  return(myMean(x[i]))
}
cl <- makePSOCKcluster(4)
clusterExport(cl, 'myMean')

x <- runif(1000)

bootTest <- function() {
  out <- boot(data=x, statistic=meaninglessTest, R=10000,
              parallel="snow", ncpus=4, cl=cl)
  return(boot.ci(out, type="perc"))
}

bootTest()
stopCluster(cl)

Note that once the cluster workers have been initialized, they can be used by boot many times and do not need to be reinitialized, so it isn't that expensive.

To load packages on the cluster workers, you can use clusterEvalQ:

clusterEvalQ(cl, library(randomForest))

That's nice and simple, but for more complex worker initialization, I usually create a "worker init" function and execute it via clusterCall which is perfect for executing a function once on each of the workers.

As for your side note, the performance is bad because the statistic function does so little work, as you say, but I'm not sure why you think that the work isn't being split evenly between the workers. The parLapply function is used to do the work in parallel in this case, and it does split the work evenly and rather efficiently, but that doesn't guarantee better performance than running sequentially using lapply. But perhaps I'm misunderstanding your question.

Steve Weston
  • 19,197
  • 4
  • 59
  • 75
  • Great answer, not as intuitive as foreach but it should do the work. I haven't looked at the boot parallel code but since the performance is so bad I assumed that it was doing one run of meaninglessTest for each cluster instead of doing 250 in each and then combining the output. When I've done simulations with foreach that approach has turned out to be faster, although maybe that was just a coincidence. – Max Gordon Jul 26 '13 at 16:40
  • @MaxGordon I think I understand you better now. I refer to that technique as "chunking", and it can greatly improve performance. `parLapply` does perform chunking, but even chunking has its limits. – Steve Weston Jul 26 '13 at 16:56
  • Thanks, I looked at the boot code and you're right about the chunking, I've updated the question with the details regarding this. It seems that the list merging is rather costly in these oversimplified examples, I'm a little surprised that there isn't a more efficient way to handle simple merges, but then as you rightly point out - anything that needs bootstrapping for won't be as simple as a mean(). – Max Gordon Jul 29 '13 at 10:56