
I have some R code that assembles demographic data from the Census for all of the states in the US into a list object. The block-level code can take a week to run as a sequential loop, since there are ~11M blocks, so I am trying to parallelize the loop over states to make it faster. I have accomplished this goal with the following:

states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
           "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
           "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
           "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
           "VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
plan(multiprocess)
ptm <- proc.time()
CensusObj_block_age_sex = list()

CensusObj_block_age_sex[states] <- future_lapply(states, function(s){
  county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
  tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
  block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
  list(state = s, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
})

However, I need to make it more robust. Sometimes there are problems with the Census API, so I would like CensusObj to be updated at each state iteration so that I don't lose my completed data if something goes wrong. That way I can restart the loop over the remaining states if something does go wrong (like if I misspell "WY" as "WU").

Would it be possible to accomplish this somehow? I am open to other methods of parallelization.


The code above runs, but it seems to run into memory issues:

Error: Failed to retrieve the value of MultisessionFuture (future_lapply-3) from cluster RichSOCKnode #3 (PID 80363 on localhost ‘localhost’). The reason reported was ‘vector memory exhausted (limit reached?)’. Post-mortem diagnostic: A process with this PID exists, which suggests that the localhost worker is still alive.

I have R_MAX_VSIZE = 8Gb in my .Renviron, but I am not sure how that would get divided between the 8 cores on my machine. This all suggests that I need to store the results of each iteration rather than try to keep it all in memory, and then append the objects together at the end.

dimitriy
  • I'm not sure but maybe you can call save() in each iteration after CensusObj is updated. Then the latest version will be stored in the .RData object used in save(). – Akshit Aug 17 '20 at 03:26
  • 1
    I would use `saveRDS()` to save each result individually somewhere. You can check that it already exists if you have to restart the loop. And combine the results later. – F. Privé Aug 17 '20 at 06:13
  • @Akshit In my understanding, the census object does not get updated until the very end of the loop, even if it is parallelized. – dimitriy Aug 19 '20 at 05:34
  • @F.Privé I tried your suggestion, but it seems to overwrite the results with the current state. – dimitriy Aug 19 '20 at 05:35
  • @F.Privé is correct - saving/caching intermediate results to one file per "iteration" (per `s`) is the way to go. – HenrikB Aug 19 '20 at 06:44
  • Have you checked the average object size for each of these? If you load Montana and Texas you can probably get an idea of how much data you are actually downloading per state. It sounds to me like you are trying to download 50+ GB of data but I'm not sure exactly what this dataset is. – Adam Sampson Aug 21 '20 at 14:47
  • Also remember that if this data comes in compressed it takes extra memory to decompress the data. – Adam Sampson Aug 21 '20 at 14:48
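
A minimal sketch of the per-state caching the comments above describe, assuming any writable `cache_dir`; `fetch_state()` is a hypothetical stand-in for the three `census_geo_api()` calls:

cache_dir <- "census_cache"           # assumption: any writable directory
dir.create(cache_dir, showWarnings = FALSE)

res_files <- future_lapply(states, function(s) {
  f <- file.path(cache_dir, paste0(s, ".rds"))
  if (!file.exists(f)) {              # skip states finished in a previous run
    saveRDS(fetch_state(s), f)        # fetch_state(): hypothetical wrapper around the API calls
  }
  f                                   # return only the file path, not the data
})

On a restart, states with an existing file are skipped, so only the missing ones are re-fetched.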

3 Answers


You could use a `tryCatch` inside `future_lapply` to relaunch the calculation in case of an API error, up to a maximum of `maxtrials` attempts.
In the resulting list, you get for each state the number of trials and the final status, OK or Error:

    states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
                "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
                "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
                "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
                "VT","VA","WA","WV","WI","WY","DC","PR")
    library(future.apply)
    #> Loading required package: future
    plan(multiprocess)
    ptm <- proc.time()

    maxtrials <- 3

    # mock of census_geo_api() used for this reprex
    census_geo_api <-
      function(key = "XXX",
               state = s,
               geo = "county",
               age = TRUE,
               sex = TRUE) {
        paste(state,'-', geo)
      }


    CensusObj_block_age_sex <- future_lapply(states, function(s) {
      ntrials <- 1
      while (ntrials <= maxtrials) {
        hasError <- tryCatch({
          # simulate a random API error
          if (runif(1) > 0.3) stop("API failed")
          county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
          tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
          block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
        },
        error = function(e)
          e)

        if (inherits(hasError, "error")) {
          ntrials <- ntrials + 1
        } else { break}
          
      }
      
      if (ntrials > maxtrials) {
        res <- list(state = s, status = 'Error', ntrials = ntrials-1, age = NA, sex = NA, block = NA, tract = NA, county = NA)
      } else  {
        res <- list(state = s, status = 'OK'    , ntrials = ntrials, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
      }
      res
    }
    )

    CensusObj_block_age_sex[[1]]
    #> $state
    #> [1] "AL"
    #> 
    #> $status
    #> [1] "OK"
    #> 
    #> $ntrials
    #> [1] 3
    #> 
    #> $age
    #> [1] TRUE
    #> 
    #> $sex
    #> [1] TRUE
    #> 
    #> $block
    #> [1] "AL - block"
    #> 
    #> $tract
    #> [1] "AL - tract"
    #> 
    #> $county
    #> [1] "AL - county"

<sup>Created on 2020-08-19 by the [reprex package](https://reprex.tidyverse.org) (v0.3.0)</sup>
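
Because every element carries a `status` field, a restarted run can target just the failures; a small sketch:

    # collect the states whose downloads still failed after maxtrials
    failed <- Filter(function(x) x$status == "Error", CensusObj_block_age_sex)
    states_to_retry <- vapply(failed, `[[`, character(1), "state")
    # then re-run the future_lapply() above over states_to_retry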
Waldi

Here is a solution that uses doParallel (registered below via the `cores` option, which forks on UNIX-alike systems; on Windows, doParallel falls back to a socket cluster) and foreach. It stores the results for every state separately, and afterwards reads the single files back in and combines them into a list.

library(doParallel)
library(foreach)

path_results <- "my_path"
ncpus = 8L
registerDoParallel(cores = ncpus)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
            "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
            "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
            "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
            "VT","VA","WA","WV","WI","WY","DC","PR")
results <- foreach(state = states) %dopar% {
                     county <- census_geo_api(key = "XXX", state = state, geo = "county", age = TRUE, sex = TRUE)
                     tract  <- census_geo_api(key = "XXX", state = state, geo = "tract",  age = TRUE, sex = TRUE)
                     block  <- census_geo_api(key = "XXX", state = state, geo = "block",  age = TRUE, sex = TRUE)
                     results <- list(state = state, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
                     
                     # store the results as rds
                     saveRDS(results,
                             file = paste0(path_results, "/", state, ".Rds"))
                     
                     # remove the results
                     rm(county)
                     rm(tract)
                     rm(block)
                     rm(results)
                     gc()
                     
                     # just return a string
                     paste0("done with ", state)
}

library(purrr)
# combine the results into a list, keyed by file name
# (list.files() sorts alphabetically, which need not match the order of `states`)
result_files <- list.files(path = path_results, pattern = "\\.Rds$")
CensusObj_block_age_sex <- set_names(result_files, sub("\\.Rds$", "", result_files)) %>% 
  map(~ readRDS(file = paste0(path_results, "/", .x)))
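
The saved files also double as a checkpoint: if the loop dies part-way, something like the following (assuming the same `path_results`) filters out the states that already finished before calling `foreach` again:

# skip states whose .Rds file already exists from a previous run
done <- sub("\\.Rds$", "", list.files(path = path_results, pattern = "\\.Rds$"))
states_todo <- setdiff(states, done)
# then: results <- foreach(state = states_todo) %dopar% { ... } as above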
starja
  • This is great. The only thing I had to tweak to get this to work was `foreach(i = states)`, since the census API needs the two-character state abbreviation. – dimitriy Aug 23 '20 at 02:23
  • Thanks for pointing this out; I forgot to use `states` in the loop; I've edited it so that it directly works now – starja Aug 23 '20 at 06:34
  • I am having some trouble with the combining step at the very end. The error says `Error: vector memory exhausted (limit reached?)`, but I have `R_MAX_VSIZE = 14Gb` in my .Renviron file and I have 16G of memory on my machine (and I am doing nothing else). There are 52 Rds files that total 654M according to `du -sh`. TX and CA are both less than 70M. I am not sure why I am having this problem. Is there any way around that? – dimitriy Aug 29 '20 at 22:06
  • So all `Rds` files together are 654MB? I'm not sure why R behaves so strangely. Are you by any chance using MacOS? There seems to be this problem: https://stackoverflow.com/questions/51295402/r-on-macos-error-vector-memory-exhausted-limit-reached – starja Aug 30 '20 at 07:28
  • That's why this is so strange. I am using Rstudio on MacOS. I can watch the memory used by RStudio go up to 14GB as the files are added and then I get that error. Would I need to turn off the parallel processing before I merge? I wonder if R is trying to load the same data into each of the 8 cores that I was using before to download the data and that's how I run out of memory. – dimitriy Aug 30 '20 at 07:41
  • I'd assume that the merging is not done in parallel because it wasn't specified in the code, but just to be sure, try the merging after closing/reopening RStudio. If this doesn't help, try the solution from the link and increase `R_MAX_VSIZE` from outside RStudio (see also the sequential-merge sketch after this thread). – starja Aug 30 '20 at 07:49
  • I was always using the touch method, but once I cranked `R_MAX_VSIZE` to 200GB, it worked. The final Rds was 631M, but rsession used 95GB of memory to do the merge! I have 16GB of memory on my machine, so this all seems like witchcraft to me. In any case, thanks again for your help! – dimitriy Aug 30 '20 at 08:30
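
A lower-memory alternative to raising `R_MAX_VSIZE` for the merge discussed in this thread: read the files one at a time in a fresh R session, with no parallel workers registered; a minimal sketch:

CensusObj_block_age_sex <- list()
for (f in list.files(path = path_results, pattern = "\\.Rds$", full.names = TRUE)) {
  s <- sub("\\.Rds$", "", basename(f))   # state abbreviation from the file name
  CensusObj_block_age_sex[[s]] <- readRDS(f)
}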

One possible solution is to log the value of censusObj to a text file, i.e. print censusObj in each iteration. The doSNOW package can be used for the logging, for example:

library(doSNOW)
cl <- makeCluster(1, outfile="abc.out")
registerDoSNOW(cl)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
        "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
        "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
        "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
        "VT","VA","WA","WV","WI","WY","DC","PR")
foreach(i=1:length(states), .combine=rbind, .inorder = TRUE) %dopar% {
    county <- "A"
    tract  <- "B"
    block  <- "C"
    censusObj <- data.frame(state = states[i], age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
    # print JSON objects so they are easy to extract from the log file later
    cat(sprintf("%s\n",rjson::toJSON(censusObj)))
}
stopCluster(cl)

This logs the value of censusObj in abc.out; if the program crashes, the error is logged too, and you will still have the latest value of censusObj in abc.out.

Here is the output of the last iteration from the log file:

Type: EXEC {"state":"PR","age":true,"sex":true,"block":"C","tract":"B","county":"A"} Type: DONE

Type: EXEC means that the iteration has started and Type: DONE means that it has completed. The output of cat appears between these two markers for each iteration. The value of censusObj can then be extracted from the log file as shown below:

Lines = readLines("abc.out")
results = list()
for(i in Lines){
    # skip processing logs created by doSNOW
    if(!startsWith(i, "starting") && !startsWith(i, "Type:")){
        results = rlist::list.append(results, jsonlite::fromJSON(i))      
    }
}

results will contain all the values printed in abc.out.

> head(results, 1)
[[1]]
[[1]]$state
[1] "AL"

[[1]]$age
[1] TRUE

[[1]]$sex
[1] TRUE

[[1]]$block
[1] "C"

[[1]]$tract
[1] "B"

[[1]]$county
[1] "A"

It is not a very clean solution, but it works.

Akshit
  • Could you give a pointer on the best way to extract from the log file and put it all back together at the end? – dimitriy Aug 20 '20 at 17:23
  • @DimitriyV.Masterov I have updated my answer demonstrating how you could get the values in R. Hope this helps you! – Akshit Aug 20 '20 at 19:07