4

I use foreach + doParallel to apply a function to each row of a matrix multithreadedly in R. When the matrix has many rows, foreach takes a long time before and after multithreadedly going over the iterations.

For example, if I run:

library(foreach)
library(doParallel)

doWork <- function(data) {

  # setup parallel backend to use many processors
  cores=detectCores()
  number_of_cores_to_use = cores[1]-1 # not to overload the computer
  cat(paste('number_of_cores_to_use:',number_of_cores_to_use))
  cl <- makeCluster(number_of_cores_to_use) 
  clusterExport(cl=cl, varlist=c('ns','weights'))
  registerDoParallel(cl)
  cat('...Starting foreach initialization')

  output <- foreach(i=1:length(data[,1]), .combine=rbind) %dopar% {
    cat(i)
    y = data[i,5]
    a = 100
    for (i in 1:3) { # Useless busy work
      b=matrix(runif(a*a), nrow = a, ncol=a)
    }
    return(runif(10))

  }
  # stop cluster
  cat('...Stop cluster')
  stopCluster(cl)

  return(output)
}

r = 100000
c = 10
data = matrix(runif(r*c), nrow = r, ncol=c)
output = doWork(data)
output[1:10,]

The CPU usage is as follows (100% means all cores are fully utilized):

enter image description here

with annotations:

enter image description here

How can I optimize the code so that foreach doesn't take a long time before and after multithreadedly going over the iterations? The main time sink is the time spent after. The time spent after grows significantly with the number of foreach iterations, sometimes making the code has slow as if a simple for loop was used.


