1

I've built a sequential, multi-threaded, and multi-process (MPI) version of a Monte Carlo calculation to compare parallel programming techniques. Comparing the sequential code to the MPI code produces the expected results. For a very large number of samples, the MPI code runs about 5 times faster with 5 processes performing calculations. However, I can't get the multi-threaded version to run faster, even though the system monitor shows multiple cores working on the calculation. I am running the code on Linux. Except for MPI in the multi-process version, I don't use any external libraries.

What would cause the multi-threaded version to take effectively the same amount of time in this case, even though the calculations are uniformly distributed to threads that have been assigned to different cores? I've made everything possible local to the threaded functions to hopefully eliminate false sharing, but I saw no change compared to using global variables for everything.

Sequential version:

#include "common/common.hpp"


// Integral to evaluate
#define v(x)    exp(x)


using namespace std;

int main(int argc, char **argv)
{
    // Limits of integration
    const double a = 0.0, b = 1.0;

    // Read number of samples strata from command-line input
    uint nSamples = atoi(argv[1]);
    uint nStrata = atoi(argv[2]);

    srand((int)time(0));

    // Sums in each stratum
    vector<uint> nSamples_s(nStrata, 0);
    vector<double> sumX_s(nStrata, 0.0), sumX2_s(nStrata, 0.0);

    double x, delta = (b-a)/nStrata;
    uint s;

    double mean, var;

    for (uint i = 1; i <= nSamples; i++) {
        // Sample random variable
        x = a + (b-a)*((double)rand() / RAND_MAX);

        // Select the matching stratum
        s = nStrata*(x-a)/(b-a);
        s = (s == nStrata) ? nStrata - 1 : s;

        // Store sums
        nSamples_s[s]++;
        sumX_s[s] += delta*v(x);
        sumX2_s[s] += pow(delta*v(x), 2.0);
    }

    // Calculate summary statistics
    mean = 0.0;
    var = 0.0;
    for (uint j = 0; j < nStrata; j++) {
        mean += sumX_s[j]/nSamples_s[j];
        var += sumX2_s[j]/nSamples_s[j] - pow(sumX_s[j]/nSamples_s[j], 2.0);
    }

    // Output summary statistics
    cout << "\nIntegral estimate: " << mean
         << "\n\tstddev = " << sqrt(var)
         << "\n\tstderr = " << sqrt(var/nSamples) << endl;

    return 0;
}

Multi-threaded version:

#include "common/common.hpp"
#include <thread>
#include <mutex>


using namespace std;

// Mutex for modifying summary statistics
mutex mtx;

// Integral to evaluate
#define v(x)    exp(x)

double mean = 0.0, var = 0.0;


void partialSum(uint rank, uint numWorkers, double a, double b, uint nStrata, uint nSamples);


int main(int argc, char **argv)
{
    // Limits of integration
    const double a = 0.0, b = 1.0;

    // Read number of samples and strata from command-line input
    uint nSamples = atoi(argv[1]);
    uint nStrata = atoi(argv[2]);

    srand((int)time(0));

    // Worker threads
    const uint numWorkers = 5;
    vector<thread> workers;

    // Start threads
    for (uint t = 0; t < numWorkers; t++)
        workers.push_back(thread(partialSum, t, numWorkers, a, b, nStrata, nSamples));

    // Wait for thread execution
    for (uint t = 0; t < numWorkers; t++)
        workers[t].join();

    // Output summary statistics
    cout << "\nIntegral estimate: " << mean
         << "\n\tstddev = " << sqrt(var)
         << "\n\tstderr = " << sqrt(var/nSamples) << endl;

    return 0;
}

