
I'm using the foreach package to parallelize a function on Windows (this was the only approach to parallelizing I could follow easily). I basically need to call a function for g=1, then g=2, etc., and want to do this faster.

  • My function works perfectly fine with a regular for loop or with %do% instead of %dopar%
  • I believe I am passing all of the packages I am using and hopefully the correct variables/objects
  • but I have very little understanding of parallelizing & nodes, and the errors don't give me enough to troubleshoot on
  • I only included my main function, not all of the other functions it calls, but can provide
  • would appreciate any help on this issue, parallelizing in Windows, and what kinds of things I have to keep in mind to make sure my %do% code works across the nodes from %dopar%

Thank you very much for your help!!

My code:

#agonize parallel
#main function
par_agonize <- function(datfile, num_groups, regen_pref_matrices = FALSE, graph_groups = num_groups) {
  if (regen_pref_matrices) mm <- gen_pref_matrices(datfile)
  out <- list()
  tic.clearlog()
  improve <- tibble(groups=numeric(), agony=numeric(), abs_dec=numeric(), percent_dec=numeric(), total_dec=numeric(), tot_per_dec=numeric())

  foreach(g = 1:num_groups, .packages = loaded.package.names, .export = c(loaded.functions, loaded.objects), .verbose = TRUE) %dopar% { #key line where I use dopar/foreach

    tic()

    out[[g]] <- find_groups(mm, g) #this is the critical line, the improve and tic/toc log are just accessories

    toc(log = TRUE, quiet = FALSE) #calculates time
    log.lst <- tic.log(format = FALSE)

    if (g == 1) { #this calculates summary statistics, not important
      improve <- add_row(improve, groups = g, agony = out[[g]]$ag, abs_dec = 0, percent_dec = 0, total_dec = 0, tot_per_dec=0)
    } 
    else {
      improve <- add_row(improve, groups = g, agony = out[[g]]$ag, abs_dec = out[[g]]$ag - out[[g-1]]$ag, 
                         percent_dec = (out[[g]]$ag - out[[g-1]]$ag)/(out[[g-1]]$ag), total_dec = out[[g]]$ag - out[[1]]$ag,
                         tot_per_dec = (out[[g]]$ag - out[[1]]$ag)/(out[[1]]$ag))
    }
  }
  #just saves output to my list
  out[["summary_stats"]] <- improve
  out[["timings"]] <- tibble(num_groups = 1:g, run_time = unlist(lapply(log.lst, function(x) x$toc - x$tic))) %>% 
    add_row("num_groups" = "Total", "run_time" = sum(out[["timings"]]$run_time[1:g]))
  out[["agony_graph"]] <- graph_agony(out, graph_groups)

  social_rank <<- out
  return(social_rank$agony_graph)
}

#test code
registerDoParallel(cores = detectCores() - 1)
loaded.package.names <- c(sessionInfo()$basePkgs, names(sessionInfo()$otherPkgs))
loaded.package.names #works
loaded.functions <- c("assign_groups", "find_agony", "find_groups", "generate_hierarchy", "gen_pref_matrices", "graph_agony", "init")
loaded.objects <- c("mm") #I can regenerate mm within my code... or use the mm that's already there, so I figured I would export it
system.time(par_agonize("./data/hof17.csv", 2, regen=F)) #this is the MAIN line that runs my function
stopCluster(cl) #not clear if needed

My current error is:

automatically exporting the following variables from the local 
environment:
  improve, out 
explicitly exporting variables(s): assign_groups, find_agony, find_groups, 
generate_hierarchy, gen_pref_matrices, graph_agony, init, mm
numValues: 2, numResults: 0, stopped: TRUE
got results for task 1
numValues: 2, numResults: 1, stopped: TRUE
returning status FALSE
got results for task 2
accumulate got an error result
numValues: 2, numResults: 2, stopped: TRUE
calling combine function
evaluating call object to combine results:
    fun(accum, result.1)
returning status TRUE
Error in { : task 2 failed - "replacement has length zero"
D Schiff
  • When asking for help, you should include a simple [reproducible example](https://stackoverflow.com/questions/5963269/how-to-make-a-great-r-reproducible-example) with sample input and desired output that can be used to test and verify possible solutions. Try removing any code not directly related to the error. Is there a special reason you are using `<<-` in `social_rank <<- out`? That seems suspicious. – MrFlick Jul 11 '18 at 15:23
  • A fully reproducible example will be hard in code because there is a compiled C++ .exe and a data file needed for input. But I'm happy to share the full .R, .exe, and .csv file if that would be helpful? What do you think is the best approach? As for <<- , that's my amateur status. I am trying to save the output to a list in the global environment I guess. I suppose I can always just return the list as it is and stored as an object when I called the main function? return(socialrank)? Then call socialrank <- par_agonize("./data/hof17.csv", 2, regen=F) – D Schiff Jul 11 '18 at 15:53
  • Preferably the reproducible example should also be minimal, cf. [mcve]. – Ralf Stubner Jul 11 '18 at 18:41

2 Answers

Within the foreach loop you are writing to objects defined outside of the loop: out and improve. While this is normal for for loops, even the simplest example from the vignette uses a different syntax:

> x <- foreach(i=1:3) %do% sqrt(i)
> x

In effect one returns something from the loop body. All the results are then collected into a list. This is necessary since for parallel processing different R processes with different memory are used. Effectively foreach is more like lapply than for.
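
To make the difference from a for loop concrete, here is a minimal sketch. It uses toy squares instead of find_groups and an explicit two-worker cluster; both are assumptions for illustration, not the setup from the question. It also hints at a likely cause of the error: on the worker that handles task 2, out[[1]] was never filled in, so out[[g-1]]$ag is probably NULL and the subtraction has length zero.

```r
library(foreach)
library(doParallel)

# Explicit cluster (assumed size 2); on Windows each worker is a separate R session
cl <- makeCluster(2)
registerDoParallel(cl)

out <- list()  # defined outside the loop, as in the question

# Assigning to `out` inside the body only changes the worker's private copy
ignored <- foreach(g = 1:3) %dopar% {
  out[[g]] <- g^2
  NULL
}
length(out)  # `out` in the calling session is still empty

# Returning the value lets foreach collect one list element per iteration
squares <- foreach(g = 1:3) %dopar% {
  g^2
}

stopCluster(cl)
```

Note that stopCluster(cl) needs the object returned by makeCluster(). When the backend is registered with registerDoParallel(cores = ...), as in the question, there is no cl to stop; doParallel provides stopImplicitCluster() for that case.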

As a first step I suggest you remove improve, assign the result of foreach to out, and return that result from your main function. If that works, then one can find ways to include improve. You could, for example, use

list(out = out, improve = improve)

as the last statement of the loop body. This way each foreach iteration will return a list with both out and improve in it.
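
A rough sketch of that pattern follows. The function toy_find_groups is a hypothetical stand-in for find_groups, and the timing via Sys.time() is an assumption chosen to avoid extra packages; the point is only that each iteration hands back a small list, which is taken apart after the loop.

```r
library(foreach)
library(doParallel)

cl <- makeCluster(2)  # assumed worker count
registerDoParallel(cl)

# Hypothetical stand-in for find_groups(): returns a list with an `ag` element
toy_find_groups <- function(g) list(ag = 100 / g)

# toy_find_groups is referenced directly in the body, so foreach exports it
res <- foreach(g = 1:4) %dopar% {
  start <- Sys.time()
  fit <- toy_find_groups(g)
  elapsed <- as.numeric(difftime(Sys.time(), start, units = "secs"))
  list(out = fit, run_time = elapsed)  # last expression is the return value
}

stopCluster(cl)

# Split the per-iteration lists back into separate objects
out <- lapply(res, `[[`, "out")
run_time <- vapply(res, function(r) r$run_time, numeric(1))
agony <- vapply(out, function(o) o$ag, numeric(1))
```

Anything that compares one iteration with another, such as the difference between group g and group g-1, then has to be computed after the loop, since no iteration can see another one's result.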

However, keep in mind that parallel processing is no silver bullet. You always introduce some communication overhead, which might negate any gains you get from the parallel execution. This is difficult to assess without reproducible code (and data).
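
One way to get a feeling for that overhead is to time the same loop with %do% and %dopar%. The sketch below uses deliberately trivial tasks and an assumed two-worker cluster, so it says nothing about your actual function; it only shows how such a comparison could be set up.

```r
library(foreach)
library(doParallel)

cl <- makeCluster(2)  # assumed worker count
registerDoParallel(cl)

n_tasks <- 200

# Very cheap tasks: the cost of shipping each one to a worker tends to dominate
t_seq <- system.time(r_seq <- foreach(i = seq_len(n_tasks)) %do% sqrt(i))
t_par <- system.time(r_par <- foreach(i = seq_len(n_tasks)) %dopar% sqrt(i))

stopCluster(cl)

# Elapsed wall-clock seconds for each variant (values depend on the machine)
print(c(sequential = t_seq[["elapsed"]], parallel = t_par[["elapsed"]]))
```

With only a handful of long-running tasks, as in the question, the picture can look quite different, so it is worth timing the real function both ways.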

Ralf Stubner
  • Assigning the foreach to out and then returning only that result worked - thanks! It showed that the items were processed and then merged. A few more novice questions: 1) First, can I calculate and store several objects within that foreach and then do return(out) at the end? I can't figure out how the return syntax works to allow me to return multiple objects. Basically, I would like to add back in the timings and improve/summary stats code. Should I do it within the foreach or how would you approach it? – D Schiff Jul 17 '18 at 15:19
  • Second, unfortunately, it's not clear that the parallel is running faster. The code takes from 30 seconds to say 5 minutes to run, so I expect to see improvement. It should do all the calculations with group = 1, then group = 2, etc., with more groups taking more time. So I thought parallelizing should just take as long as the longest group (say if I'm doing max 5 groups), not 1 + 2 + 3 + 4 + 5. But not clear this is happening. – D Schiff Jul 17 '18 at 15:19

I added the timing functions tic/toc to secondary functions, and took them out of foreach. I took Ralf's advice about just returning the primary output from foreach. Then, within the broader function that foreach is nested in, I computed improve/summary stats and timings, and returned all of them as a list.

Works great on Windows! Not sure if it's faster than sequential yet though.

par_agonize2 <- function(datfile, num_groups, regen_pref_matrices = FALSE, graph_groups = num_groups) {
  # Uses the mm that already exists unless asked to regenerate it
  if (regen_pref_matrices) mm <- gen_pref_matrices(datfile)

  # Each iteration returns its result; foreach collects them into the list out
  out <- foreach(g = 1:num_groups, .packages = loaded.package.names, .export = c(loaded.functions, loaded.objects), .verbose = TRUE) %dopar% {
    find_groups3(mm, g) #this is the critical line, the improve and tic/toc log are just accessories
  }

  improve <- tibble(groups=numeric(), agony=numeric(), abs_dec=numeric(), percent_dec=numeric(), total_dec=numeric(), tot_per_dec=numeric())
  run_time <- numeric(num_groups)

  # Summary statistics are computed sequentially, once all results are back
  for (g in 1:num_groups) {
    if (g == 1) {
      improve <- add_row(improve, groups = 1, agony = out[[1]]$ag, abs_dec = 0, percent_dec = 0, total_dec = 0, tot_per_dec = 0)
    } else {
      improve <- add_row(improve, groups = g, agony = out[[g]]$ag, abs_dec = out[[g]]$ag - out[[g-1]]$ag,
                         percent_dec = (out[[g]]$ag - out[[g-1]]$ag)/(out[[g-1]]$ag), total_dec = out[[g]]$ag - out[[1]]$ag,
                         tot_per_dec = (out[[g]]$ag - out[[1]]$ag)/(out[[1]]$ag))
    }
    run_time[g] <- unlist(out[[g]]$run_time)
  }

  # num_groups is stored as character so the "Total" row fits in the same column
  timings <- tibble(num_groups = as.character(1:num_groups), run_time = run_time) %>%
    add_row(num_groups = "Total", run_time = sum(run_time))

  list(groupings = out, summary_stats = improve, agony_graph = graph_agony(out, graph_groups), timings = timings)
}
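
A possible alternative to the add_row loop, sketched under the assumption that the agony values have already been pulled out of out into a plain numeric vector (the numbers below are made up for illustration): the same columns can be built in one step with diff().

```r
library(tibble)

# Hypothetical agony values, standing in for out[[g]]$ag with g = 1, 2, 3, 4
agony <- c(120, 90, 75, 72)

# Change relative to the previous group; the first group has no predecessor
abs_dec <- c(0, diff(agony))

improve <- tibble(
  groups      = seq_along(agony),
  agony       = agony,
  abs_dec     = abs_dec,
  # Divide by the previous group's agony; the dummy 1 keeps the first row at 0
  percent_dec = abs_dec / c(1, head(agony, -1)),
  total_dec   = agony - agony[1],
  tot_per_dec = (agony - agony[1]) / agony[1]
)
```

In the real function, agony could be obtained with something like vapply(out, function(o) o$ag, numeric(1)), assuming every element of out carries a single numeric ag.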
D Schiff