Another example (let's assume lm and poly cannot take matrices as arguments):

library(foreach)
library(doParallel)

doWork <- function(data,weights) {

  # setup parallel backend to use many processors
  cores=detectCores()
  number_of_cores_to_use = cores[1]-1 # not to overload the computer
  cat(paste('number_of_cores_to_use:',number_of_cores_to_use))
  cl <- makeCluster(number_of_cores_to_use) 
  clusterExport(cl=cl, varlist=c('weights'))
  registerDoParallel(cl)
  cat('...Starting foreach initialization')

  output <- foreach(i=1:nrow(data), .combine=rbind) %dopar% {
    x = sort(data[i,])
    fit = lm(x[1:(length(x)-1)] ~ poly(x[-1], degree = 2,raw=TRUE), na.action=na.omit, weights=weights)
    return(fit$coef)
  }
  # stop cluster
  cat('...Stop cluster')
  stopCluster(cl)

  return(output)
}

r = 10000 
c = 10
weights=runif(c-1)
data = matrix(runif(r*c), nrow = r, ncol=c)
output = doWork(data,weights)
output[1:10,]
Franck Dernoncourt
  • 77,520
  • 72
  • 342
  • 501
  • 5
    The problem is with `rbind` I think. `rbind` lots of values from a list takes a long time. Also, fillings rows is bad, because matrices are stored by column. Also, making a long foreach loop is not efficient (use blocks instead). Finally, using shared memory is always better when parallelizing over matrices. I could make a solution for you if you make an example that is nearer from what you want. – F. Privé Jul 19 '17 at 06:52
  • 1
    The problem is in thinking that `foreach %dopar%` is *always* faster than a vectorized approach. `foreach %dopar%` requires communicating among `100,000` instances of workers, since you are iterating through `i=1:length(data[,1])` and `data` has 100,000 rows. You should benchmark between a vectorized approach (sapply, lapply, apply) and a parallelized approach before implementing a parallelized approach. If you want to stick with a parallelized approach, change your code to work on columns (of which there are 10) rather than rows. Better to start fewer workers with more work on each. – CPak Jul 19 '17 at 13:00
  • @F.Privé Thanks, I would be very interested. I have added an example that is nearer from what I want. – Franck Dernoncourt Jul 19 '17 at 15:40
  • `x = sort(data[i,])` and `x[1:(length(x)-1)]` seem odd. Can you confirm this is what you want to do? – F. Privé Jul 19 '17 at 15:53
  • @F.Privé confirmed. Thanks for your great answer! – Franck Dernoncourt Jul 19 '17 at 19:06
  • @FranckDernoncourt If you are happy with the answer, please validate it. – F. Privé Jul 25 '17 at 17:20
  • @F.Privé Good point, thanks! (I had already upvoted) – Franck Dernoncourt Jul 25 '17 at 18:34

2 Answers2

2

Try this:

devtools::install_github("privefl/bigstatsr")
library(bigstatsr)
options(bigstatsr.ncores.max = parallel::detectCores())

doWork2 <- function(data, weights, ncores = parallel::detectCores() - 1) {

  big_parallelize(data, p.FUN = function(X.desc, ind, weights) {

    X <- bigstatsr::attach.BM(X.desc)

    output.part <- matrix(0, 3, length(ind))
    for (i in seq_along(ind)) {
      x <- sort(X[, ind[i]])
      fit <- lm(x[1:(length(x)-1)] ~ poly(x[-1], degree = 2, raw = TRUE), 
               na.action = na.omit, weights = weights)
      output.part[, i] <- fit$coef
    }

    t(output.part)
  }, p.combine = "rbind", ncores = ncores, weights = weights)
}

system.time({
  data.bm <- as.big.matrix(t(data))
  output2 <- doWork2(data.bm, weights)
})

all.equal(output, output2, check.attributes = FALSE)

This is twice as fast on my computer (which has only 4 cores). Remarks:

  • Using more than half of the cores is often useless.
  • Your data is not very large, so using a big.matrix may not be useful here.
  • big_parallelize separate the matrix in ncores blocks of columns and apply your function on each and then combine the results.
  • In the function, it's better to make the output before the loop, and then fill it than to use a foreach that rbind all the results.
  • I'm accessing only columns, not rows.

So all these are good practices, yet it is not really relevant for your data. The gain should be higher when using more cores and for larger datasets.

Basically, if you want to be super fast, reimplementing the lm part in Rcpp would be a good solution.

F. Privé
  • 11,423
  • 2
  • 27
  • 78
1

As F. Privé mentioned in the comment:

The problem is with rbind I think. rbind lots of values from a list takes a long time. Also, fillings rows is bad, because matrices are stored by column. Also, making a long foreach loop is not efficient (use blocks instead).

To use use blocks instead (if 5 cores are used, each core receives 20% of the matrix):

library(foreach)
library(doParallel)


array_split <- function(data, number_of_chunks) {
  # [Partition matrix into N equally-sized chunks with R](https://stackoverflow.com/a/45198299/395857)
  # Author: lmo
  rowIdx <- seq_len(nrow(data))
  lapply(split(rowIdx, cut(rowIdx, pretty(rowIdx, number_of_chunks))), function(x) data[x, ])
}


doWork <- function(data) {

  # setup parallel backend to use many processors
  cores=detectCores()
  number_of_cores_to_use = cores[1]-1 # not to overload the computer
  cat(paste('number_of_cores_to_use:',number_of_cores_to_use))
  cl <- makeCluster(number_of_cores_to_use) 
  clusterExport(cl=cl, varlist=c('ns','weights'))
  registerDoParallel(cl)

  cat('...Starting array split')
  number_of_chunks = number_of_cores_to_use
  data_chunks = array_split(data=data, number_of_chunks=number_of_chunks)
  degree_poly = 2

  cat('...Starting foreach initialization')
  output <- foreach(i=1:length(data_chunks), .combine=rbind) %dopar% {

    data_temporary = data_chunks[[i]]
    output_temporary = matrix(0, nrow=nrow(data_temporary), ncol = degree_poly + 1)
    for(i in 1:length(data_temporary[,1])) {
      x = sort(data_temporary[i,])
      fit = lm(x[1:(length(x)-1)] ~ poly(x[-1], degree = degree_poly,raw=TRUE), na.action=na.omit, weights=weights)
      output_temporary[i,] = fit$coef
    }
    return(output_temporary)
  }

  # stop cluster
  cat('...Stop cluster')
  stopCluster(cl)

  return(output)
}

r = 100000
c = 10
weights=runif(c-1)
data = matrix(runif(r*c), nrow = r, ncol=c)
output = doWork(data)
output[1:10,]

FYI:

Franck Dernoncourt
  • 77,520
  • 72
  • 342
  • 501