Miheer,
Here is a proposed solution.
Pre-amble:
The core problem here (as I understand it) is to unblock the while loop whilst work()
is number crunching. Essentially, you want the loop to be non-blocked so long as resources remain to initiate more work()
calls and processing. Ok, how? Well, my recommendation is you use the future package.
The example below essentially creates a new process invocation of work()
for each call. The invocation, however, will not block the while loop unless all the assigned worker processes are busy. You can see this as each work()
invocation has a different process id as shown in the runtime output.
Thus, each work()
runs independently, to finish we resolve all the futures and return the finals results.
Results:
- Sequential runtime: 20.61 sec elapsed
- Parallel runtime: 8.22 sec elapsed
I hope this points you in the right direction.
Caveat: You have to run through all nodes, but it does improve runtime.
Machine setup:
R version 3.4.1 (2017-06-30)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows >= 8 x64 (build 9200)
[Windows 10, 8 Core Xeon, 64Gb RAM]
Parallel Code Example:
# Check for, and install and load required packages.
requiredPackages <-
c("tictoc", "listenv", "future")
ipak <- function(pkg) {
new.pkg <- pkg[!(pkg %in% installed.packages()[, "Package"])]
if (length(new.pkg))
install.packages(new.pkg, dependencies = TRUE)
sapply(pkg, require, character.only = TRUE)
}
ipak(requiredPackages)
work <- function(n) {
# Do some intensive work (e.g explore a graph starting at n).
# After this, we don't need to execute work() on nodes in exclude.
# (e.g exclude could be the nodes explored/reached from n)
# n is just an example. exclude can be a potentially large set.
Sys.sleep(2) # sample(.5:5))
exclude <- n
return(exclude)
}
plan(multiprocess, workers = 4L)
#plan(sequential)
nodesGraph <- 1:10
nodesGraph <- sample(nodesGraph)
nodesCount <- length(nodesGraph)
resultsList <- listenv()
tic()
while ( nodesCount > 0 ) {
n <- nodesGraph[[nodesCount]]
## This is evaluated in parallel and will only block
## if all workers are busy.
resultsList[[nodesCount]] %<-% {
list( exclude = work(n),
iteration = length(nodesGraph),
pid = Sys.getpid())
}
nodesGraph <- setdiff(nodesGraph, nodesGraph[[nodesCount]] )
cat("nodesGraph",nodesGraph,"\n")
cat("nodesCount",nodesCount,"\n")
nodesCount = nodesCount - 1
}
toc()
## Resolve all futures (blocks if not already finished)
resultsList <- as.list(resultsList)
str(resultsList)
Parallel Runtime Output:
> source('<hidden>/dev/stackoverflow/47230384/47230384v5.R')
nodesGraph 2 5 8 4 6 10 7 1 9
nodesCount 10
nodesGraph 2 5 8 4 6 10 7 1
nodesCount 9
nodesGraph 2 5 8 4 6 10 7
nodesCount 8
nodesGraph 2 5 8 4 6 10
nodesCount 7
nodesGraph 2 5 8 4 6
nodesCount 6
nodesGraph 2 5 8 4
nodesCount 5
nodesGraph 2 5 8
nodesCount 4
nodesGraph 2 5
nodesCount 3
nodesGraph 2
nodesCount 2
nodesGraph
nodesCount 1
8.22 sec elapsed
List of 10
$ :List of 3
..$ exclude : int 2
..$ iteration: int 1
..$ pid : int 10692
$ :List of 3
..$ exclude : int 5
..$ iteration: int 2
..$ pid : int 2032
$ :List of 3
..$ exclude : int 8
..$ iteration: int 3
..$ pid : int 16356
$ :List of 3
..$ exclude : int 4
..$ iteration: int 4
..$ pid : int 7756
$ :List of 3
..$ exclude : int 6
..$ iteration: int 5
..$ pid : int 10692
$ :List of 3
..$ exclude : int 10
..$ iteration: int 6
..$ pid : int 2032
$ :List of 3
..$ exclude : int 7
..$ iteration: int 7
..$ pid : int 16356
$ :List of 3
..$ exclude : int 1
..$ iteration: int 8
..$ pid : int 7756
$ :List of 3
..$ exclude : int 9
..$ iteration: int 9
..$ pid : int 10692
$ :List of 3
..$ exclude : int 3
..$ iteration: int 10
..$ pid : int 2032
Sequential Runtime output
> source('<hidden>/dev/stackoverflow/47230384/47230384v5.R')
nodesGraph 6 2 1 9 4 8 10 7 3
nodesCount 10
nodesGraph 6 2 1 9 4 8 10 7
nodesCount 9
nodesGraph 6 2 1 9 4 8 10
nodesCount 8
nodesGraph 6 2 1 9 4 8
nodesCount 7
nodesGraph 6 2 1 9 4
nodesCount 6
nodesGraph 6 2 1 9
nodesCount 5
nodesGraph 6 2 1
nodesCount 4
nodesGraph 6 2
nodesCount 3
nodesGraph 6
nodesCount 2
nodesGraph
nodesCount 1
20.61 sec elapsed
List of 10
$ :List of 3
..$ exclude : int 6
..$ iteration: int 1
..$ pid : int 12484
$ :List of 3
..$ exclude : int 2
..$ iteration: int 2
..$ pid : int 12484
$ :List of 3
..$ exclude : int 1
..$ iteration: int 3
..$ pid : int 12484
$ :List of 3
..$ exclude : int 9
..$ iteration: int 4
..$ pid : int 12484
$ :List of 3
..$ exclude : int 4
..$ iteration: int 5
..$ pid : int 12484
$ :List of 3
..$ exclude : int 8
..$ iteration: int 6
..$ pid : int 12484
$ :List of 3
..$ exclude : int 10
..$ iteration: int 7
..$ pid : int 12484
$ :List of 3
..$ exclude : int 7
..$ iteration: int 8
..$ pid : int 12484
$ :List of 3
..$ exclude : int 3
..$ iteration: int 9
..$ pid : int 12484
$ :List of 3
..$ exclude : int 5
..$ iteration: int 10
..$ pid : int 12484