
I am using foreach::foreach() in R to run an analysis in parallel on a computing cluster with one node, 500 GB of RAM, and 30 cores. I initialize the cluster using:

myCluster <- parallel::makeCluster(28)
doParallel::registerDoParallel(myCluster)

The process runs through completely and takes around 8 hours; however, the foreach loop does not combine the results and returns a NULL object (lcp_network). The loop code looks like this (not a reprex):

lcp_network <- foreach::foreach(i = 1:nrow(comps), .errorhandling = "remove", .combine = "rbind", .packages = c("sf", "terra","leastcostpath","dplyr")) %dopar% {
  
  lcp <- leastcostpath::create_lcp(cost_surface = tr1,
                                   origin = nodes_sp[comps[i,1],, drop = FALSE],
                                   destination = nodes_sp[comps[i,2],, drop = FALSE])
  
  lcp$origin_ID <- nodes_sp[comps[i,1],]$layer
  lcp$destination_ID <- nodes_sp[comps[i,2],]$layer

  lcp <- lcp %>%
    st_as_sf() %>%
    mutate(length = st_length(.)) %>%
    st_drop_geometry()

  attributes(lcp$length) <- NULL
  
  return(lcp)
  
}


Notably, this same code runs on a smaller subset of data on my personal computer (8 GB of RAM, 8 cores) and combines with no problem. The output when using the .verbose = TRUE argument is:

numValues: 43, numResults: 0, stopped: TRUE
got results for task 1
accumulate got an error result
numValues: 43, numResults: 1, stopped: TRUE
returning status FALSE
got results for task 2
...
returning status FALSE
got results for task 43
numValues: 43, numResults: 43, stopped: TRUE
not calling combine function due to errors
returning status TRUE

Any advice is helpful. I have tried adding gc() within the loop, among other attempted fixes.

EDIT: I noticed that the first entry in the verbose output notes:

accumulate got an error result

and at every point thereafter, it notes:

returning status FALSE

EDIT 2: I ran the same code on a different server with the same specifications (500 GB of RAM, 30 cores). The error is different now:

numValues: 43, numResults: 0, stopped: TRUE
Error in unserialize(socklist[[n]]) : error reading from connection
Calls: %dopar% ... recvOneData -> recvOneData.SOCKcluster -> unserialize
Execution halted
slurmstepd: error: Detected 8 oom-kill event(s) in StepId=13251537.batch. 
Some of your processes may have been killed by the cgroup out-of-memory handler.
geoscience123
  • I don't think you should be using `return()` because there's no function scope as far as I can tell, and I don't see it used in the samples for the package. Maybe you have some other reference where it's used. But I would try replacing `return(lcp)` with just `lcp`. Normally the last value of an expression block is the value that's "returned". But that wouldn't explain why it runs on a smaller subset. It's easier to help you if you include a simple [reproducible example](https://stackoverflow.com/questions/5963269/how-to-make-a-great-r-reproducible-example) with sample input so we can test – MrFlick Aug 17 '23 at 15:13
  • @MrFlick Tried it, no help. return() is used in `create_FETE_lcps()`: https://github.com/josephlewis/leastcostpath/blob/master/R/create_FETE_lcps.R which is what I'm trying to recreate with some differing functionality. – geoscience123 Aug 17 '23 at 16:01
  • @MrFlick I made an edit that provides more description regarding the verbose statements. – geoscience123 Aug 18 '23 at 14:15
  • Maybe `.combine = "rbind"` isn't what you want because you don't seem to be returning data frames or matrices. If you leave that off, you should get a list back. At least that's what the "accumulate got an error result" error makes me think. – MrFlick Aug 18 '23 at 15:56
  • `lcp` is a data frame though. I'm changing it from a `SpatialLinesDataFrame` to `sf`, then dropping the geometry so it becomes a data frame. – geoscience123 Aug 18 '23 at 16:02
  • Also, using @HenrikB's solution did not allow the output from each iteration to combine, so instead of returning NULL it returned output from one iteration. – geoscience123 Aug 18 '23 at 16:05
  • @MrFlick, there is now a new error after running on a separate node (see EDIT 2). – geoscience123 Aug 18 '23 at 18:53

4 Answers


Author of Futureverse here. Try with:

library(doFuture)
plan(multisession, workers = 28)

and then replace %dopar% with %dofuture% (IMPORTANT).

That will run a similar cluster of workers, but it will give you more informative error messages.
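For example, the loop from the question could look roughly like this (a minimal sketch; it assumes the same tr1, comps, and nodes_sp objects and the same packages as in the question):

library(doFuture)   ## provides %dofuture% and loads foreach and future
library(sf); library(terra); library(leastcostpath); library(dplyr)

plan(multisession, workers = 28)

## Globals (tr1, comps, nodes_sp) and the packages they need are identified
## automatically, so .packages is not required; .errorhandling is left out
## here so that a worker error is relayed with an informative message.
lcp_network <- foreach(i = 1:nrow(comps), .combine = "rbind") %dofuture% {

  lcp <- leastcostpath::create_lcp(cost_surface = tr1,
                                   origin = nodes_sp[comps[i, 1], , drop = FALSE],
                                   destination = nodes_sp[comps[i, 2], , drop = FALSE])

  lcp$origin_ID <- nodes_sp[comps[i, 1], ]$layer
  lcp$destination_ID <- nodes_sp[comps[i, 2], ]$layer

  lcp <- lcp %>%
    st_as_sf() %>%
    mutate(length = st_length(.)) %>%
    st_drop_geometry()

  attributes(lcp$length) <- NULL

  lcp
}

plan(sequential)   ## revert to sequential processing ("unplan") when done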

PS. It is quite common to run out of memory when using too many parallel workers on the same machine.

HenrikB
  • How do you "unplan"? – geoscience123 Aug 17 '23 at 16:06
  • In this case, `plan(sequential)`. – HenrikB Aug 17 '23 at 16:51
  • For whatever reason, now the object is returned but it is still not being combined. The whole script runs without error as well. Even when I put a `write.csv()` line in the loop to write each result to file, it only writes one result to file. – geoscience123 Aug 17 '23 at 19:04
  • Try to drop `.combine` and `.errorhandling` to see what is actually returned. If it works as you expect, it should return a list of `nrow(comps)` elements. Inspect those. All `.combine = rbind` does is call `do.call(rbind, result_list)`. – HenrikB Aug 21 '23 at 10:22
  • Also, add a `str(lcp)` before `return(lcp)`; that will display what `lcp` really is in each iteration. – HenrikB Aug 22 '23 at 07:23

Regarding your:

EDIT 2: I ran the same code on a different server with the same specifications (500 GB of RAM, 30 cores). The error is different now:

numValues: 43, numResults: 0, stopped: TRUE
Error in unserialize(socklist[[n]]) : error reading from connection
Calls: %dopar% ... recvOneData -> recvOneData.SOCKcluster -> unserialize
Execution halted
slurmstepd: error: Detected 8 oom-kill event(s) in StepId=13251537.batch. 
Some of your processes may have been killed by the cgroup out-of-memory handler.

From the last two lines, we can infer that:

  1. You are running your R scripts via the Slurm job scheduler ("job queue"), probably on a high-performance computing (HPC) cluster.

  2. Your job script was consuming more memory (RAM) than you requested, i.e. more than the amount you asked Slurm to give you. It could be that you did not specify it, or that you specified too small an amount, so you need to increase the amount of memory you request when you submit the job, e.g. --mem=50G (50 GiB of memory).

  3. When you over-used the memory, your job was automatically terminated because it "misbehaved".

When your job is terminated this way, your main R process and all the parallel R workers are also terminated. The order in which they are terminated may be random, but it looks like some of the workers terminated first, and when the main R process tried to communicate with them, it failed. Hence the Error in unserialize(socklist[[n]]) : error reading from connection message. After this, the main R process was also terminated (Execution halted). The last thing the operating system and Slurm did was to output those last two lines to inform you why the job was terminated.
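Each socket-cluster worker gets its own copy of the objects used in the loop, so as a rough sanity check (a sketch, not exact accounting; object.size() underestimates the real footprint, and each worker also needs working memory for the path calculation) you can ballpark how much memory the exported objects alone will occupy across all workers and compare that with what you requested from Slurm:

n_workers <- 28
bytes_per_worker <- as.numeric(object.size(tr1)) +
  as.numeric(object.size(comps)) +
  as.numeric(object.size(nodes_sp))

## Rough total across all workers, in GiB
round(n_workers * bytes_per_worker / 1024^3, 1)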

HenrikB
  • That all makes sense. So basically I have to figure out why I'm running out of memory. It feels like I shouldn't because the HPC is allocating me 500G of memory. – geoscience123 Aug 22 '23 at 10:36
  • I am awarding this the answer, but I did find a more specific problem. Obviously it's not really possible to find the specific problem without having a reprex of my code, so I think this was the most helpful answer. I will post the full problem below. – geoscience123 Aug 24 '23 at 11:34

Since you are running on a single node of a Linux cluster, consider using a basic mclapply instead of starting a socket cluster. Because mclapply forks the workers, it avoids duplicating the tr1, comps, and nodes_sp objects many times on that node, avoids serializing those inputs and sending them to each worker, and lets you use the full 30 cores.

Just put your code into a function, call it with mclapply(), and combine the returned list with do.call().

Here is a sketch from your code:

library(sf); library(terra); library(leastcostpath); library(dplyr)

## One iteration of the original loop, wrapped in a function
lcp_fun <- function(i, tr1, comps, nodes_sp) {
  lcp <- create_lcp(cost_surface = tr1,
                    origin = nodes_sp[comps[i, 1], , drop = FALSE],
                    destination = nodes_sp[comps[i, 2], , drop = FALSE])

  lcp$origin_ID <- nodes_sp[comps[i, 1], ]$layer
  lcp$destination_ID <- nodes_sp[comps[i, 2], ]$layer

  lcp <- lcp %>%
    st_as_sf() %>%
    mutate(length = st_length(.)) %>%
    st_drop_geometry()

  attributes(lcp$length) <- NULL

  lcp
}

## Forked workers share tr1, comps, and nodes_sp with the main process
results_list <- parallel::mclapply(1:nrow(comps), lcp_fun,
                                   tr1 = tr1, comps = comps, nodes_sp = nodes_sp,
                                   mc.cores = 30)
results <- do.call(rbind, results_list)

As @HenrikB points out, the job was terminated by the out-of-memory (OOM) killer, which was killing the worker processes. The reason the memory ran out, despite running on a 500 GB computing node, was the following:

This loop:

lcp_network <- foreach::foreach(i = 1:nrow(comps), .errorhandling = "remove", .combine = "rbind", .packages = c("sf", "terra","leastcostpath","dplyr")) %dopar% {
  
  lcp <- leastcostpath::create_lcp(cost_surface = tr1,
                                   origin = nodes_sp[comps[i,1],, drop = FALSE],
                                   destination = nodes_sp[comps[i,2],, drop = FALSE])
  
  lcp$origin_ID <- nodes_sp[comps[i,1],]$layer
  lcp$destination_ID <- nodes_sp[comps[i,2],]$layer

  lcp <- lcp %>%
    st_as_sf() %>%
    mutate(length = st_length(.)) %>%
    st_drop_geometry()

  attributes(lcp$length) <- NULL
  
  return(lcp)
  
}

It calculates the least cost path between two nodes, given by the origin = and destination = arguments, using Dijkstra's algorithm over the possible routes of the transition layer tr1. The loop worked by iterating through a larger set of nodes so that paths were only calculated for pairs of nodes within a specified distance. In this way, the number of nodes is reduced inside the loop; however, tr1 remained the full surface in every iteration, and that surface was an 8 GB file.

This means that Dijkstra's algorithm was calculating possible routes over a surface that was much larger than necessary, and every parallel worker held its own copy of that 8 GB surface plus the working memory of the calculation, so the parallel runs rapidly ate up the available memory. The solution I found to work is this:

lcp_network <- foreach::foreach(i = 1:nrow(comps), .errorhandling = "remove",
                                .combine = "rbind",
                                .packages = c("sf", "terra", "leastcostpath", "gdistance",
                                              "dplyr", "raster", "tmaptools")) %dopar% {

  # Bounding box polygon around the current pair of nodes
  bbdf <- st_bbox(fin_poly[c(comps[i, 1], comps[i, 2]), ]) %>%
    bb_poly(., projection = st_crs(fin_poly)) %>%
    st_as_sf()

  # Crop the full surface to that bounding box before building the
  # transition layer, so each iteration uses a much smaller cost surface
  tr1 <- terra::crop(x = Rgrid %>%
                       terra::rast(),
                     y = bbdf,
                     mask = TRUE) %>%
    raster::raster() %>%
    transition(., mean, 8)

  lcp <- leastcostpath::create_lcp(cost_surface = tr1,
                                   origin = nodes_sp[comps[i, 1], , drop = FALSE],
                                   destination = nodes_sp[comps[i, 2], , drop = FALSE]) %>%
    st_as_sf() %>%
    mutate(origin_ID = nodes_sp[comps[i, 1], ]$layer,
           destination_ID = nodes_sp[comps[i, 2], ]$layer) %>%
    mutate(length = st_length(.)) %>%
    st_drop_geometry()

  attributes(lcp$length) <- NULL

  lcp
}

With this code, a new, much smaller cost surface is created in each iteration by cropping the full surface to the bounding box of the current pair of nodes.

geoscience123
  • Your new reduced memory function can also be easily plugged into the `mclapply()` version I suggested above. Should be much faster. I am curious what your actual comparison would be. – George Ostrouchov Aug 24 '23 at 19:21