4

I would like to parallelize the while loop in the following code:

work <- function(n) {
  # Do some intensive work (e.g explore a graph starting at n).
  # After this, we don't need to execute work() on nodes in excluding.
  # (e.g exclude could be the nodes explored/reached from n)
  # n is just an example. exclude can be a potentially large set.
  Sys.sleep(2)
  exclude <- c(n, sample(nodes, rbinom(1, length(nodes), 0.5)))
  return(exclude)
}

nodes <- 1:1e3

#Order of execution doesn't matter
nodes <- sample(nodes)

#parallelize this loop
while(length(nodes) > 0) {
  n <- nodes[1]
  exclude <- work(n)
  nodes <- setdiff(nodes, exclude)
}

It doesn't matter if work() is executed on an excluded node, but we would like to minimize such instances. The objective of the while loop above is to run work() as few times as possible

This is not an embarrassingly parallel computation, so I don't know how to use parLapply directly. A master-slave framework could be used, but I don't know of any for multicore programming (on Windows).

As a concrete example, you can think of work(n) as graph_exploration(n) (the function the finds all nodes connected to n) and exclude as the nodes in the connected component of n. The final objective is to find one node from each connected component. You want to run graph_exploration(n) as few times as necessary because it is an expensive operation.

Miheer
  • 99
  • 5
  • 4
    Whatever you want to know and could learn here is likely being explained _thoroughly_ in the vignette to package `parallel` that came with your installation of R. – Dirk Eddelbuettel Nov 13 '17 at 03:56
  • By using `sample(nodes)` you wanted to do sampling with replacement instead? You expect to have many values that are the same on which you have to compute your function? – F. Privé Nov 13 '17 at 11:53
  • @F.Privé, I used a random reordering to indicate that the order of execution doesn't matter. All the values in `nodes` are distinct. We want to run `work()` as few times as possible. – Miheer Nov 13 '17 at 16:59
  • My suggestion is to break up your problem. You need to find 1) which `n <- nodes[1]` to perform intensive work on, and then 2) perform intensive work on each `n <- nodes[1]`. Therefore, use a while loop to find `n <- nodes[1]` and store those indices in a new list. The assumption is that finding `exclude` is trivial *and* can be separated from the intensive operation. Then use `parLapply` to perform intensive work on entries in the new list. – CPak Nov 13 '17 at 17:05
  • Btw, your while loop doesn't make a lot of sense to me. Your stop condition is when `nodes` is empty, which means you expect to touch each element in `nodes`. If so, using an `lapply` loop makes more sense. – CPak Nov 13 '17 at 17:08
  • @CPak, currently the only way to do (1) is by running `work`, so it's the chicken and egg problem. As an example you can think of `work()` as `graph_exploration(n)` and `exclude` as the nodes in the connected component of `n`. The objective is to find one node in each connected component. – Miheer Nov 13 '17 at 17:19
  • Since it seems like each iteration depends on the output of the previous (sequential operation), I think you're stuck with a while or recursive loop. Depending on how your graphs are connected and how many cores you have, you might still see a speedup with parallel (passing every element in `nodes` to parLapply), but you'll have to benchmark that yourself and the outcome will be graph-dependent. – CPak Nov 13 '17 at 17:27
  • @DirkEddelbuettel, could you point out parts from the `parallel` vignette that would be relevant? I skimmed through it but did not find something that addresses anything beyond Lapply functions. The introduction seems to talk about Master/Slave framework but I don't see any concrete functions to implement that. – Miheer Nov 16 '17 at 00:27

1 Answers1

2

Miheer,

Here is a proposed solution.

Pre-amble:

The core problem here (as I understand it) is to unblock the while loop whilst work() is number crunching. Essentially, you want the loop to be non-blocked so long as resources remain to initiate more work() calls and processing. Ok, how? Well, my recommendation is you use the future package.

The example below essentially creates a new process invocation of work() for each call. The invocation, however, will not block the while loop unless all the assigned worker processes are busy. You can see this as each work() invocation has a different process id as shown in the runtime output.

Thus, each work() runs independently, to finish we resolve all the futures and return the finals results.

Results:

  • Sequential runtime: 20.61 sec elapsed
  • Parallel runtime: 8.22 sec elapsed

I hope this points you in the right direction.

Caveat: You have to run through all nodes, but it does improve runtime.

Machine setup:

R version 3.4.1 (2017-06-30)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows >= 8 x64 (build 9200)
[Windows 10, 8 Core Xeon, 64Gb RAM]

Parallel Code Example:

# Check for, and install and load required packages.
requiredPackages <-
  c("tictoc", "listenv", "future")


ipak <- function(pkg) {
  new.pkg <- pkg[!(pkg %in% installed.packages()[, "Package"])]
  if (length(new.pkg))
    install.packages(new.pkg, dependencies = TRUE)
  sapply(pkg, require, character.only = TRUE)
}

ipak(requiredPackages)

work <- function(n) {
  # Do some intensive work (e.g explore a graph starting at n).
  # After this, we don't need to execute work() on nodes in exclude.
  # (e.g exclude could be the nodes explored/reached from n)
  # n is just an example. exclude can be a potentially large set.
  Sys.sleep(2) # sample(.5:5))
  exclude <- n
  return(exclude)
}

plan(multiprocess, workers = 4L)
#plan(sequential)

