
I have some R code using foreach and doParallel to parallelize an lapply() call, i.e. parLapply(). There are 100 tasks, split across my laptop's 4 cores. The function automatically partitions the tasks into sets of 25, with 1:25, 26:50, 51:75, and 76:100 as the splits across the cores. (I know because I've saved files named by the integer index of the list I iterate over.) The tasks are much simpler for 76:100, so those completed very quickly, while all the other tasks are still queued.

Whenever I run more involved code like this, I periodically monitor progress and check the task manager. Before, I've noticed the processes spin up and down between executions. This time, I saw that one "core" process is now persistently idle, sitting at 0% CPU in the task manager.

[screengrab of task manager]

I assume this one was dedicated to the task list that completed more quickly. The other 3 are active as usual, processing away.

So my question: is there any way I can modify the parallel code so that tasks are more evenly distributed? Or a way to re-incorporate the idle core/process?

Similar to what is discussed in this post.

# not executed, so not sure if it's actually a reprex, but the concept is there
# computer is still running my other code :)
library(foreach)
library(doParallel)

cl <- makeCluster(mc <- getOption("cl.cores", parallel::detectCores())) # one worker per core

clusterExport(cl=cl, varlist=c("reprex")) # export needed objects to the workers
clusterEvalQ(cl, {library(dplyr)})        # load packages on each worker

registerDoParallel(cl)

reprex2 <- parLapply(cl, 
                     1:100, 
                     function(x){
                       if(x<=75){
                         Sys.sleep(10)
                       }else{
                         print("Side question: any way for this to be seen? Or maybe some kind of progress bar baked into the code? I've taken to saving along the way to monitor.")
                       }
                     })

stopCluster(cl)

A lo-fi but maybe tedious solution I thought of would be to intuit or benchmark each "type" of task and then organize/partition them somewhat manually; a sketch of that idea is below. But I'm really looking for something on the code side, for a less occupied / fatigued processor to deal with. :)
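For concreteness, here is a minimal sketch of that manual idea (my own illustration, reusing the toy workload above): interleave the task indices round-robin so each statically scheduled chunk of 25 gets a mix of slow (1:75) and fast (76:100) tasks.

# interleave indices so each contiguous chunk mixes task types
interleaved <- unlist(split(1:100, rep_len(1:4, 100)), use.names = FALSE)
reprex3 <- parLapply(cl, interleaved,
                     function(x) if (x <= 75) Sys.sleep(10) else x)
# results come back in interleaved order; restore the original order
reprex3 <- reprex3[order(interleaved)]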

Bonus bonus bonus question(s) that are likely googleable: is each execution in parallel done with a clean environment? Application-wise, should I front-load memory-intensive objects when each core is established, or is it better to load a subset each time? And why do lists of lists take up more memory than a data frame with the same content?

daileyco
  • Parallel progress bar options https://stackoverflow.com/questions/58473626/r-doparallel-progress-bar-to-monitor-finished-jobs – M.Viking Oct 11 '22 at 03:18
  • Maybe the `chunk.size=` argument of `parLapply()` can help distribute tasks more evenly (see the sketch after these comments) – M.Viking Oct 11 '22 at 03:22
  • There is a load-balanced `parLapplyLB` function that should do dynamic load balancing over the nodes (i.e. your cores). Maybe this will solve the problem for you without needing additional coding from your side. – Julian_Hn Oct 11 '22 at 05:21
  • @Julian_Hn I saw that! Thanks! Add an answer and I'll mark it solved by you. But my code has transitioned to foreach() instead of parLapply(); do you know of anything for that function? – daileyco Nov 10 '22 at 19:48
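A hedged aside on the `chunk.size=` suggestion above (my illustration, not from the thread): since R 3.5.0, `parLapply()` accepts a `chunk.size` argument. With `chunk.size = 1` the 100 tasks are dispatched in round-robin chunks of one, so fast and slow tasks are interleaved across workers even though the scheduling is still static, much like the manual sketch in the question.

# illustration only; reuses cl and the toy workload from the question
reprex2 <- parLapply(cl, 1:100,
                     function(x) if (x <= 75) Sys.sleep(10) else x,
                     chunk.size = 1)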

1 Answer


For a load-balanced parallel lapply, an LB version of parLapply() called parLapplyLB() exists in the parallel package. This won't preschedule, but assigns a new job to each node whenever it becomes free. This works better for heterogeneous tasks, but comes with additional communication overhead and might thus slow down homogeneous tasks.
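Applied to the question's example, that looks roughly like this (a sketch, not benchmarked; the explicit `chunk.size = 1` makes the dispatching as fine-grained as possible at the cost of extra communication):

# drop-in replacement for the parLapply() call: each worker fetches
# the next task as soon as it finishes its current one
reprex2 <- parLapplyLB(cl, 1:100,
                       function(x) if (x <= 75) Sys.sleep(10) else x,
                       chunk.size = 1)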

For foreach / %dopar% we can explicitly turn off prescheduling by passing `.options.multicore` / `.options.snow` in the foreach() call (which one is honored depends on the registered backend). However, it looks like prescheduling is disabled here by default and has to be turned on explicitly if desired.

The following script demonstrates this:

library(parallel)
library(doParallel)
library(foreach)
library(ggplot2)

cl <- makeCluster(2) # Create Cluster
clusterApply(cl, seq_along(cl), function(i) workerID <<- i) # Export global Worker ID for later use
registerDoParallel(cl)  # register cluster for foreach use

num_reps <- rep(c(10,2),each=4) # four long tasks (10 reps) followed by four short tasks (2 reps)

# The tasks are dummy functions that sleep for varying amounts of time, simulating heterogeneous tasks; each call records its worker ID and a timestamp
times_no_opts <- foreach(i=num_reps) %dopar%
  sapply(seq_len(i), function(x) {
    Sys.sleep(0.1)
    c(id=workerID,time=format(Sys.time(),"%H:%M:%OS2"))
  })


times_exp_preschedule <- foreach(i=num_reps,
                               .options.multicore=list(preschedule=TRUE),
                               .options.snow=list(preschedule=TRUE)) %dopar%
  sapply(seq_len(i), function(x) {
    Sys.sleep(0.1)
    c(id=workerID,time=format(Sys.time(),"%H:%M:%OS2"))
  })

times_exp_no_preschedule <- foreach(i=num_reps,
                                .options.multicore=list(preschedule=FALSE),
                                .options.snow=list(preschedule=FALSE)) %dopar%
  sapply(seq_len(i), function(x) {
    Sys.sleep(0.1)
    c(id=workerID,time=format(Sys.time(),"%H:%M:%OS2"))
  })

# Convert each run's list of 2 x n worker matrices into a tidy data frame
as_times_df <- function(times) {
  data.frame(index = as.character(rep(seq_along(num_reps),
                                      sapply(times, ncol))),
             clusterID = unlist(sapply(times, `[`, 1, )),
             time = as.numeric(stringr::str_extract(unlist(sapply(times, `[`, 2, )),
                                                    "[0-9\\.]+$")))
}

times_no_opts            <- as_times_df(times_no_opts)
times_exp_no_preschedule <- as_times_df(times_exp_no_preschedule)
times_exp_preschedule    <- as_times_df(times_exp_preschedule)



ggplot(data=times_no_opts,
       aes(x=time,fill=clusterID,group=clusterID)) + 
  geom_histogram(binwidth=0.1,colour="black") +
  labs(title="No Opts")

ggplot(data=times_exp_no_preschedule,
       aes(x=time,fill=clusterID,group=clusterID)) + 
  geom_histogram(binwidth=0.1,colour="black") +
  labs(title="Explicit No Preschedule")

ggplot(data=times_exp_preschedule,
       aes(x=time,fill=clusterID,group=clusterID)) + 
  geom_histogram(binwidth=0.1,colour="black") + 
  labs(title="Explicit Preschedule")

[Three histograms of per-task finish times, coloured by worker ID: "No Opts", "Explicit No Preschedule", "Explicit Preschedule"]

Julian_Hn