
The simplest way I've found so far to use a parallel lapply in R is through the following example code:

library(parallel)
library(pbapply)

cl <- makeCluster(10)
clusterExport(cl = cl, varlist = c(...))  # export objects to the workers by name
clusterEvalQ(cl = cl, {...})              # run setup code (e.g. library calls) on each worker

results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)

This has the very useful feature of providing a progress bar, and it is very easy to reuse the same code when no parallel computation is needed by setting cl = NULL.
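For instance (sticking with the toy example above), the same call runs sequentially, progress bar included, when no cluster is given:

results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = NULL)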

However, one issue I've noted is that pblapply loops through the list in batches. For example, if one worker is stuck for a long time on a certain task, the remaining workers will wait for it to finish before starting a new batch of jobs. For certain tasks this adds a lot of unnecessary time to the workflow.
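A minimal sketch of the effect, reusing the cluster from above (the timings are only illustrative):

## One 5-second task among many quick ones: the slow task stalls
## its batch, leaving the other workers idle until it finishes.
slow_tasks <- c(5, rep(0.1, 19))
system.time(
  results <- pblapply(slow_tasks, FUN = function(s) Sys.sleep(s), cl = cl)
)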

My question: Are there any similar parallel frameworks that would allow the workers to run independently? A progress bar and the ability to reuse the code with cl = NULL would be a big plus.

Maybe it is possible to modify the existing code of pbapply to add this option/feature?

runr

2 Answers


(Disclaimer: I'm the author of the future framework and the progressr package)

A close solution that resembles base::lapply(), and your pbapply::pblapply() example, is to use the future.apply package:

library(future.apply)

## The below is the same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
})

Chunking: You can control the amount of chunking with the argument future.chunk.size or, alternatively, future.scheduling. To disable chunking so that each element is processed in a unique parallel task, use future.chunk.size=1. This way, if one element takes much longer than the others, it will not hold up any of the other elements.

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}, future.chunk.size=1)

Progress updates in parallel: If you want to receive progress updates during parallel processing, you can use the progressr package and configure it to use the progress package to report updates as a progress bar (here also with an ETA).

library(future.apply)
plan(multisession, workers=4)

library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))

with_progress({
  p <- progressor(along=xs)
  results <- future_lapply(xs, FUN=function(x) {
    p()  ## signal progress
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
})

You can wrap this into a function, e.g.

my_fcn <- function(xs) {
  p <- progressor(along=xs)
  future_lapply(xs, FUN=function(x) {
    p()
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
}

This way you can call it as a regular function:

> result <- my_fcn(xs)

and use plan() to control exactly how you want it to parallelize. This will not report on progress by itself; to get that, wrap the call in with_progress():

> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------]   9%  1m
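For example, here is a minimal sketch of switching plans around the same function (nothing inside my_fcn() changes):

plan(sequential)                ## run sequentially in the current R session
result <- my_fcn(xs)

plan(multisession, workers=4)   ## run on four background R sessions
with_progress(result <- my_fcn(xs))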

Run everything in the background: If your question was how to run the whole shebang in the background, see the 'Future Topologies' vignette. That's another level of parallelization but it's possible.
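As a rough sketch only (the vignette covers how the nested layers should be configured), the whole call can itself be evaluated as a future in a background R session:

library(future)
## An assumed two-layer setup; see the 'Future Topologies' vignette
## for how the inner layer's workers are specified in a nested plan.
plan(list(multisession, multisession))

f <- future(my_fcn(xs))   ## returns immediately; evaluated in a background session
## ... do other work here ...
result <- value(f)        ## blocks until the background run completes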

HenrikB
  • This is awesome! All the controls that I was looking for. Great job with the framework! Still, for some reason, I'm not seeing the progress bar printed; must be something with my machine... – runr Jul 20 '20 at 10:15
  • The progress bar prints without problems on the Linux machine though, but not on Windows + Microsoft R 3.5.1. Is this a known issue? – runr Jul 20 '20 at 10:26
  • "The progress bar prints without problems ... but not on windows + microsoft R 3.5.1 ...": @Nutle, are you referring to the **progressr** solution in my answer, or some other solution mentioned in this post? If my example code in the above answer, which uses **progressr**, does _not_ work, please let me know. – HenrikB Jul 20 '20 at 19:38
  • Yes, the progressr solution. Maybe some version mismatch (or other loaded packages?) - had to install from GitHub, since the CRAN/MRAN version wasn't available for 3.5.1. I can also confirm that R 4.0.2 works on the same Windows machine, so only Microsoft R seems to be affected. – runr Jul 20 '20 at 22:10
  • Oh, yeah, you need future (>= 1.16.0) [2020-01-15] for near-live updates to work, so maybe that was the problem. – HenrikB Jul 21 '20 at 02:06
  • That will be the case; I'm working with a 2018-08 MRAN snapshot, where 1.9 was the latest. All updated now, thanks! – runr Jul 21 '20 at 10:46

You could use the furrr package, which uses future to run purrr functions in parallel:

library(furrr)
library(purrr)  # for the %>% pipe

plan(multisession, workers = availableCores() - 1)
nbrOfWorkers()

1:100 %>% future_map(~ {Sys.sleep(1); rnorm(.x)}, .progress = TRUE)
Progress: ──────────────────────────────                                   100%

You can switch off parallel computations with plan(sequential):
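A minimal sketch, reusing the packages loaded above:

plan(sequential)                      ## back to sequential execution
1:10 %>% future_map(~ rnorm(.x))      ## identical code, no parallel workers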

Waldi
  • Thanks, the package looks promising. Any way to control the number of workers and the functions/data passed to them with ``plan(multisession)``? Plus, the progress bar for some reason is not showing in my session; will have to investigate why. – runr Jul 15 '20 at 13:47
  • See my edit: the progress bar disappears after completion, so I slowed this down. You can also set the number of workers. – Waldi Jul 15 '20 at 13:54
  • Very good. Seeing the "time remaining" would also be useful, but this will have to do. Going to do some more testing and accept your answer eventually. Thanks! – runr Jul 15 '20 at 14:02