nodesGraph  <- 1:10
nodesGraph  <- sample(nodesGraph)
nodesCount  <- length(nodesGraph)
resultsList <- listenv()

tic()
while ( nodesCount > 0 ) {
  n <- nodesGraph[[nodesCount]]
  ## This is evaluated in parallel and will only block
  ## if all workers are busy.
  resultsList[[nodesCount]] %<-% {
      list( exclude = work(n), 
            iteration = length(nodesGraph), 
            pid = Sys.getpid())
  }

  nodesGraph <- setdiff(nodesGraph, nodesGraph[[nodesCount]] )
  cat("nodesGraph",nodesGraph,"\n")
  cat("nodesCount",nodesCount,"\n")
  nodesCount = nodesCount - 1
}
toc()

## Resolve all futures (blocks if not already finished)
resultsList <- as.list(resultsList)
str(resultsList)

Parallel Runtime Output:

> source('<hidden>/dev/stackoverflow/47230384/47230384v5.R')
nodesGraph 2 5 8 4 6 10 7 1 9 
nodesCount 10 
nodesGraph 2 5 8 4 6 10 7 1 
nodesCount 9 
nodesGraph 2 5 8 4 6 10 7 
nodesCount 8 
nodesGraph 2 5 8 4 6 10 
nodesCount 7 
nodesGraph 2 5 8 4 6 
nodesCount 6 
nodesGraph 2 5 8 4 
nodesCount 5 
nodesGraph 2 5 8 
nodesCount 4 
nodesGraph 2 5 
nodesCount 3 
nodesGraph 2 
nodesCount 2 
nodesGraph  
nodesCount 1 
8.22 sec elapsed
List of 10
 $ :List of 3
  ..$ exclude  : int 2
  ..$ iteration: int 1
  ..$ pid      : int 10692
 $ :List of 3
  ..$ exclude  : int 5
  ..$ iteration: int 2
  ..$ pid      : int 2032
 $ :List of 3
  ..$ exclude  : int 8
  ..$ iteration: int 3
  ..$ pid      : int 16356
 $ :List of 3
  ..$ exclude  : int 4
  ..$ iteration: int 4
  ..$ pid      : int 7756
 $ :List of 3
  ..$ exclude  : int 6
  ..$ iteration: int 5
  ..$ pid      : int 10692
 $ :List of 3
  ..$ exclude  : int 10
  ..$ iteration: int 6
  ..$ pid      : int 2032
 $ :List of 3
  ..$ exclude  : int 7
  ..$ iteration: int 7
  ..$ pid      : int 16356
 $ :List of 3
  ..$ exclude  : int 1
  ..$ iteration: int 8
  ..$ pid      : int 7756
 $ :List of 3
  ..$ exclude  : int 9
  ..$ iteration: int 9
  ..$ pid      : int 10692
 $ :List of 3
  ..$ exclude  : int 3
  ..$ iteration: int 10
  ..$ pid      : int 2032

Sequential Runtime output

> source('<hidden>/dev/stackoverflow/47230384/47230384v5.R')
nodesGraph 6 2 1 9 4 8 10 7 3 
nodesCount 10 
nodesGraph 6 2 1 9 4 8 10 7 
nodesCount 9 
nodesGraph 6 2 1 9 4 8 10 
nodesCount 8 
nodesGraph 6 2 1 9 4 8 
nodesCount 7 
nodesGraph 6 2 1 9 4 
nodesCount 6 
nodesGraph 6 2 1 9 
nodesCount 5 
nodesGraph 6 2 1 
nodesCount 4 
nodesGraph 6 2 
nodesCount 3 
nodesGraph 6 
nodesCount 2 
nodesGraph  
nodesCount 1 
20.61 sec elapsed
List of 10
 $ :List of 3
  ..$ exclude  : int 6
  ..$ iteration: int 1
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 2
  ..$ iteration: int 2
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 1
  ..$ iteration: int 3
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 9
  ..$ iteration: int 4
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 4
  ..$ iteration: int 5
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 8
  ..$ iteration: int 6
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 10
  ..$ iteration: int 7
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 7
  ..$ iteration: int 8
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 3
  ..$ iteration: int 9
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 5
  ..$ iteration: int 10
  ..$ pid      : int 12484
Technophobe01
  • 8,212
  • 3
  • 32
  • 59
  • Nice. A suggested improvement: now at intermediate points in the loop (say every 5 steps), you decide to wait for all the existing executions to complete, collect their `excludes`, and use that information in the future iterations. This way you are not potentially running through all the nodes. – Miheer Nov 18 '17 at 18:00
  • No worries, happy to help, it is an interesting problem. You'll have to be a little careful not to create dependencies in and outside of the processes if I understand what you propose correctly. – Technophobe01 Nov 18 '17 at 18:10
  • I have modified the `work()` function in my post to fit the more realistic example I had in mind. Can you show how it performs there? I would accept this answer if it works well on that. – Miheer Nov 18 '17 at 18:13
  • Miheer - I am heading out. (Saturday here). My sense is you should play around with the code, it is a complete self-installing example and read the`future` package documentation. You have to be careful not to create temporal dependencies between the data and execution conditions that will cause you to collapse to sequential time as CPak alluded too. – Technophobe01 Nov 18 '17 at 18:18
  • Sure, I will try to work it out. Giving you the bounty. – Miheer Nov 18 '17 at 18:21