I am having some problems with using Rcpp (and Rcpp Armadillo) with the parallel package, and I am getting incorrect results depending on the number of cores I use for computation.
I have a function compute_indices
which computes 3 set of indices for each observation in my data. It does this by first creating a (FORK) cluster using parallel::makeCluster
depending on the number of cores I specify. It then splits my data into equal parts and applies (using parallel::parLapply
) a function meancorlsm
on each part using the cluster object I created earlier. Now meancorlsm
is basically a wrapper around a function (called covArmadilloMclsmNormal
) I wrote in Rcpp and Rcpp Armadillo as I am trying to speed up computation. However, I have another version of this function written entirely in R (called meancorlsR
) which I use to test correctness of the RccpArmadillo version.
Now if I run compute_indices
with the meancorlsm
(this I do by first using sourceCpp()
to make covArmadilloMclsmNormal
available in the global environment), I get partially correct answers depending on the number of cores I tell compute_indices
to use. Specifically, if I use 4 cores, the first 1/4 of the computed indices are correct, if I use 2 cores, the first 1/2 of my results are correct, and if I use a single core, all my results are correct. I check the correctness of the results using the answer produced by using the R version of the meancorlsm
(meancorlsR
as stated earlier). Since I am getting correct results when I use a single core, I feel the RcppArmadillo function is correct and that possibly, the different threads/workers of the cluster are interfering with each other during the computation, hence the reason I get this strange behaviour.
Below is compute_indices
:
compute_indices <- function(dt, nr = ncol(dt), n_core = 4){
par_cluster <- makeCluster(n_core, type = "FORK")
# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:nr, n_core)
# compute auxiliar support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN = 2, var)
data_sds <- apply(dt, 2, sd)
# compute Outliers using parapply
vectors <- do.call(rbind, parLapply(par_cluster, splits,
meancorlsm, dt, data_means,
data_vars, data_sds))
stopCluster(par_cluster)
vectors
}
and meancorlsm
meancorlsm<- function(i, mtx, means, vars, sds){
pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i], dt = mtx,
dtmeans = means, dtvars = vars,
dtsds = sds)
colnames(pre_outl) <- c("sh", "mg", "ap")
pre_outl
}
with the covArmadilloMclsmNormal
Rcpp function:
#include <RcppArmadillo.h>
using namespace Rcpp;
//[[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt,
arma::vec dtmeans, arma::vec dtvars,
arma::vec dtsds){
arma::mat out(dt.n_cols, dti.n_cols);
out = arma::cov(dt, dti);
int n = out.n_rows;
int p = out.n_cols;
//declare matrices to hold result
arma::vec temp(n);
arma::mat preout(3,p);
for(int i = 0; i<p ; ++i){
temp = out.col(i)/dtvars;
preout(0,i) = arma::mean((out.col(i)/dtsds))/dtsds(i);
preout(1, i) = dtmeans(i) - arma::mean(temp % dtmeans);
preout(2, i) = arma::mean(temp);
}
return preout.t();
}
Now here is the R version of meancorlsm
which I use for testing:
meancorlsR <- function(i, mtx, means, vars, sds){
pre_outl <- apply(cov(mtx, mtx[,i], use = "pairwise.complete.obs"), 2,
function(col){
tmp <- col/vars
c("sh" = mean(col/sds, na.rm = T),
"mg" = mean(tmp * means, na.rm = T),
"ap" = mean(tmp, na.rm = T))
})
pre_outl[1,] <- pre_outl[1,]/sds[i]
pre_outl[2,] <- means[i] - pre_outl[2,]
t(pre_outl)
}
You can replace meancorlsm
function with meancorlsR
in the compute_indices
function and it will work (for testing). For immediate reproducibility however, I provide it here as compute_indicesR
.
compute_indicesR <- function(dt, nr = ncol(dt), n_core = 4){
par_cluster <- makeCluster(n_core, type = "FORK")
# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:nr, n_core)
# compute auxiliar support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN = 2, var)
data_sds <- apply(dt, 2, sd)
# compute using parapply
vectors <- do.call(rbind, parLapply(par_cluster, splits,
meancorlsR, dt, data_means,
data_vars, data_sds))
stopCluster(par_cluster)
vectors
}
Finally here is a minimal example to run:
library(Rcpp)
library(parallel)
# use this to source the Rcpp function from a file
# to make the covArmadilloMclsmNormal function available
sourceCpp("covArmadilloMclsmNormal.cpp")
data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row
indices_rcpp4 <- compute_indices(dt) # using 4 cores
indices_rcpp2 <- compute_indices(dt, n_core = 2) # using 2 cores
indices_rcpp1 <- compute_indices(dt, n_core = 1) # 1 core
# using the R version
# already replaced the meancorlsm function to meancorlsR here
indices_R <- compute_indicesR(dt) # R version
I expect all the output to match the one produced by the R version regardless of the number of cores I specify. However, they are different. Here is the result I get with the R version:
" sh mg ap
1 0.634272567307155 -7.09315427645087 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928423 0.756088388472384
4 0.830462006956641 -6.26663378557611 1.03847748215856
5 0.836182582923674 15.0558414918816 0.901413435882058
6 0.648813304451793 23.4689784056255 0.40175151333289
7 0.839409670144446 3.73900558549848 0.883655665107456
8 0.781070895796188 13.1775081516076 0.810306856575333
9 0.772967959938828 2.24023877077873 1.1146249477264
10 0.826849986442202 3.31330282673472 0.910527502047015"
The result I got with the Rcpp version using 2 cores is
" sh mg ap
1 0.634272567307155 -7.09315427645086 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928424 0.756088388472384
4 0.830462006956641 -6.26663378557612 1.03847748215856
5 0.836182582923674 15.0558414918816 0.901413435882058
6 0.231943043232274 28.1832641199112 0.40175151333289
7 1.20839881621289 7.02471987121276 0.883655665107456
8 0.865212462148289 21.7489367230362 0.810306856575333
9 0.853048693647409 -10.474046943507 1.1146249477264
10 0.857055188335614 14.599017112449 0.910527502047015"
while for the Rccp version using 4 cores:
" sh mg ap
1 0.634272567307155 -7.09315427645086 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928424 0.756088388472384
4 0.648794650804865 -10.2666337855761 1.03847748215856
5 1.25119408317776 5.48441292045304 0.901413435882058
6 0.231943043232274 28.1832641199112 0.40175151333289
7 1.20839881621289 7.02471987121276 0.883655665107456
8 0.487272877566209 3.60607958017906 0.810306856575333
9 1.50139103326263 -6.75976122922128 1.1146249477264
10 1.01076542369015 15.4561599695919 0.910527502047015"
The Rccp version using a single core produced the same answer as the R version which is the correct result. Also interesting is that the ap
column of the answer remained the same no matter the number of cores I used while the sh
and the mg
column changes.
Finally, my platform is Ubuntu 16.04. Seems FORK
clusters doesn't work on Windows so you might not be able to reproduce this result. However, I got the same behaviour even with PSOCK
cluster (in which case I used clusterEvalQ()
to source the necessary Cpp functions to make them available to the workers). Any help or insight as to what I am doing wrong is much appreciated.