I have developed a dual chain markov monte carlo model designed to forecast loan portfolios in the excellent package Rcpp but have run into an issue trying to implement a parallelised version of these functions with RcppParallel.
I have based my attempts this far on this vignette (https://gallery.rcpp.org/articles/parallel-distance-matrix/) and this stackoverflow thread (How to call user-defined function in RcppParallel?).
All of the UDFs underlying this logic are implemented using Armadillo type objects, which I understand are threadsafe, and the writing of data between functions and pre-allocated outputs should be working smoothly as I have this same logic implemented successfully in serial functions. It's also true that the function portfolio_simulation_rating_model_rs_ts works well with the inputs used outside of the RcppParallel wrapper and there are no compilation errors or warnings when I source this code and the underlying functions. However, once I get to running the dcmcmc_portfolio_rating_model_parallel function in R, my session crashes only saying that there has been a fatal error.
Clearly I am missing something in the parallelisation, so any help/suggestions would be greatly appreciated.
// [[Rcpp::depends(RcppArmadillo, RcppParallel)]]
#include <string>
#include <algorithm>
#include <vector>
#include <math.h>
#include <RcppArmadillo.h>
#include <RcppParallel.h>
using namespace arma;
using namespace RcppParallel;
using namespace Rcpp;
using namespace std;
struct dcmcmc_portfolio_rating_model_worker : public Worker {
// Input Values
const int n_loans ;
const int n_regime ;
const int n_matrix ;
const int n_amort ;
const RVector<double> loan_ids ;
const RVector<double> starting_balances ;
const RVector<double> starting_positions ;
const RVector<double> cprs ;
const RVector<double> sim_regime_indices ;
const RVector<double> loan_regime_indices ;
const RVector<double> starting_periods ;
const RVector<double> regime_matrix_indices ;
const RVector<double> matrix_indices ;
const RVector<double> matrix_elements ;
const int nrow ;
const int ncol ;
const RMatrix<double> amortisation_schedules ;
const int periods ;
const int iterations ;
// Output Matrix
RMatrix<double> output_mx ;
dcmcmc_portfolio_rating_model_worker(
const int& n_loans,
const int& n_regime,
const int& n_matrix,
const int& n_amort,
const NumericVector& loan_ids,
const NumericVector& starting_balances,
const NumericVector& starting_positions,
const NumericVector& cprs,
const NumericVector& sim_regime_indices,
const NumericVector& loan_regime_indices,
const NumericVector& starting_periods,
const NumericVector& regime_matrix_indices,
const NumericVector& matrix_indices,
const NumericVector& matrix_elements,
const int& nrow,
const int& ncol,
const NumericMatrix& amortisation_schedules,
const int& periods,
const int& iterations,
NumericMatrix& output_mx)
: n_loans(n_loans),
n_regime(n_regime),
n_matrix(n_matrix),
n_amort(n_amort),
loan_ids(loan_ids),
starting_balances(starting_balances),
starting_positions(starting_positions),
cprs(cprs),
sim_regime_indices(sim_regime_indices),
loan_regime_indices(loan_regime_indices),
starting_periods(starting_periods),
regime_matrix_indices(regime_matrix_indices),
matrix_indices(matrix_indices),
matrix_elements(matrix_elements),
nrow(nrow),
ncol(ncol),
amortisation_schedules(amortisation_schedules),
periods(periods),
iterations(iterations),
output_mx(output_mx) {}
// Setting up functions to convert inputs to arma
arma::vec convert_input_vector(RVector<double> input_vector, int length)
{RVector<double> tmp_input_vector = input_vector ;
arma::vec input_vector_ts(tmp_input_vector.begin(), length, false) ;
return input_vector_ts ;}
arma::mat convert_input_matrix(RMatrix<double> input_matrix, int rows, int cols)
{RMatrix<double> tmp_input_matrix = input_matrix ;
arma::mat input_matrix_ts(tmp_input_matrix.begin(), rows, cols, false) ;
return input_matrix_ts ;}
// Function to iterate
void operator()(std::size_t begin, std::size_t end){
arma::vec loan_ids_ts = convert_input_vector(loan_ids, n_loans) ;
arma::vec starting_balances_ts = convert_input_vector(starting_balances, n_loans) ;
arma::vec starting_positions_ts = convert_input_vector(starting_positions, n_loans) ;
arma::vec cprs_ts = convert_input_vector(cprs, n_loans) ;
arma::vec sim_regime_indices_ts = convert_input_vector(sim_regime_indices, n_regime);
arma::vec loan_regime_indices_ts = convert_input_vector(loan_regime_indices, n_regime) ;
arma::vec starting_periods_ts = convert_input_vector(starting_periods, n_regime) ;
arma::vec regime_matrix_indices_ts = convert_input_vector(regime_matrix_indices, n_regime);
arma::vec matrix_indices_ts = convert_input_vector(matrix_indices, n_matrix) ;
arma::vec matrix_elements_ts = convert_input_vector(matrix_elements, n_matrix) ;
arma::mat amortisation_schedules_ts = convert_input_matrix(amortisation_schedules, n_amort, 3) ;
for(unsigned int i = begin; i < end; i++){
arma::vec i_sim_regime_indices = allwhich_ts(sim_regime_indices_ts,
i) ;
int sim_begin = as_scalar(i_sim_regime_indices.head(1)) ;
int sim_end = as_scalar(i_sim_regime_indices.tail(1)) ;
arma::vec i_loan_regime_indices = loan_regime_indices_ts.subvec(sim_begin, sim_end) ;
arma::vec i_starting_periods = starting_periods_ts.subvec(sim_begin, sim_end) ;
arma::vec i_regime_matrix_indices = regime_matrix_indices_ts.subvec(sim_begin, sim_end) ;
arma::mat pf_simulation = portfolio_simulation_rating_model_rs_ts(
loan_ids_ts,
starting_balances_ts,
starting_positions_ts,
cprs_ts,
i_loan_regime_indices,
i_starting_periods,
i_regime_matrix_indices,
matrix_indices_ts,
matrix_elements_ts,
nrow,
ncol,
amortisation_schedules_ts,
periods
) ;
int sim_rows = pf_simulation.n_rows ;
int sim_cols = pf_simulation.n_cols ;
for(int c = 0; c < sim_cols; c++){
for(int r = 0; r < sim_rows; r++){
output_mx((n_loans*periods*i + r), c) = pf_simulation(r, c) ;
}
}
for(int r = 0; r < sim_rows; r++){
output_mx((n_loans*periods*i + r), 7) = (i + 1) ;
}
}
}
};
//[[Rcpp::export]]
NumericMatrix dcmcmc_portfolio_rating_model_parallel(
const NumericVector& loan_ids,
const NumericVector& starting_balances,
const NumericVector& starting_positions,
const NumericVector& cprs,
const NumericVector& sim_regime_indices,
const NumericVector& loan_regime_indices,
const NumericVector& starting_periods,
const NumericVector& regime_matrix_indices,
const NumericVector& matrix_indices,
const NumericVector& matrix_elements,
int nrow,
int ncol,
const NumericMatrix& amortisation_schedules,
int periods,
int iterations
){
int n_loans = loan_ids.size() ;
int n_regime = sim_regime_indices.size() ;
int n_matrix = matrix_indices.size() ;
int n_amort = amortisation_schedules.nrow() ;
NumericMatrix output_mx(n_loans*periods*iterations, 8) ;
// Creating Worker object
dcmcmc_portfolio_rating_model_worker DCMCMC(
n_loans,
n_regime,
n_matrix,
n_amort,
loan_ids,
starting_balances,
starting_positions,
cprs,
sim_regime_indices,
loan_regime_indices,
starting_periods,
regime_matrix_indices,
matrix_indices,
matrix_elements,
nrow,
ncol,
amortisation_schedules,
periods,
iterations,
output_mx
) ;
// Call parellised worker
parallelFor(0, iterations, DCMCMC) ;
return(output_mx) ;
}
EDIT:
I have produced a minimum reproducible example, trying to incorporate the helpful comments recieved on this post so far. The example sets up trivial functions designed to mimic the structure of my modelling functions. The final function causing a crash takes three vectors, vec1, vec2, and vec_ind. It applies a worker which attempts to take chunks of equal size (indentified by indices stored in vec_ind) of vec1 and vec2, add these subvector chunks, and store the results in the relevant portions of an output vector.
I have reproduced the example below using both arma::vec and std::vector types and experience the crashing behaviour in both. I present the std::vector code below, further to Dirk's suggestion that the RcppArmadillo types may be relying on R memory, and I have removed all namespace inclusions other than RcppParallel to avoid conflicts, as per onyambu's remark.
Here is the Rcpp
// [[Rcpp::depends(RcppArmadillo, RcppParallel)]]
#include <string>
#include <algorithm>
#include <vector>
#include <math.h>
#include <RcppArmadillo.h>
#include <RcppParallel.h>
using namespace RcppParallel;
//[[Rcpp::export]]
std::vector<double> allwhich_ts(std::vector<double> vector, double value){
int length = vector.size() ;
std::vector<double> values(0) ;
for(int i = 0; i < length; i++){
bool match = vector[i] == value;
if(match){
values.push_back(i);
}
}
return(values);
}
//[[Rcpp::export]]
std::vector<double> vector_addition(std::vector<double> vector1, std::vector<double> vector2){
int n_elements = vector1.size() ;
std::vector<double> output_vec = std::vector<double>(n_elements) ;
for(int i = 0; i < n_elements; i++){
output_vec[i] = vector1[i] + vector2[i] ;
}
return(output_vec) ;
}
struct vector_addition_worker : public Worker {
const RVector<double> vector1 ;
const RVector<double> vector2 ;
const RVector<double> vector_indices ;
const int vector_length ;
RVector<double> output_vec ;
vector_addition_worker(
const Rcpp::NumericVector& vector1,
const Rcpp::NumericVector& vector2,
const Rcpp::NumericVector& vector_indices,
const int& vector_length,
Rcpp::NumericVector& output_vec
) : vector1(vector1),
vector2(vector2),
vector_indices(vector_indices),
vector_length(vector_length),
output_vec(output_vec) {}
std::vector<double> convert_input_vec(RVector<double> input_vector, int vec_length){
RVector<double> tmp_vector = input_vector ;
std::vector<double> input_vector_ts(tmp_vector.begin(), tmp_vector.end()) ;
return(input_vector_ts) ;
}
void operator()(std::size_t begin, std::size_t end){
std::vector<double> vector1_ts = convert_input_vec(vector1, vector_length) ;
std::vector<double> vector2_ts = convert_input_vec(vector2, vector_length) ;
std::vector<double> vector_indices_ts = convert_input_vec(vector_indices, vector_length) ;
for(unsigned int i = begin; i < end; i++){
std::vector<double> indices = allwhich_ts(vector_indices_ts, i) ;
int values_begin = indices.at(1) ;
int values_end = indices.at(std::distance(indices.begin(), indices.end())) ;
std::vector<double> values1(vector1_ts.begin() + values_begin, vector1_ts.begin() + values_end) ;
std::vector<double> values2(vector2_ts.begin() + values_begin, vector2_ts.begin() + values_end) ;
std::vector<double> interim_op = vector_addition(values1, values2) ;
int op_size = interim_op.size() ;
for(int n = 0; n < op_size; n++){
output_vec[i*op_size + n] = interim_op[n] ;
}
}
}
};
//[[Rcpp::export]]
Rcpp::NumericVector vector_addition_parallel(Rcpp::NumericVector vec1,
Rcpp::NumericVector vec2,
Rcpp::NumericVector vec_ind){
int vec_length = vec1.size() ;
double n_indices = *std::max_element(vec_ind.begin(), vec_ind.end()) ;
Rcpp::NumericVector op_vec(vec_length);
vector_addition_worker vec_add_worker(
vec1,
vec2,
vec_ind,
vec_length,
op_vec
) ;
parallelFor(0, n_indices, vec_add_worker) ;
return(op_vec) ;
}
Here is the R code which tests for expected behaviour
library(Rcpp)
library(RcppParallel)
library(RcppArmadillo)
# Setting up dummy data
vec1 = rep(1, 500)
vec2 = rep(1, 500)
vec_inds = sort(rep(1:20, 25))
length(vec1);length(vec2);length(vec_inds)
## Checking that allwhich_ts is working as expected
allwhich_ts(vec_inds, 1)
# Checking that vector_addition is working as expected
vector_addition(vec1, vec2)
# Checking that the same logic can be applied serially (mainly to verify data handling method)
r_repro_function <- function(vec1, vec2, vec_inds){
op_vec = numeric(length(vec1))
for(i in unique(vec_inds)){
tmp1 = vec1[vec_inds == i]
tmp2 = vec2[vec_inds == i]
tmp_op = tmp1 + tmp2
for(n in 1:length(tmp1)){
op_vec[(i - 1)*length(tmp1) + n] = tmp_op[n]
}
}
op_vec
}
r_repro_function(vec1, vec2, vec_inds)
vector_addition_parallel(vec1 = vec1,
vec2 = vec2,
vec_ind = vec_inds)