48

I love the setting `.progress = 'text'` in plyr's `llply`. However, it causes me much anxiety not to know how far along an `mclapply` (from package multicore) is, since list items are sent to various cores and then collated at the end.

I've been outputting messages like *currently in sim_id # ....* but that's not very helpful because it doesn't give me an indication of what percentage of list items is complete (although it is helpful to know that my script isn't stuck and is moving along).

Can someone suggest other ideas that would allow me to look at my .Rout file and get a sense of progress? I've thought about adding a manual counter, but I can't see how to implement one since `mclapply` must finish processing all list items before it can give any feedback.
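For concreteness, here is roughly what I mean (a toy sketch only: the sleep and the sim_id message stand in for my real simulation, and I use parallel::mclapply, which has the same interface as multicore's):

library(plyr)
library(parallel)  # parallel::mclapply has the same interface as multicore::mclapply

# llply gives me a live progress indicator, but only runs on one core:
res1 <- llply(1:100, function(i) { Sys.sleep(0.05); sqrt(i) }, .progress = "text")

# With mclapply all I can do is print messages from the workers, which shows
# the script is alive but not what fraction of the list is done:
res2 <- mclapply(1:100, function(i) {
    if (i %% 20 == 0) message("currently in sim_id # ", i)
    Sys.sleep(0.05)
    sqrt(i)
})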

Maiasaura
  • 32,226
  • 27
  • 104
  • 108
  • 1
    See my answer for a similar question: http://stackoverflow.com/a/5431265/653825 – otsaw Jun 12 '12 at 07:57
  • Excellent answer here by @fotNelton, and others based on it for reuse. As a quick solution to see progress in one-off `mclapply` calls, you can also just `cat(".")` in the worker function. – codeola Nov 26 '14 at 20:55
  • Excellent question. Package `multicore` is no longer available; is there a workaround without it? – forecaster Dec 29 '14 at 21:12
  • 1
    @forecaster: Yes, have a look at the `parallel` package. – fotNelton Dec 30 '14 at 06:13
  • I think you can adapt the following to your scenario: https://stackoverflow.com/a/73940644/5252007. – Mihai Oct 03 '22 at 20:52
  • If you don't mind `parallel::parSapply`, you can take a look at a package I wrote to track the progress of tasks executed in parallel. You can [find it here](https://parabar.mihaiconstantin.com/). – Mihai Feb 20 '23 at 05:22

6 Answers

26

Because mclapply spawns multiple processes, one might want to use fifos, pipes, or even sockets to communicate progress. Consider the following example:

library(multicore)

finalResult <- local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
        # Child
        progress <- 0.0
        while (progress < 1 && !isIncomplete(f)) {
            msg <- readBin(f, "double")
            progress <- progress + as.numeric(msg)
            cat(sprintf("Progress: %.2f%%\n", progress * 100))
        } 
        exit()
    }
    numJobs <- 100
    result <- mclapply(1:numJobs, function(...) {
        # Do something fancy here
        # ...
        # Send some progress update
        writeBin(1/numJobs, f)
        # Some arbitrary result
        sample(1000, 1)
    })
    close(f)
    result
})

cat("Done\n")

Here, a temporary file is used as a fifo, and the main process forks a child whose only duty is to report the current progress. The main process then calls mclapply, where the expression (more precisely, the expression block) being evaluated for each list item writes its share of the progress to the fifo buffer by means of writeBin.

As this is only a simple example, you'll probably have to adapt the output handling to your needs. HTH!

fotNelton
  • 3,844
  • 2
  • 24
  • 35
  • Is this effectively any different from using standard functions `message` and `sink`? Messages from all child processes go to the same sink without delay, right? – otsaw Jun 13 '12 at 06:47
  • 3
    In case of `mclapply` the main process is waiting for all child processes to finish so without forking another child process there's no way to receive and process messages while `mclapply` is still working. – fotNelton Jun 13 '12 at 13:56
  • @fotNelton: Based on my experience, child processes seem to send stdout and stderr to the same as that of the parent process without any delay. But maybe this is OS-dependent? – otsaw Jun 13 '12 at 14:30
  • @otsaw: What you're saying about stdout resp. stderr is right but since the main process is blocked while waiting for `mclapply` to finish, another process or thread is needed to handle the progress output. – fotNelton Jun 13 '12 at 18:02
  • 1
    Hi fotNelton, this is a very useful answer. Package `multicore` is no longer available on CRAN; is there a workaround without using it? Thank you – forecaster Dec 29 '14 at 21:01
  • @forecaster: Yes, have a look at the parallel package. – fotNelton Dec 30 '14 at 06:18
16

Essentially another version of @fotNelton's solution, but with some modifications:

  • Drop-in replacement for mclapply (supports all of mclapply's arguments)
  • Catches ctrl-c interrupts and aborts gracefully
  • Uses the built-in progress bar (txtProgressBar)
  • Option to track progress or not, and to use a specified style of progress bar
  • Uses parallel rather than multicore, which has now been removed from CRAN
  • Coerces X to a list as per mclapply (so length(X) gives the expected result)
  • roxygen2-style documentation at the top

Hope this helps someone...

library(parallel)

#-------------------------------------------------------------------------------
#' Wrapper around mclapply to track progress
#' 
#' Based on http://stackoverflow.com/questions/10984556
#' 
#' @param X         a vector (atomic or list) or an expressions vector. Other
#'                  objects (including classed objects) will be coerced by
#'                  ‘as.list’
#' @param FUN       the function to be applied to each element of X
#' @param ...       optional arguments to ‘FUN’
#' @param mc.preschedule see mclapply
#' @param mc.set.seed see mclapply
#' @param mc.silent see mclapply
#' @param mc.cores see mclapply
#' @param mc.cleanup see mclapply
#' @param mc.allow.recursive see mclapply
#' @param mc.progress track progress?
#' @param mc.style    style of progress bar (see txtProgressBar)
#'
#' @examples
#' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
#' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
#' 
#' dat <- lapply(1:10, function(x) rnorm(100)) 
#' func <- function(x, arg1) mean(x)/arg1 
#' mclapply2(dat, func, arg1=10, mc.cores=2)
#-------------------------------------------------------------------------------
mclapply2 <- function(X, FUN, ..., 
    mc.preschedule = TRUE, mc.set.seed = TRUE,
    mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
    mc.cleanup = TRUE, mc.allow.recursive = TRUE,
    mc.progress=TRUE, mc.style=3) 
{
    if (!is.vector(X) || is.object(X)) X <- as.list(X)

    if (mc.progress) {
        f <- fifo(tempfile(), open="w+b", blocking=T)
        p <- parallel:::mcfork()
        pb <- txtProgressBar(0, length(X), style=mc.style)
        setTxtProgressBar(pb, 0) 
        progress <- 0
        if (inherits(p, "masterProcess")) {
            while (progress < length(X)) {
                readBin(f, "double")
                progress <- progress + 1
                setTxtProgressBar(pb, progress) 
            }
            cat("\n")
            parallel:::mcexit()
        }
    }
    tryCatch({
        result <- mclapply(X, ..., function(...) {
                res <- FUN(...)
                if (mc.progress) writeBin(1, f)
                res
            }, 
            mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
            mc.silent = mc.silent, mc.cores = mc.cores,
            mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
        )

    }, finally = {
        if (mc.progress) close(f)
    })
    result
}
waferthin
  • 1,582
  • 1
  • 16
  • 27
  • This version doesn't really show the progress of the task. The progress bar starts at 0% and remains there. – Ariel Nov 23 '14 at 21:22
  • This function works for me on OS X and linux, so maybe it's a windows issue. – waferthin Nov 26 '14 at 19:50
  • 1
    `fork` is not supported under Windows, so none of these should work, including `mclapply` itself (with more than one core). – codeola Nov 26 '14 at 20:47
  • I am on OSX and linux. `mclapply` shows warnings and errors when you specify just one core. Anything more, and it doesn't work. Can anyone else on OSX/linux confirm if this works on their systems? I am running it via RStudio. – Ariel Dec 04 '14 at 19:35
  • 1
    Looks like parallel:::mcfork is not working as expected in Rstudio. The resolution is beyond me and best handled as a separate question on stackoverflow. If I get a solution I'll post back here... – waferthin Dec 05 '14 at 10:32
  • 3
    This method for tracking progress won't work with Rstudio (see discussion here: http://stackoverflow.com/questions/27314011/), because output from the forked process (which prints progress to the screen) is ignored in Rstudio... – waferthin Dec 06 '14 at 11:06
13

The pbapply package has implemented this for the general case (i.e. it works on Unix-alikes and on Windows, and also in RStudio). Both pblapply and pbsapply have a cl argument. From the documentation:

Parallel processing can be enabled through the cl argument. parLapply is called when cl is a 'cluster' object, mclapply is called when cl is an integer. Showing the progress bar increases the communication overhead between the main process and nodes / child processes compared to the parallel equivalents of the functions without the progress bar. The functions fall back to their original equivalents when the progress bar is disabled (i.e. getOption("pboptions")$type == "none" or dopb() is FALSE). This is the default when interactive() is FALSE (i.e. called from a command line R script).

If one does not supply cl (or passes NULL), the default non-parallel lapply is used, still with a progress bar.
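A minimal usage sketch (the toy function and worker counts here are just for illustration):

library(pbapply)
library(parallel)

slow_task <- function(i) { Sys.sleep(0.05); sqrt(i) }

# Integer cl: forked workers via mclapply (Unix-alikes), with a progress bar
res_fork <- pblapply(1:200, slow_task, cl = 2)

# 'cluster' cl: parLapply on a PSOCK cluster, so this also works on Windows
clus <- makeCluster(2)
res_sock <- pblapply(1:200, slow_task, cl = clus)
stopCluster(clus)

# No cl (or cl = NULL): plain sequential lapply, still with a progress bar
res_seq <- pblapply(1:200, slow_task)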

Axeman
  • 32,068
  • 8
  • 81
  • 94
  • Have you found a clean way to wrap a `tryCatch` around a `pblapply`/`mclapply` function so that it stops the cluster when it throws an exception? When I try stopping these parallel processes in RStudio, I end up with runaway cores that I have to manually kill via terminal. – philiporlando Jan 12 '19 at 01:59
  • 1
    @spacedSparking No, but I have been using the `furrr` package instead. – Axeman Jan 12 '19 at 07:52
7

Here's a function based on @fotNelton's solution that you can use wherever you would normally use mclapply.

mcadply <- function(X, FUN, ...) {
  # Runs multicore lapply with progress indicator and transformation to
  # data.table output. Arguments mirror those passed to lapply.
  #
  # Args:
  # X:   Vector.
  # FUN: Function to apply to each value of X. Note this is transformed to 
  #      a data.frame return if necessary.
  # ...: Other arguments passed to mclapply.
  #
  # Returns:
  #   data.table stack of each mclapply return value
  #
  # Progress bar code based on https://stackoverflow.com/a/10993589
  require(multicore)
  require(plyr)
  require(data.table)

  local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
      # Child
      progress <- 0
      print.progress <- 0
      while (progress < 1 && !isIncomplete(f)) {
        msg <- readBin(f, "double")
        progress <- progress + as.numeric(msg)
        # Print every 1%
        if(progress >= print.progress + 0.01) {
          cat(sprintf("Progress: %.0f%%\n", progress * 100))
          print.progress <- floor(progress * 100) / 100
        }
      }
      exit()
    }

    newFun <- function(...) {
      writeBin(1 / length(X), f)
      return(as.data.frame(FUN(...)))
    }

    result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))
    close(f)
    cat("Done\n")
    return(result)
  })
}
Max Ghenis
  • 14,783
  • 16
  • 84
  • 132
6

You can use your system's echo function to write from your workers; simply add the following line to your function:

myfun <- function(x) {
  if (x %% 5 == 0) system(paste("echo 'now processing:", x, "'"))
  dosomething(mydata[x])
}

result <- mclapply(1:10, myfun, mc.cores = 5)
> now processing: 5 
> now processing: 10 

This works if you pass an index: rather than passing a list of data, pass the indices and extract the data inside the worker function (as with mydata[x] above).

Nightwriter
  • 514
  • 5
  • 11
  • This is the smoother way to accomplish it for me, also because I can add additional details for each "loop". – Garini Sep 14 '20 at 13:37
2

Based on @fotNelton's answer, using a progress bar instead of line-by-line printing and calling an external function from mclapply.

library('utils')
library('multicore')

# Define the worker first: the local block below calls it immediately.
.mcfunc <- function(i, ...) {
    writeBin(1, f)
    return(i)
}

prog.indic <- local({ # evaluates in a local environment only
    MC <- 100
    f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection
    assign(x='f', value=f, envir=.GlobalEnv)
    pb <- txtProgressBar(min=1, max=MC, style=3)

    if (inherits(fork(), "masterProcess")) { # progress tracker
        # Child
        progress <- 0.0
        while (progress < MC && !isIncomplete(f)) {
            msg <- readBin(f, "double")
            progress <- progress + as.numeric(msg)

            # Update the progress bar.
            setTxtProgressBar(pb, progress)
        }
        exit()
    }

    result <- mclapply(1:MC, .mcfunc)

    cat('\n')
    assign(x='result', value=result, envir=.GlobalEnv)
    close(f)
})

Assigning the fifo connection to the .GlobalEnv is necessary so that it can be used by a function defined outside of the mclapply call. Thanks for the question and the previous replies; I had been wondering how to do this for a while.
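Alternatively (a variation on the above, closer to fotNelton's original answer), you can avoid writing into .GlobalEnv by defining the worker inside the same local() block, so that it captures the fifo connection lexically; the sketch below uses parallel:::mcfork/parallel:::mcexit since multicore is off CRAN:

library(utils)
library(parallel)

result <- local({
    MC <- 100
    f  <- fifo(tempfile(), open = "w+b", blocking = TRUE)
    pb <- txtProgressBar(min = 0, max = MC, style = 3)

    if (inherits(parallel:::mcfork(), "masterProcess")) {
        # Child: read one unit per finished task and update the bar
        progress <- 0
        while (progress < MC && !isIncomplete(f)) {
            progress <- progress + readBin(f, "double")
            setTxtProgressBar(pb, progress)
        }
        parallel:::mcexit()
    }

    # The worker sees `f` through the enclosing (local) environment
    mcfunc <- function(i, ...) {
        writeBin(1, f)
        i
    }

    res <- mclapply(1:MC, mcfunc)
    cat("\n")
    close(f)
    res
})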

Pepin_the_sleepy
  • 297
  • 1
  • 13