3

I would like to parallelize a for loop using parallelReduce of RcppParallel but struggle with joining the results because the dimension of the parallel output for each iteration is not known a priori. With the foreach package in R, this can be done with foreach using .combine=rbind:

library(foreach)
library(doParallel)

# some dummy function which, as a function of the iteration index, returns a Boolean
lengthy_test <- function(row_of_data){
  if(runif(1)<0.5){
    return(TRUE)
  }else{
    return(FALSE)
  }
}

input <- matrix(NA,1000,1)

output = foreach(i=1:1000,.combine=rbind)%dopar%{
  if(lengthy_test(input[i,])){
    n_rows_i = 1            # n_rows_i is the number of rows that we would like to add
  }else{
    n_rows_i = 2
  }
  matrix(rnorm(n_rows_i*3),nrow =n_rows_i, ncol = 3)
}

How can one achieve the same with parallelReduce? The key problem is to write the Split constructor and the join operator since a priori it is unknown which row of the input matrix is associated with which row(s) of the output matrix.

In my use case, the dimensions of output is known a priori so that the preallocation of an output matrix (or construction of a Worker without Split) is no problem.

A serial implementation in C++ is not a problem since one can simply carry around a counter of how many rows of the output matrix one has filled already. How this can be done in parallel is not obvious to me.

Edit

Since F. Privé asked: Here is how I'd try to adapt the code from the RcppParallel Documentation

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

struct Test : public Worker
{   
   // source matrix
   const RMatrix<double> input;

   // number of rows of output matrix
   int nrows;   

   // output matrix

   RMatrix<double> output;
   
   // constructors
   Test(const NumericMatrix input) : input(input), nrows(input.nrow()*2) output(nrows, 3) {}
   Test(const Test& test, Split) : input(test.input) {
     nrows = 0;
     for (int i = 0; i < input.nrow(); i++) {
       if(Rcpp::runif(1)<0.5){
         nrows = nrows + 1;
       }else{
         nrows = nrows + 2;
       }
       // now I know the dimension I have to assign to output
       // i.e. nrows rows and 3 columns 
       // but I don't know _which_ rows I have to assign
       // this is why I thought that parallelReduce might be better than parallelFor 
       // since I think I can just do this and worry about the joining later    

       output = Rmatrix(Rcpp::rnorm(nrows*3),nrows,3);
   }
   
   // accumulate just the element of the range I've been asked to
   void operator()(std::size_t begin, std::size_t end) {
      // perhaps I can leave this empty 
      // since all the work is done in the split constructor
      // but I'm not sure that the split constructor is always called
   }
     
   // join my output with that of another Test
   void join(const Test& rhs) { 
      //  here the rbind equivalent is needed
      // the order of binding is not imporant for me (maybe for others, it is)

      output = rbind(output,rhs.output); 
   }
};

And then, one could call it like this

// [[Rcpp::export]]
double Testcombine(NumericMatrix x){
   
   // declare the Test instance 
   Test test(x);
   
   // call parallel_reduce to start the work
   parallelReduce(0, x.nrow(), test);
   
   // return the computed test
   return test.output;
}
Smelton
  • 31
  • 3
  • Please provide what you tried as a {RcppParallel} implementation. Also, how can you know the numbers of rows a priori even though it seems to be random? – F. Privé Dec 16 '21 at 07:45
  • @F.Privé: I've added an attempt at a RcppParallel implementation. The number of rows in total is known in my application. What I have posted here is intended to be a MWE where the number of rows in each iteration is not known. Unfortunately, the MWE comes with the added difficulty that the aggregate number of rows is also unkown (but it should be close to `1.5 * nrow(input)`). – Smelton Dec 16 '21 at 15:14

0 Answers0