I use foreach + doParallel to apply a function to each row of a matrix in parallel in R. When the matrix has many rows, foreach takes a long time before and after going over the iterations in parallel.
For example, if I run:
library(foreach)
library(doParallel)

doWork <- function(data) {
  # Set up a parallel backend that uses all but one core
  cores = detectCores()
  number_of_cores_to_use = cores[1] - 1  # not to overload the computer
  cat(paste('number_of_cores_to_use:', number_of_cores_to_use))
  cl <- makeCluster(number_of_cores_to_use)
  registerDoParallel(cl)
  cat('...Starting foreach initialization')
  output <- foreach(i = 1:nrow(data), .combine = rbind) %dopar% {
    cat(i)  # printed on the worker, so not shown on the master console by default
    y = data[i, 5]
    a = 100
    for (j in 1:3) {  # Useless busy work; j avoids shadowing the foreach index i
      b = matrix(runif(a * a), nrow = a, ncol = a)
    }
    return(runif(10))
  }
  # Stop the cluster
  cat('...Stop cluster')
  stopCluster(cl)
  return(output)
}

r = 100000
c = 10
data = matrix(runif(r * c), nrow = r, ncol = c)
output = doWork(data)
output[1:10, ]
The CPU usage is as follows (100% means all cores are fully utilized):
[Figure: CPU usage over time]
With annotations:
[Figure: annotated CPU usage plot]
How can I optimize the code so that foreach doesn't take a long time before and after going over the iterations in parallel? The main time sink is the time spent after. It grows significantly with the number of foreach iterations, sometimes making the code as slow as if a simple for loop were used.
Another example (let's assume lm and poly cannot take matrices as arguments):
library(foreach)
library(doParallel)

doWork <- function(data, weights) {
  # Set up a parallel backend that uses all but one core
  cores = detectCores()
  number_of_cores_to_use = cores[1] - 1  # not to overload the computer
  cat(paste('number_of_cores_to_use:', number_of_cores_to_use))
  cl <- makeCluster(number_of_cores_to_use)
  # Export the weights argument of this function, not a global of the same name
  clusterExport(cl = cl, varlist = c('weights'), envir = environment())
  registerDoParallel(cl)
  cat('...Starting foreach initialization')
  output <- foreach(i = 1:nrow(data), .combine = rbind) %dopar% {
    x = sort(data[i, ])
    # Regress each sorted value on a quadratic polynomial of the next one
    fit = lm(x[1:(length(x) - 1)] ~ poly(x[-1], degree = 2, raw = TRUE), na.action = na.omit, weights = weights)
    return(fit$coef)
  }
  # Stop the cluster
  cat('...Stop cluster')
  stopCluster(cl)
  return(output)
}

r = 10000
c = 10
weights = runif(c - 1)
data = matrix(runif(r * c), nrow = r, ncol = c)
output = doWork(data, weights)
output[1:10, ]
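
Both examples above launch one foreach task per row. To check whether the overhead depends on the number of tasks rather than on the work itself, a minimal sketch like the one below could time the same job with one task per row and with one task per worker. run_in_chunks is a hypothetical helper, doubling each row is only a stand-in for the real per-row work, and the 2 workers and 2000 rows are arbitrary choices to keep the run short. I have not confirmed that this accounts for the slowdown; it is only meant to make the comparison measurable.

```r
library(foreach)
library(doParallel)

# Hypothetical helper: run the same per-row job split into n_tasks foreach tasks.
# n_tasks = nrow(data) gives one task per row; a small n_tasks sends each
# worker a block of rows instead.
run_in_chunks <- function(data, n_tasks) {
  # splitIndices() partitions 1..nrow(data) into n_tasks groups of row indices
  chunks <- parallel::splitIndices(nrow(data), n_tasks)
  foreach(rows = chunks, .combine = rbind) %dopar% {
    # Placeholder work: double each row. vapply() returns one column per
    # input row, so t() turns the result back into one row per input row.
    t(vapply(rows, function(i) data[i, ] * 2, numeric(ncol(data))))
  }
}

cl <- makeCluster(2)  # two workers, an assumption made only to keep the sketch small
registerDoParallel(cl)

data <- matrix(runif(2000 * 10), nrow = 2000, ncol = 10)

# Same computation, different task granularity
time_per_row   <- system.time(out_rows   <- run_in_chunks(data, nrow(data)))
time_per_chunk <- system.time(out_chunks <- run_in_chunks(data, 2))

stopCluster(cl)

print(time_per_row["elapsed"])
print(time_per_chunk["elapsed"])
```

Both calls should return the same 2000 x 10 matrix, so any difference in elapsed time comes from how the rows are grouped into tasks and how the partial results are combined.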