void partialSum(uint rank, uint numWorkers, double a, double b, uint nStrata, uint nSamples)
{
    uint nStrata_t, nSamples_t;     // Actual number of strata and samples handled by this thread
    uint stdStrata_t;               // Nominal number of strata per thread

    nStrata_t = stdStrata_t = nStrata / numWorkers;
    if (rank == numWorkers - 1)
        nStrata_t += nStrata % numWorkers;

    uint strataOffset = rank * stdStrata_t;

    nSamples_t = stdStrata_t * (nSamples / nStrata);
    if (rank == numWorkers - 1)
        nSamples_t += nSamples % nStrata;

    // Summed statistics for each stratum in this thread
    vector<uint> nSamples_st(nStrata_t, 0);
    vector<double> sumX_st(nStrata_t, 0.0), sumX2_st(nStrata_t, 0.0);

    // Width of integration region for each stratum and for this thread
    double delta_s = (b-a)/nStrata;
    double delta_t = delta_s * nStrata_t;

    double x;   // Sampling variable
    uint s;     // Corresponding stratum

    // Sum statistics
    for (uint i = 0; i < nSamples_t; i++) {
        // Sample random variable
        x = delta_t*((double)rand() / RAND_MAX);

        // Select the matching stratum
        s = nStrata_t*x/delta_t;
        s = (s == nStrata_t) ? nStrata_t - 1 : s;

        // Store sums
        nSamples_st[s]++;
        sumX_st[s] += delta_s*v(x + a + strataOffset*delta_s);
        sumX2_st[s] += pow(delta_s*v(x + a + strataOffset*delta_s), 2.0);
    }

    // Calculate summary statistics
    double partialMean = 0.0, partialVar = 0.0;
    for (uint j = 0; j < nStrata_t; j++) {
        partialMean += sumX_st[j]/nSamples_st[j];
        partialVar += sumX2_st[j]/nSamples_st[j] - pow(sumX_st[j]/nSamples_st[j], 2.0);
    }

    // Lock mutex until thread exit
    lock_guard<mutex> lockStats(mtx);

    // Add contributions from this thread to summary statistics
    mean += partialMean;
    var += partialVar;
}

MPI version:

#include "common/common.hpp"
#include <mpi.h>


// Limits of integration
const double a = 0.0, b = 1.0;

// Number of samples and strata
uint nSamples, nStrata;

// MPI process data
int numProcs, numWorkers, procRank;

// Integral to evaluate
#define v(x)    exp(x)


#define MPI_TAG_MEAN    0
#define MPI_TAG_VAR     1

void partialSum();
void collectSums();


using namespace std;

int main(int argc, char **argv)
{
    // MPI setup
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &procRank);

    // Number of slave processes
    numWorkers = numProcs - 1;
    assert(numWorkers > 0);

    // Read number of samples and strata from command-line input
    nSamples = atoi(argv[1]);
    nStrata = atoi(argv[2]);

    srand((int)time(0));

    if (!procRank) {    // Process 0
        collectSums();
    } else {            // Worker processes
        partialSum();
    }

    MPI_Finalize();
    return 0;
}

void partialSum()
{
    int stdStrata_p, stdSamples_p;  // Nominal number of strata and samples per process
    int nStrata_p, nSamples_p;      // Actual number of strata and samples handled by this process

    nStrata_p = stdStrata_p = nStrata / numWorkers;
    if (procRank == numWorkers)
        nStrata_p += nStrata % numWorkers;

    int strataOffset = (procRank - 1) * stdStrata_p;

    nSamples_p = stdSamples_p = nStrata_p * (nSamples / nStrata);
    if (procRank == numWorkers)
        nSamples_p += nSamples % nStrata;

    // Sums in each stratum handled by this process
    vector<uint> nSamples_sp(nStrata_p, 0);
    vector<double> sumX_sp(nStrata_p, 0.0), sumX2_sp(nStrata_p, 0.0);

    // Width of integration region for each stratum and this process
    double delta_s = (b-a)/nStrata;
    double delta_p = delta_s*nStrata_p;

    double x;   // Sampling variable
    uint s;     // Corresponding stratum

    // Summed statistics
    double mean, var;

    for (int i = 0; i < nSamples_p; i++) {
        // Sample random variable
        x = delta_p*((double)rand() / RAND_MAX);

        // Select the matching stratum
        s = nStrata_p*x/delta_p;
        s = (s == nStrata_p) ? nStrata_p - 1 : s;

        // Store sums
        nSamples_sp[s]++;
        sumX_sp[s] += delta_s*v(x + a + strataOffset*delta_s);
        sumX2_sp[s] += pow(delta_s*v(x + a + strataOffset*delta_s), 2.0);
    }

    mean = 0.0;
    var = 0.0;
    for (int j = 0; j < nStrata_p; j++) {
        mean += sumX_sp[j]/nSamples_sp[j];
        var += sumX2_sp[j]/nSamples_sp[j] - pow(sumX_sp[j]/nSamples_sp[j], 2.0);
    }

    MPI_Send(&mean, 1, MPI_DOUBLE, 0, MPI_TAG_MEAN, MPI_COMM_WORLD);
    MPI_Send(&var, 1, MPI_DOUBLE, 0, MPI_TAG_VAR, MPI_COMM_WORLD);
}

