
I'm learning parallel processing as a way to handle some huge datasets.

I have predefined some variables as follows:

CV <- function(mean, sd) {(sd / mean) * 100} # coefficient of variation, in percent
distThreshold <- 5 # Distance threshold
CVThreshold <- 20 # CV threshold (%)

LocalCV <- list()
Num.CV <- list()

Then I load the parallel library and export the base variables and the sp library to the cluster workers:

library(parallel)
clust_cores <- makeCluster(detectCores(logical = TRUE))
# export the objects the workers need and load sp on every worker
clusterExport(clust_cores, c("i", "YieldData2rd", "CV", "distThreshold", "CVThreshold"))
clusterEvalQ(clust_cores, library(sp))

Then I pass the cluster object clust_cores to parSapply:

for (i in seq(YieldData2rd)) {
  LocalCV[[i]] = parSapply(clust_cores, X = 1:length(YieldData2rd[[i]]), 
                   FUN = function(pt) {
                     d = spDistsN1(YieldData2rd[[i]], YieldData2rd[[i]][pt,])
                     ret = CV(mean = mean(YieldData2rd[[i]][d < distThreshold, ]$yield), 
                              sd = sd(YieldData2rd[[i]][d < distThreshold, ]$yield))
                     return(ret)
                   }) # calculate CV in the local neighbour 
}

stopCluster(clust_cores) 

This fails with `Error in checkForRemoteErrors(val) : 6 nodes produced errors; first error: subscript out of bounds`, plus the warning message `1: closing unused connection (<-localhost:11688)`.

Please let me know how to resolve this issue.

For a reproducible example, I created a sample list of SpatialPointsDataFrame objects; it runs fine in the original for loop without the parallel processing components.

library(sp) # coordinates() comes from sp; rgdal only loaded sp as a dependency

Yield1 <- data.frame(yield=rnorm(460, mean = 10), x1=rnorm(460, mean = 1843235), x2=rnorm(460,mean = 5802532))
Yield2 <- data.frame(yield=rnorm(408, mean = 10), x1=rnorm(408, mean = 1843235), x2=rnorm(408, mean = 5802532))
Yield3 <- data.frame(yield=rnorm(369, mean = 10), x1=rnorm(369, mean = 1843235), x2=rnorm(369, mean = 5802532))

coordinates(Yield1) <- c('x1', 'x2')
coordinates(Yield2) <- c('x1', 'x2')
coordinates(Yield3) <- c('x1', 'x2')

YieldData2rd <- list(Yield1, Yield2, Yield3)
M八七
  • Can you please provide a sample of `YieldData2rd`? Without it the code doesn't run. In addition, you export `i` to the cluster before defining it. – Omry Atia Sep 20 '18 at 13:23
  • Please see my edited question. I added a small sample dataset as a reproducible example, which runs fine in the original `for` loop. I export `i` to the cluster because otherwise I get an error that object `i` cannot be found. – M八七 Sep 21 '18 at 07:21
  • When I run your code as in this revised version above, I get a different error: Error in checkForRemoteErrors(val) : 4 nodes produced errors; first error: object 'i' not found. The reason is that for indexed data, parApply is not suitable: check out the function `foreach`, which is the parallel version of a for loop. Hope this helps – Omry Atia Sep 21 '18 at 07:24
  • @OmryAtia Thanks, I tried rewriting the code using the `foreach` package and posted an answer below, which seems to work fine on my side. Just one question: how do I know it's actually faster than the `for` loop? – M八七 Sep 21 '18 at 10:32
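The point about exporting `i` can be illustrated with a minimal sketch, assuming the code is run at the top level of a script or console as in the question. `clusterExport()` sends each worker a copy of `i` as it was at export time, and a `FUN` defined in the global environment looks `i` up in the worker's own global environment, so the loop index never reaches the workers. If that stale value were larger than `length(YieldData2rd)`, `YieldData2rd[[i]]` would fail with "subscript out of bounds", which would be consistent with the error in the question. The value 99 and the names `seen`/`k` are purely illustrative.

```r
library(parallel)

cl <- makeCluster(2)

i <- 99                  # value of i at the time of export
clusterExport(cl, "i")   # every worker now holds its own copy: i == 99

seen <- list()
for (i in 1:3) {
  # FUN's environment is the global environment, which is not shipped to the
  # workers, so `i` is resolved on each worker and returns the exported copy
  seen[[i]] <- parSapply(cl, 1:2, function(k) i)
}

stopCluster(cl)
print(seen)  # at top level, every element is c(99, 99), not the loop index
```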

1 Answer


Thanks to @Omry Atia's comment, I started looking into the foreach package; here is my first attempt.

library(foreach)
library(doParallel)

# set up the parallel backend, leaving one core free so the computer is not overloaded
cores <- detectCores()
clust_cores <- makeCluster(max(1, cores[1] - 1))
registerDoParallel(clust_cores)

# foreach returns a list with one element per iteration (one per dataset).
# .packages = "sp" loads sp on every worker so spDistsN1() and the sp subsetting methods work there.
LocalCV <- foreach(i = seq_along(YieldData2rd), .packages = "sp") %dopar% {
  sapply(X = seq_len(length(YieldData2rd[[i]])),
         FUN = function(pt) {
           # distance from point pt to every point in the same dataset
           d <- spDistsN1(YieldData2rd[[i]], YieldData2rd[[i]][pt, ])
           # yields of the neighbours within distThreshold
           neighbours <- YieldData2rd[[i]][d < distThreshold, ]$yield
           CV(mean = mean(neighbours), sd = sd(neighbours))
         }) # calculate CV in the local neighbourhood
}

stopCluster(clust_cores)

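Without assigning the result to LocalCV in front of foreach, the whole result is simply printed to the console. Assignments made inside the loop body stay on the workers; foreach collects the value of the last expression of each iteration into a list.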
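If you would rather keep the original `for` loop with `parSapply()`, one possible approach (a sketch, not tested on the original sp objects) is to pass the current dataset to the workers as an extra argument instead of relying on an exported `i`. To keep the sketch self-contained it assumes hypothetical plain data frames (`YieldList`, with the dataset passed as `dat`) whose `x1`/`x2` columns are projected coordinates, and it computes Euclidean distances in base R, which is what `spDistsN1()` returns for non-longlat coordinates.

```r
library(parallel)

CV <- function(mean, sd) {(sd / mean) * 100}  # coefficient of variation, in percent
distThreshold <- 5

# Hypothetical stand-in for YieldData2rd: plain data frames with projected coordinates
set.seed(1)
YieldList <- list(
  data.frame(yield = rnorm(460, mean = 10), x1 = rnorm(460, mean = 1843235), x2 = rnorm(460, mean = 5802532)),
  data.frame(yield = rnorm(369, mean = 10), x1 = rnorm(369, mean = 1843235), x2 = rnorm(369, mean = 5802532))
)

cl <- makeCluster(2)
# export only objects that stay the same across iterations
clusterExport(cl, c("CV", "distThreshold"))

LocalCV <- list()
for (i in seq_along(YieldList)) {
  # the current dataset travels to the workers as the `dat` argument,
  # so the workers never need to look up `i`
  LocalCV[[i]] <- parSapply(cl, X = seq_len(nrow(YieldList[[i]])),
                            FUN = function(pt, dat) {
                              # Euclidean distance from point pt to every point
                              d <- sqrt((dat$x1 - dat$x1[pt])^2 + (dat$x2 - dat$x2[pt])^2)
                              neighbours <- dat$yield[d < distThreshold]
                              CV(mean = mean(neighbours), sd = sd(neighbours))
                            },
                            dat = YieldList[[i]])
}

stopCluster(cl)
```

Each dataset is still processed one at a time, but the points within it are split across the workers.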

I will try the new code on some huge datasets and see how much faster it is.
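To check whether the parallel version is actually faster than the sequential one, a simple option is to wrap both in `system.time()` and compare the `elapsed` (wall-clock) times. The sketch below does this with base R's `lapply()` and `parLapply()` over hypothetical plain data frames; `make_df`, `local_cv`, `YieldList` and the sizes are illustrative assumptions, not part of the original code. The same `system.time()` wrapper can be placed around the original `for` loop and the `foreach` call. Cluster start-up is a one-off cost and is not included in the parallel timing here.

```r
library(parallel)

CV <- function(mean, sd) {(sd / mean) * 100}  # coefficient of variation, in percent

# Local CV for one dataset (base-R Euclidean distance, assumes projected x1/x2)
local_cv <- function(df, distThreshold = 5) {
  sapply(seq_len(nrow(df)), function(pt) {
    d <- sqrt((df$x1 - df$x1[pt])^2 + (df$x2 - df$x2[pt])^2)
    neighbours <- df$yield[d < distThreshold]
    CV(mean = mean(neighbours), sd = sd(neighbours))
  })
}

# Hypothetical larger test data: 4 datasets of 3000 points each
set.seed(42)
make_df <- function(n) data.frame(yield = rnorm(n, mean = 10),
                                  x1 = rnorm(n, mean = 1843235),
                                  x2 = rnorm(n, mean = 5802532))
YieldList <- lapply(rep(3000, 4), make_df)

# sequential baseline
t_seq <- system.time(res_seq <- lapply(YieldList, local_cv))

# parallel version: one dataset per task
cl <- makeCluster(2)
clusterExport(cl, "CV")  # local_cv calls CV, which must exist on the workers
t_par <- system.time(res_par <- parLapply(cl, YieldList, local_cv))
stopCluster(cl)

# compare wall-clock time rather than user/system CPU time
print(c(sequential = t_seq[["elapsed"]], parallel = t_par[["elapsed"]]))
```

For small inputs the cost of sending data to the workers can outweigh the gain, so it is worth timing on a realistically sized dataset.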

Reference: run a for loop in parallel in R

M八七