
I am having some problems using Rcpp (and RcppArmadillo) with the parallel package: I get incorrect results depending on the number of cores I use for the computation.

I have a function compute_indices which computes three sets of indices for each observation in my data. It first creates a (FORK) cluster with parallel::makeCluster using the number of cores I specify, then splits my data into equal parts and applies (via parallel::parLapply) a function meancorlsm to each part using that cluster object. meancorlsm is essentially a wrapper around a function called covArmadilloMclsmNormal that I wrote with Rcpp and RcppArmadillo to speed up the computation. I also have a version written entirely in R (called meancorlsR) which I use to test the correctness of the RcppArmadillo version.

Now, if I run compute_indices with meancorlsm (after first using sourceCpp() to make covArmadilloMclsmNormal available in the global environment), I get partially correct answers depending on the number of cores I tell compute_indices to use. Specifically, with 4 cores the first 1/4 of the computed indices are correct, with 2 cores the first 1/2 are correct, and with a single core all results are correct. I check correctness against the output of the R version (meancorlsR, as stated earlier). Since I get correct results with a single core, I believe the RcppArmadillo function itself is correct, and that the different workers of the cluster are somehow interfering with each other during the computation, hence this strange behaviour.

Below is compute_indices:

compute_indices <- function(dt, nr = ncol(dt), n_core = 4){ 
  par_cluster <- makeCluster(n_core, type = "FORK")
  # compute the column splits/partition for parallel processing
  splits <- parallel:::splitList(1:nr, n_core)
  # compute auxiliary support data
  data_means <- colMeans(dt, na.rm = T)
  data_vars <- apply(dt, MARGIN =  2, var)
  data_sds <- apply(dt, 2, sd)
  # compute the indices using parLapply
  vectors <- do.call(rbind, parLapply(par_cluster, splits,
                                      meancorlsm, dt, data_means,
                                      data_vars, data_sds))
  stopCluster(par_cluster)
  vectors
}
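For reference, parallel:::splitList partitions the column indices into roughly equal contiguous blocks, one per worker; with the 10-column example further below and 4 cores it gives something like the following (exact block boundaries may vary):

library(parallel)
parallel:::splitList(1:10, 4)
# e.g. list(1:3, 4:5, 6:7, 8:10): each worker gets one contiguous block of column indices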

and meancorlsm:

meancorlsm <- function(i, mtx, means, vars, sds){
  pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i], dt = mtx,
                                      dtmeans = means, dtvars = vars,
                                      dtsds = sds)
  colnames(pre_outl) <- c("sh", "mg", "ap")
  pre_outl
}

with the covArmadilloMclsmNormal Rcpp function:

#include <RcppArmadillo.h>
// [[Rcpp::depends(RcppArmadillo)]]

using namespace Rcpp;

// [[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt,
                     arma::vec dtmeans, arma::vec dtvars,
                     arma::vec dtsds){
  arma::mat out(dt.n_cols, dti.n_cols);
  out = arma::cov(dt, dti);
  int n = out.n_rows;
  int p = out.n_cols;
  //declare matrices to hold result
  arma::vec temp(n);
  arma::mat preout(3,p);
  for(int i = 0; i<p ; ++i){
    temp = out.col(i)/dtvars;
    preout(0,i) = arma::mean((out.col(i)/dtsds))/dtsds(i); 
    preout(1, i) = dtmeans(i) - arma::mean(temp % dtmeans);
    preout(2, i) = arma::mean(temp); 
  }

  return preout.t();
}

Now here is the R version of meancorlsm which I use for testing:

meancorlsR <- function(i, mtx, means, vars, sds){
  pre_outl <- apply(cov(mtx, mtx[,i], use = "pairwise.complete.obs"), 2,
                    function(col){
                      tmp <- col/vars
                      c("sh" = mean(col/sds, na.rm = T), 
                        "mg" = mean(tmp * means, na.rm = T), 
                        "ap" = mean(tmp, na.rm = T)) 
                    })
  pre_outl[1,] <- pre_outl[1,]/sds[i] 
  pre_outl[2,] <- means[i] - pre_outl[2,] 
  t(pre_outl)
}

You can replace the meancorlsm function with meancorlsR in compute_indices and it will work (for testing). For immediate reproducibility, however, I provide that version here as compute_indicesR.

compute_indicesR <- function(dt, nr = ncol(dt), n_core = 4){ 
  par_cluster <- makeCluster(n_core, type = "FORK")
  # compute the column splits/partition for parallel processing
  splits <- parallel:::splitList(1:nr, n_core)
  # compute auxiliary support data
  data_means <- colMeans(dt, na.rm = T)
  data_vars <- apply(dt, MARGIN =  2, var)
  data_sds <- apply(dt, 2, sd)
  # compute using parLapply
  vectors <- do.call(rbind, parLapply(par_cluster, splits,
                                      meancorlsR, dt, data_means, 
                                      data_vars, data_sds))
  stopCluster(par_cluster)
  vectors
}

Finally, here is a minimal example to run:

library(Rcpp)
library(parallel)
# use this to source the Rcpp function from a file
# to make the covArmadilloMclsmNormal function available
sourceCpp("covArmadilloMclsmNormal.cpp")

data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) # select the first 10 rows

indices_rcpp4 <- compute_indices(dt) # using 4 cores
indices_rcpp2 <- compute_indices(dt, n_core = 2) # using 2 cores
indices_rcpp1 <- compute_indices(dt, n_core = 1) # 1 core
# using the R version
# compute_indicesR already uses meancorlsR in place of meancorlsm
indices_R <- compute_indicesR(dt) # R version
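To see the pattern I describe below, I compare the results chunk by chunk; roughly (a sketch, reusing the objects above):

# with 2 cores, the first half matches the R version but the second half does not
all.equal(indices_rcpp2[1:5, ],  indices_R[1:5, ],  check.attributes = FALSE)
all.equal(indices_rcpp2[6:10, ], indices_R[6:10, ], check.attributes = FALSE)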

I expect all the output to match the R version regardless of the number of cores I specify. However, the results differ. Here is the result I get with the R version:

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645087   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928423   0.756088388472384
4   0.830462006956641 -6.26663378557611   1.03847748215856
5   0.836182582923674  15.0558414918816   0.901413435882058
6   0.648813304451793  23.4689784056255   0.40175151333289
7   0.839409670144446  3.73900558549848   0.883655665107456
8   0.781070895796188  13.1775081516076   0.810306856575333
9   0.772967959938828  2.24023877077873   1.1146249477264
10  0.826849986442202  3.31330282673472   0.910527502047015

The result I get with the Rcpp version using 2 cores is:

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645086   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928424   0.756088388472384
4   0.830462006956641 -6.26663378557612   1.03847748215856
5   0.836182582923674  15.0558414918816   0.901413435882058
6   0.231943043232274  28.1832641199112   0.40175151333289
7   1.20839881621289   7.02471987121276   0.883655665107456
8   0.865212462148289  21.7489367230362   0.810306856575333
9   0.853048693647409  -10.474046943507   1.1146249477264
10  0.857055188335614  14.599017112449    0.910527502047015

while for the Rcpp version using 4 cores:

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645086   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928424   0.756088388472384
4   0.648794650804865 -10.2666337855761   1.03847748215856
5   1.25119408317776   5.48441292045304   0.901413435882058
6   0.231943043232274  28.1832641199112   0.40175151333289
7   1.20839881621289   7.02471987121276   0.883655665107456
8   0.487272877566209  3.60607958017906   0.810306856575333
9   1.50139103326263  -6.75976122922128   1.1146249477264
10  1.01076542369015   15.4561599695919   0.910527502047015

The Rcpp version using a single core produces the same answer as the R version, which is the correct result. Also interesting: the ap column stays the same no matter how many cores I use, while the sh and mg columns change.

Finally, my platform is Ubuntu 16.04. FORK clusters don't work on Windows, so you may not be able to reproduce this result there. However, I get the same behaviour even with a PSOCK cluster (in which case I use clusterEvalQ() to source the necessary C++ functions so they are available to the workers). Any help or insight as to what I am doing wrong is much appreciated.
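For completeness, the PSOCK setup mentioned above looked roughly like this (a sketch from memory):

par_cluster <- makeCluster(n_core, type = "PSOCK")
clusterEvalQ(par_cluster, {
  library(Rcpp)
  sourceCpp("covArmadilloMclsmNormal.cpp")  # compile the function on each worker
})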

Segun Ojo
    Hi Segun -- that is a long question but in general be aware that parallel work is best done on independent data chunks as R is very clearly not multithread safe. Maybe take a look at the RcppParallel documentation and its examples -- and their use of `RMatrix` and `RVector`. – Dirk Eddelbuettel Jul 13 '19 at 20:47
  • I don't have all the math in my head right now, but based on `arma::cov`'s documentation, shouldn't you be splitting by row? So using `nr = nrow(dt)` and then `mtx[i,]`. – Alexis Jul 13 '19 at 21:14
  • Probably `mtx[i, , drop = FALSE]` to be sure. – Alexis Jul 13 '19 at 21:21
  • Hi Alexis, the whole function is written in such a way that the data is in a long format, i.e. I transpose the data before passing it into my function. Hence the rows of the data are actually accessed by referencing the columns inside the function. – Segun Ojo Jul 13 '19 at 23:51
  • @DirkEddelbuettel Thanks, I will look into RcppParallel. Do you think things might work better if I built a package? Maybe the fact that I am using sourceCpp to feed the workers is the problem. – Segun Ojo Jul 14 '19 at 00:02

1 Answer


Never mind my comments; I misinterpreted Armadillo's documentation.

Your C++ code indexes the helper dtmeans and dtsds vectors with i, but i always starts at zero in each parallel worker, so every chunk after the first reads the wrong means and standard deviations. You need to pass an offset indicating how many columns were skipped.
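To see the mismatch in plain R terms (a sketch, using the vector of per-column standard deviations the question calls data_sds): when the second of two workers receives columns 6:10, the loop inside covArmadilloMclsmNormal still runs over i = 0..4, so it divides by the standard deviations of columns 1:5 instead of columns 6:10.

i <- 6:10                              # columns handled by the second worker
sds_used   <- data_sds[seq_along(i)]   # what the original code effectively uses: columns 1:5
sds_needed <- data_sds[i]              # what the computation actually needs: columns 6:10

With that in mind, here is the corrected function with an explicit offset argument: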

//[[Rcpp::depends(RcppArmadillo)]]

#include <RcppArmadillo.h>

//[[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt, int offset,
                                  arma::vec dtmeans, arma::vec dtvars, arma::vec dtsds)
{
  arma::mat out = arma::cov(dt, dti);
  size_t p = out.n_cols;

  arma::mat preout(p,3);
  for (size_t i = 0; i < p; ++i) {
    arma::vec temp = out.col(i) / dtvars;
    preout(i,0) = arma::mean((out.col(i) / dtsds)) / dtsds(i + offset); 
    preout(i,1) = dtmeans(i + offset) - arma::mean(temp % dtmeans);
    preout(i,2) = arma::mean(temp); 
  }

  return preout;
}

So, accordingly, the wrapper computes the offset as min(i) - 1L, i.e. the number of columns that precede the chunk (this works because splitList hands each worker a contiguous block of column indices):

meancorlsm <- function(i, mtx, means, vars, sds){
  pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i, drop = FALSE], dt = mtx, offset = min(i) - 1L,
                                      dtmeans = means, dtvars = vars,
                                      dtsds = sds)
  colnames(pre_outl) <- c("sh", "mg", "ap")
  pre_outl
}

You can even corroborate it sequentially:

data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) # select the first 10 rows

# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:ncol(dt), 2)

# compute auxiliary support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN =  2, var)
data_sds <- apply(dt, 2, sd)

do.call(rbind, lapply(splits,
                      meancorlsm, dt, data_means,
                      data_vars, data_sds))
             sh        mg        ap
 [1,] 0.6342726 -7.093154 0.9924925
 [2,] 0.8681441 22.320636 0.6225046
 [3,] 0.8192315 24.802763 0.7560884
 [4,] 0.8304620 -6.266634 1.0384775
 [5,] 0.8361826 15.055841 0.9014134
 [6,] 0.6488133 23.468978 0.4017515
 [7,] 0.8394097  3.739006 0.8836557
 [8,] 0.7810709 13.177508 0.8103069
 [9,] 0.7729680  2.240239 1.1146249
[10,] 0.8268500  3.313303 0.9105275
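With the parallel wrapper from the question left unchanged (only meancorlsm was updated), the multi-core runs should now agree with the R version too; roughly (untested sketch):

indices_rcpp4 <- compute_indices(dt)               # 4 cores
indices_rcpp2 <- compute_indices(dt, n_core = 2)   # 2 cores
indices_R     <- compute_indicesR(dt)              # R reference
all.equal(indices_rcpp4, indices_R, check.attributes = FALSE)
all.equal(indices_rcpp2, indices_R, check.attributes = FALSE)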

BTW, I think pre-allocating matrices in the C++ code is of no use if you immediately overwrite them with =.

Alexis
  • Thanks a bunch for this. Can you please clarify the last suggestion about pre-allocating matrices? Sorry, this is my first time using Rcpp and Armadillo, so I am not familiar with memory management. How can I make it more efficient, and where am I pre-allocating a matrix unnecessarily? Thanks – Segun Ojo Jul 14 '19 at 20:32
  • @SegunOjo Like when you first define `arma::mat out(dt.n_cols, dti.n_cols)` but immediately overwrite it with `out = arma::cov(dt, dti)`; the former allocates memory that is never used, because `out` ends up holding the memory allocated by `arma::cov` internally. It's more of a C++ thing, not `Rcpp`; search for copy and move semantics. – Alexis Jul 14 '19 at 22:16