2

I am trying to implement a function in Rcpp that takes a matrix as input and calculates and quantiles as specified by the user for the row of said matrix. Since I want to use openMP I tried to do it using RcppEigen due to thread safety concerns. One reason this looks a bit complicated is that for calculating quantiles efficiently I tried to mimic this approach (finding quartiles, first answer), but allow for user input. So essentially I create a vector with indices corresponding to the quantiles in the first step. In the second step I try to acces the corresponding values in the for loop.

This is the code I was trying:

// // -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-

// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::plugins(openmp)]]

#ifdef _OPENMP
#include <omp.h>
#endif

// [[Rcpp::plugins(cpp11)]]
#include <random>

// [[Rcpp::export]]
SEXP summaryParC(const Eigen::MatrixXd x,
                 const Eigen::VectorXd quantiles,
                 int nrow, int ncol, const int ncores)
{
  const int no_quantiles = quantiles.size();
  Eigen::MatrixXd result(nrow, no_quantiles);

  // this part is just to give me a vector of indices I need later on in the foor loop
  //-----------------------------------------------
  Eigen::VectorXi indices(no_quantiles +1);
  indices[0] = -1;
  for (int k=0; k<no_quantiles; k++){
    if (quantiles[k] < 0.5){
      indices[k+1] = floor(quantiles[k] * (ncol-1));
    } else {
      indices[k+1] = ceil(quantiles[k] * (ncol-1));
    }
  }
  //-----------------------------------------------

#pragma omp parallel num_threads(ncores)
{
#pragma omp for
  for(int i = 0; i < nrow; i++){
    // I am trying to convert it into a vector so I can sort it
    Eigen::VectorXd v = (x.row(i));
    auto * ptr = v; // this fails
    // here I want to use the pointer to access the n-th element of the vector
    for(int q=0; q<no_quantiles; q++){ //quantiles
      std::nth_element(ptr + indices[q] + 1, ptr + indices[q+1], ptr + ncol);
      result(i,q) = *(ptr + indices[q+1]);
    }
  }
}
return Rcpp::wrap(result);
}

The reason that I wanted to define my own pointer is that Eigen::VectorXd v has nothing like v.begin(). without openMP I would simply define x as NumericMatrix and v as NumericVector and everything works fine. Using openMP I can not rely on that being thread-safe?

This works for smaller datasets, but crashes when used on a larger matrix:

// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
                 NumericVector quantiles, 
                 int nrow, int ncol, const int ncores)
{
  const int no_quantiles = quantiles.size();
  NumericMatrix result(nrow, no_quantiles);
  int indices[no_quantiles +1];
  //-----------------------------------------------
  indices[0] = -1;
  for (int k=0; k<no_quantiles; k++){
    if (quantiles[k] < 0.5){
      indices[k+1] = floor(quantiles[k] * (ncol-1));
    } else {
      indices[k+1] = ceil(quantiles[k] * (ncol-1));
    }
  }
  //-----------------------------------------------
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
  for(int i = 0; i < nrow; i++){
    // converting it into a vector so I can sort it
    NumericVector v = (x.row(i));
    for(int q=0; q<no_quantiles; q++){ //quantiles
      std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
      result(i,q) = *(v.begin() + indices[q+1]);
    }
  }
}
  return Rcpp::wrap(result);
}

Thank you very much!

Update:

I implemented Ralf Stubner's approach. The Pointer works fine as far as I can tell. (Unfortunately R still aborts the session when I try to run it. As Dirk Eddelbuettel pointed out using a pointer does not solve the problem of accessing R memory).

// [[Rcpp::export]]
SEXP summaryParC(Eigen::MatrixXd x,
                 const Eigen::VectorXd quantiles,
                 int nrow, int ncol, const int ncores)
{
  const int no_quantiles = quantiles.size();
  Eigen::MatrixXd result(nrow, no_quantiles);
  Eigen::VectorXi indices(no_quantiles +1);
  indices[0] = -1;
  for (int k=0; k<no_quantiles; k++){
    if (quantiles[k] < 0.5){
      indices[k+1] = floor(quantiles[k] * (ncol-1));
    } else {
      indices[k+1] = ceil(quantiles[k] * (ncol-1));
    }
  }

#pragma omp parallel num_threads(ncores)
{
#pragma omp for
  for(int i = 0; i < nrow; i++){
    Eigen::VectorXd v = (x.row(i));
    double * B = v.data();
    double * E = B + nrow;

    for(int q=0; q<no_quantiles; q++){ //quantiles
      std::nth_element(B + indices[q] + 1, B + indices[q+1], E);
      result(i,q) = *(B + indices[q+1]);
    }
  }
}
return Rcpp::wrap(result);
}