void collectSums()
{
    double mean = 0.0, var = 0.0;

    for (int i = 0; i < 2*numWorkers; i++) {
        double readBuf;
        MPI_Status readStatus;

        MPI_Recv(&readBuf, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &readStatus);

        if (readStatus.MPI_TAG == MPI_TAG_MEAN)
            mean += readBuf;
        else if (readStatus.MPI_TAG == MPI_TAG_VAR)
            var += readBuf;
    }

    // Output summary statistics
    cout << "\nIntegral estimate: " << mean
         << "\n\tstddev = " << sqrt(var)
         << "\n\tstderr = " << sqrt(var/nSamples) << endl;
}

The programs were compiled and run as such:

$ g++ strat_samples.cpp -o strat_samples -std=gnu++11 -O2 -Wall
$ time ./strat_samples 100000000 100

Integral estimate: 1.71828
    stddev = 0.000515958
    stderr = 5.15958e-08

real    0m18.709s
user    0m18.704s
sys     0m0.000s


$ g++ strat_samples_thd.cpp -o strat_samples_thd -std=gnu++11 -lpthread -O2 -Wall
$ time ./strat_samples_thd 100000000 100
Integral estimate: 1.71828
    stddev = 0.000515951
    stderr = 5.15951e-08

real    0m18.981s
user    0m39.608s
sys     0m44.588s


$ mpic++ strat_samples_mpi.cpp -o strat_samples_mpi -std=gnu++11 -O2 -Wall
$ time mpirun -n 6 ./strat_samples_mpi 100000000 100

Integral estimate: 1.71828
    stddev = 0.000515943
    stderr = 5.15943e-08

real    0m7.601s
user    0m32.912s
sys     0m5.696s

Note: the speed-up of the MPI version is even more significant when you start adding 0's to the command-line input.

RAM
  • 2,257
  • 2
  • 19
  • 41
Bailey C
  • 78
  • 6
  • 2
    Possible duplicate of [Using stdlib's rand() from multiple threads](https://stackoverflow.com/questions/6161322/using-stdlibs-rand-from-multiple-threads) – Zulan Jan 23 '18 at 08:51
  • @Zulan - does Linux not have "per thread" instances of the current seed value used by rand() (which Windows does have)? – rcgldr Jan 23 '18 at 09:09
  • Just a side question but why are you not using openMP? Wouldn't that be easier than manually keeping track of the threads? Or is this part of the learning experience? – NOhs Jan 23 '18 at 09:37
  • @rcgldr I presume glibc uses a lock internally, leading to the commonly observed performance degradation. It seems that POSIX requires a reproducible sequence - one might argue that a hidden threadlocal state violates that when called safely from multiple threads. That said, `rand` is very bad on any platform for many different reasons. – Zulan Jan 23 '18 at 11:23
  • @Zulan - in the case of Windows, each thread's instance of the seed value used by rand() guarantees the same pattern of data for each thread (assuming seed value is left to default or initialized to the same value in all threads). – rcgldr Jan 23 '18 at 14:27
  • That behaviour is not specified though, is it? – Zulan Jan 23 '18 at 16:00
  • Yes, Mr. Z, I know there are APIs that make this easier. The example is for the sake of understanding. As you might imagine, code to integrate e^x from 0 to 1 is unlikely to end up in a larger application. – Bailey C Jan 23 '18 at 18:34

1 Answers1

1

Every pseudo random number generator (PRNG) has a state. In case of rand its hidden, however, its usage in multithreaded codes results in data races and therefore undefined behavior. Moreover, rand has other significant drawbacks as well.

If C++11 is available to you, use its random library part, use one PRNG per thread, use proper distribution, and be careful not to seed PRNGs with same values.

Daniel Langr
  • 22,196
  • 3
  • 50
  • 93