2nd update: here a cleaner example of the underlying problem. I am aware of the fact that using R structures is problematic with openMP, but maybe the example can lead to a better understanding of the underlying reasons.

// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
#include <Rcpp.h>
#ifdef _OPENMP
#include <omp.h>
#endif

using namespace Rcpp;

// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
              int nrow, int ncol, const int ncores)
{
  NumericMatrix result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  //   #pragma omp parallel num_threads(ncores)
  {
    //     #pragma omp for
    for(int i = 0; i < nrow; i++){
      NumericVector v = (x.row(i));
      for(int q=0; q < 5; q++){
        std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
        result(i,q) = *(v.begin() + indices[q+1]);
      }
    }
  }
  return Rcpp::wrap(result);
}





// [[Rcpp::export]]
SEXP summaryParC(NumericMatrix x,
                 int nrow, int ncol, const int ncores)
{
  NumericMatrix result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  #pragma omp parallel num_threads(ncores)
  {
    #pragma omp for schedule(dynamic)
      for(int i = 0; i < nrow; i++){
      {
        NumericVector v = (x.row(i));
        for(int q=0; q<5; q++){
          std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
          result(i,q) = *(v.begin() + indices[q+1]);
        }
      }
      }
  }
return Rcpp::wrap(result);
}





// [[Rcpp::export]]
SEXP summaryParCorder(NumericMatrix x,
                 int nrow, int ncol, const int ncores)
{
  NumericMatrix result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  #pragma omp parallel num_threads(ncores)
  {
    #pragma omp for ordered schedule(dynamic)
    for(int i = 0; i < nrow; i++){
      #pragma omp ordered
      {
        NumericVector v = (x.row(i));
        for(int q=0; q<5; q++){
          std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
          result(i,q) = *(v.begin() + indices[q+1]);
        }
      }
    }
  }
return Rcpp::wrap(result);
}




***** R - code *****
#this works, but summaryParCorder is much slower. 
mbm <- microbenchmark::microbenchmark(
  summaryC(x = matrix(as.numeric(1:1000000), ncol = 1000), 
           nrow = 1000, ncol = 1000, ncores = 4),

  summaryParCorder(x = matrix(as.numeric(1:1000000), ncol = 1000), 
              nrow = 1000, ncol = 1000, ncores = 4),
  times = 20
)
mbm

# this breaks:
summaryParC(x = matrix(as.numeric(1:1000000), ncol = 1000), 
                 nrow = 1000, ncol = 1000, ncores = 4)
Nikos Bosse
  • 144
  • 11
  • How would you use that pointer? Can you reduce the code to just that parts that fail? The code might look stupid, but that's ok if you explain the background. BTW, if you are not using the linear algebra functions from Eigen, then the data structures from `RcppParallel` are very useful. – Ralf Stubner Aug 25 '18 at 09:46
  • I deleted some unnecessary lines (sorry for that) and tried to structure and explain it more clearly and also added an example of what I am trying to accomplish with the pointer. (if you think it would be better to delete the intialization of the quantile vector I can delete that, I was hoping to achieve more clarity with that). If possible I would like to avoid RcppParallel, as it is another package to import in my package, but if not this is probably good advice – Nikos Bosse Aug 25 '18 at 11:12
  • Have you tried `v.data()` for getting a raw pointer? – Ralf Stubner Aug 25 '18 at 12:47
  • 1
    Are you sure it is aborting because of threading issues? After all, you are accessing `indices[q + 1]` where `q` is only bounded by `q < indices.length()`. – Ralf Stubner Aug 25 '18 at 21:07
  • not entirely (is there a way to find that out?). Ah oh damn.Thank you I'm just realising that I made a typo creating the post. I originally had Eigen::VectorXi indices(no_quantiles + 1). I tried to correct it but it still crashes (I will check for more mistakes / typos). – Nikos Bosse Aug 25 '18 at 21:38
  • In that case I suggest you build a [mcve]. In this case this means one file that one can load with `sourceCpp`. The file should also contain the necessary R code for calling the function, typically with some random data, c.f. the code in my answer. – Ralf Stubner Aug 25 '18 at 21:41
  • I updated the code and added an example. Should I rather leave that here or post it in another question? Thank you very much. – Nikos Bosse Aug 26 '18 at 11:40

1 Answers1

1

I have not checked for compatibility with OpenMP, but Eigen::VectorXd::data() gives you the required pointer, if the vector in question is not const:

// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>

// [[Rcpp::export]]
Eigen::VectorXd quantiles(Eigen::VectorXd x, const Eigen::VectorXi& indices) {
  Eigen::VectorXd result(indices.size());

  std::nth_element(x.data(), x.data() + indices[0], x.data() + x.size());
  result(0) = x[indices[0]];

  for (int i = 1; i < indices.size(); ++i) {
    std::nth_element(x.data() + indices[i - 1] + 1,
                     x.data() + indices[i],
                     x.data() + x.size());
    result(i) = x[indices[i]];
  }
  return result;
}

/*** R
set.seed(42)
x <- runif(12)
i <- sort(sample(seq_len(12), 3)) - 1
quantiles(x, i)
*/

Here a full solution including OpenMP:

// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>

using namespace Rcpp;

// [[Rcpp::export]]
NumericMatrix summaryC(NumericMatrix x, int nrow, int ncores)
{
  NumericMatrix result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  for (int i = 0; i < nrow; i++) {
    NumericVector v = (x.row(i));
    for (int q = 0; q < 5; ++q) {
      std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
      result(i,q) = *(v.begin() + indices[q+1]);
    }
  }
  return result;
}

// [[Rcpp::export]]
Eigen::MatrixXd summaryParC(Eigen::MatrixXd x,int nrow, int ncores) {
  Eigen::MatrixXd result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  #pragma omp parallel num_threads(ncores)
  {
    #pragma omp for schedule(dynamic)
      for (int i = 0; i < nrow; i++) {
        Eigen::VectorXd v = x.row(i);
        for (int q = 0; q < 5; ++q) {
          std::nth_element(v.data() + indices[q] + 1,
               v.data() + indices[q+1],
               v.data() + v.size());
          result(i,q) = v[indices[q+1]];
        }
      }
  }
  return result;
}

/*** R 
x <- matrix(as.numeric(1:1000000), ncol = 1000)
microbenchmark::microbenchmark(
   summaryC = summaryC(x = x, nrow = 1000, ncores = 4),
  summaryParC = summaryParC(x = x, nrow = 1000, ncores = 4),
  times = 100)
*/

I have never seen a crash with this parallel version. And on my dual-core machine it is about 44% percent faster than the serial code.

Ralf Stubner
  • 26,263
  • 3
  • 40
  • 75
  • 1
    I fear what is being ignored here is that the pointer _may still point to R-held memory_ as RcppEigen also uses a proxy model. Accessing the Matrix elements by pointer makes no difference. Don't risk accessing R data from OpenMP. – Dirk Eddelbuettel Aug 25 '18 at 19:57
  • That was my mistake and where my assumption was wrong, thank you for pointing that out. – Nikos Bosse Aug 25 '18 at 20:48
  • @DirkEddelbuettel I am confused. I thought this memory sharing only happens for `Eigen::Map`. At least the above function does not alter the content of `x`. That changes if I use `Eigen::Map` in the function signature. But even if it provided access to R-held memory; isn't that exactly what `RcppParallel::RVector` is doing? Is it problematic to use that together with OpenMP? – Ralf Stubner Aug 25 '18 at 21:02
  • I could well be wrong. Map<> for sure, maybe this one copies. There is still no reason to use a pointer as indices would work too. But what do I care. OP hasn't exactly demonstrated via profiling where his bottleneck is ... We do have memory profiling tools and could in fact settle this. But oh well. – Dirk Eddelbuettel Aug 25 '18 at 21:08
  • @DirkEddelbuettel this one copies out of R. Agreed on the pointer remark. – coatless Aug 26 '18 at 00:14
  • Could you maybe point me to a good memory profiling tool I could use? I didn't know what profiling to do, when the code itself just makes R abort. – Nikos Bosse Aug 26 '18 at 11:41
  • 1
    @NikosBosse I would go for valgrind, c.f. http://kevinushey.github.io/blog/2015/04/05/debugging-with-valgrind/. BTW, why are you using `Rcpp` data structures? In an update to my answer I have altered your parallel code to use those from `Eigen`. No crash so far ... – Ralf Stubner Aug 26 '18 at 14:58
  • Unfortunately I'm running windows, but I will try it out as soon as I can. I had inferred from Dirk's comment that both would behave similarly and thus used Rcpp to make the example more comparable. Thank you very much!! – Nikos Bosse Aug 26 '18 at 18:00
  • @RalfStubner you are wonderful, thank you! I played a little around with it and it seems to work fine. – Nikos Bosse Aug 27 '18 at 07:43