0

I was learning to use pthread with hopes it will help some of the slowest pieces of my code go a bit faster. I tried to (as a warm-up example) to write a Montecarlo integrator using threads. I wrote a code that compares three approaches:

  • Single thread pthread evaluation of the integral with NEVALS integrand evaluations.
  • Multiple thread evaluation of the integral NTHREADS times each with NEVALS integrand evaluations.
  • Multiple threads commited to different cores in my CPU, again totalling NEVALS*NTHREADS integrand evaluations.

Upon running the fastest per integrand evaluations is the single core, between 2 and 3 times faster than the others. The other two seem to be somewhat equivalent except for the fact that the CPU usage is very different, the second one spreads the threads across all the (8) cores in my CPU, while the third (unsurprisingly) concentrates the job in NTHREADS and leaves the rest unoccupied.

Here is the source:

#include <iostream>
#define __USE_GNU
#include <sched.h>
#include <pthread.h>
#include <thread>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <unistd.h>

using namespace std;

double aleatorio(double a, double b){
    double r = double(rand())/RAND_MAX;
    return a + r * (b - a);

}

double funct(double* a){
    return pow(a[0],6);
}

void EstimateBounds(int ndim, double (*f)(double*), double* bounds){
    double x[ndim];
    for(int i=1;i<=1000;i++){
        for(int j=0;j<ndim;j++) x[j] = aleatorio(0,1);
        if ( f(x) > bounds[1]) bounds[1] = f(x);
        if ( f(x) < bounds[0]) bounds[0] = f(x);
    }
}

void Integrate(double (*f)(double*), int ndim, double* integral, int verbose, int seed){

    int nbatch = 5000000;
    const int maxeval = 25*nbatch;
    double x[ndim];
    srand(seed);




    /// Algorithm to estimate the maxima and minima ///
    for(int j=0;j<ndim;j++) x[j] = 0.5;
    double bounds[2] = {f(x),f(x)};
    EstimateBounds(ndim,f,bounds);

    /// Integral initialization ///
    int niter = int(maxeval/nbatch);


    for(int k=1;k<=niter;k++)
    {


        double loc_min = bounds[0];
        double loc_max = bounds[1];

        int count = 0;
        for (int i=1; i<=nbatch; i++)
        {
            for(int j=0;j<ndim;j++) x[j] = aleatorio(0,1);
            double y = aleatorio(bounds[0],bounds[1]);
            if ( f(x) > loc_max )   loc_max = f(x);
            if ( f(x) < loc_min )   loc_min = f(x);
            if ( f(x) > y && y > 0 ) count++;
            if ( f(x) < y && y < 0 ) count--;

        }

        double delta = (bounds[1]-bounds[0])*double(count)/nbatch;
        integral[0]  += delta;
        integral[1] += pow(delta,2);
        bounds[0] = loc_min;
        bounds[1] = loc_max;

        if(verbose>0){
        cout << "Iteration["<<k<<"]: " << k*nbatch;
        cout << " integrand evaluations so far" <<endl;
        if(verbose>1){
            cout << "The bounds for this iteration were = ["<<bounds[0]<<","<<bounds[1]<<"]"<<endl;}
        cout << "Integral = ";
        cout << integral[0]/k << " +- "; 
        cout << sqrt((integral[1]/k - pow(integral[0]/k,2)))/(k) << endl;
        cout << endl;
        }

    }
    integral[0] /= niter;
    integral[1] = sqrt((integral[1]/niter - pow(integral[0],2)))/niter;

}

struct IntegratorArguments{

    double (*Integrand)(double*);
    int NumberOfVariables;
    double* Integral;
    int VerboseLevel;
    int Seed;

};

void LayeredIntegrate(IntegratorArguments IA){
    Integrate(IA.Integrand,IA.NumberOfVariables,IA.Integral,IA.VerboseLevel,IA.Seed);
}

void ThreadIntegrate(void * IntArgs){
    IntegratorArguments *IA = (IntegratorArguments*)IntArgs;
    LayeredIntegrate(*IA);
    pthread_exit(NULL);

}

#define NTHREADS 5

int main(void)
{

   cout.precision(16);
   bool execute_single_core = true;
   bool execute_multi_core = true;
   bool execute_multi_core_2 = true;

   ///////////////////////////////////////////////////////////////////////////
   ///
   ///          Single Thread Execution 
   ///
   ///////////////////////////////////////////////////////////////////////////

    if(execute_single_core){
    pthread_t thr0;
    double integral_value0[2] = {0,0};
    IntegratorArguments IntArg0;
    IntArg0.Integrand = funct;
    IntArg0.NumberOfVariables = 2;
    IntArg0.VerboseLevel = 0;
    IntArg0.Seed = 1;

    IntArg0.Integral = integral_value0;
    int t = time(NULL);
    cout << "Now Attempting to create thread "<<0<<endl;
    int rc0 = 0;
    rc0 = pthread_create(&thr0, NULL, ThreadIntegrate,&IntArg0);   
    if (rc0) {
        cout << "Error:unable to create thread," << rc0 << endl;
        exit(-1);
    }
    else cout << "Thread "<<0<<" has been succesfuly created" << endl;
    pthread_join(thr0,NULL);
    cout << "Thread 0 has finished, it took " << time(NULL)-t <<" secs to finish"  << endl;
    cout << "Integral Value = "<< integral_value0[0] << "+/-" << integral_value0[1] <<endl;
    }


    ////////////////////////////////////////////////////////////////////////////////
    ///
    ///             Multiple Threads Creation 
    ///
    /////////////////////////////////////////////////////////////////////////////// 

    if(execute_multi_core){

    pthread_t threads[NTHREADS];
    double integral_value[NTHREADS][2];
    IntegratorArguments IntArgs[NTHREADS];
    int rc[NTHREADS];
    for(int i=0;i<NTHREADS;i++){
        integral_value[i][0]=0;
        integral_value[i][1]=0;
        IntArgs[i].Integrand = funct;
        IntArgs[i].NumberOfVariables = 2;
        IntArgs[i].VerboseLevel = 0;
        IntArgs[i].Seed = i;
        IntArgs[i].Integral = integral_value[i];        
    }

    int t = time(NULL);
    for(int i=0;i<NTHREADS;i++){
        cout << "Now Attempting to create thread "<<i<<endl;
        rc[i] = pthread_create(&threads[i], NULL, ThreadIntegrate,&IntArgs[i]);
        if (rc[i]) {
            cout << "Error:unable to create thread," << rc[i] << endl;
            exit(-1);
        }
        else cout << "Thread "<<i<<" has been succesfuly created" << endl;
    }
    /// Thread Waiting Phase ///
    for(int i=0;i<NTHREADS;i++) pthread_join(threads[i],NULL);
    cout << "All threads have now finished" <<endl;
    cout << "This took " << time(NULL)-t << " secs to finish" <<endl;
    cout << "Or " << (time(NULL)-t)/NTHREADS << " secs per core" <<endl;
    for(int i = 0; i < NTHREADS; i++ ) {
        cout << "Thread " << i << " has as the value for the integral" << endl;
        cout << "Integral = ";
        cout << integral_value[i][0] << " +- "; 
        cout << integral_value[i][1] << endl;
    }

    }

    ////////////////////////////////////////////////////////////////////////
    ///
    ///             Multiple Cores Execution 
    ///
    ///////////////////////////////////////////////////////////////////////


    if(execute_multi_core_2){

    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);

    pthread_t threads[NTHREADS];
    double integral_value[NTHREADS][2];
    IntegratorArguments IntArgs[NTHREADS];
    int rc[NTHREADS];
    for(int i=0;i<NTHREADS;i++){
        integral_value[i][0]=0;
        integral_value[i][1]=0;
        IntArgs[i].Integrand = funct;
        IntArgs[i].NumberOfVariables = 2;
        IntArgs[i].VerboseLevel = 0;
        IntArgs[i].Seed = i;
        IntArgs[i].Integral = integral_value[i];        
    }

    int t = time(NULL);
    for(int i=0;i<NTHREADS;i++){
        cout << "Now Attempting to create thread "<<i<<endl;
        rc[i] = pthread_create(&threads[i], NULL, ThreadIntegrate,&IntArgs[i]);
        if (rc[i]) {
            cout << "Error:unable to create thread," << rc[i] << endl;
            exit(-1);
        }
        else cout << "Thread "<<i<<" has been succesfuly created" << endl;
        CPU_SET(i, &cpuset);
    }

    cout << "Now attempting to commit different threads to different cores" << endl;
    for(int i=0;i<NTHREADS;i++){
        const int set_result = pthread_setaffinity_np(threads[i], sizeof(cpu_set_t), &cpuset);
        if(set_result) cout << "Error: Thread "<<i<<" could not be commited to a new core"<<endl;
        else cout << "Thread reassignment succesful" << endl;
    }

    /// Thread Waiting Phase ///
    for(int i=0;i<NTHREADS;i++) pthread_join(threads[i],NULL);
    cout << "All threads have now finished" <<endl;
    cout << "This took " << time(NULL)-t << " secs to finish" <<endl;
    cout << "Or " << (time(NULL)-t)/NTHREADS << " secs per core" <<endl;
    for(int i = 0; i < NTHREADS; i++ ) {
        cout << "Thread " << i << " has as the value for the integral" << endl;
        cout << "Integral = ";
        cout << integral_value[i][0] << " +- "; 
        cout << integral_value[i][1] << endl;
    }

    }



pthread_exit(NULL);

}

I compile with g++ -std=c++11 -w -fpermissive -O3 SOURCE.cpp -lpthread

It seems to me that my threads are actually being excecuted sequentially, because the time seems to grow with NTHREADS, and it actully takes roughly NTHREADS times longer than a single thread.

Does anyone have an idea of where the bottleneck is?

1201ProgramAlarm
  • 32,384
  • 7
  • 42
  • 56
  • `rand` is not thread safe, but I don't know if that is the source of the bottleneck. Also, you only need to call `f(x)` once for any given `x`, rather the the 4 or 5 times you're calling it now. – 1201ProgramAlarm Dec 19 '19 at 02:23
  • Any shared data is going to run slower on cores with different cache. – stark Dec 19 '19 at 02:26
  • So, you have a task that does X work, that takes Y to run. And, from what I see, this is compared to starting N threads that each do X work, and you're wondering why overall it takes longer to do X*N amount of work than X, the overall time is longer than Y? Is that an accurate capsule summary, why it takes longer to do X N times, then to do X once, even though those N times are done by N threads? – Sam Varshavchik Dec 19 '19 at 02:57
  • 1
    `-w -fpermissive` is pretty much the opposite of safe flags to use. It inhibits any diagnostic for dangerous things that are not C++ standard-conform. For example your program is using the wrong return type for `ThreadIntegrate` in order to pass it to `pthread_create` and your program therefore has undefined behavior under the standard. Whether or not it will work is now up to the compiler and the pthreads implementation. Please also consider using `std::thread` (clearly you have C++11 available) which wraps pthreads in a proper C++ API. There is no good reason to use pthreads directly. – walnut Dec 19 '19 at 02:58
  • Also, try commenting out the writes to `cout` from the threads, and see if that has any effects. – Chris O Dec 20 '19 at 14:01

1 Answers1

1

You are using rand(), which is a global random number generator. First of all it is not thread-safe, so using it in multiple threads, potentially in parallel, causes undefined behavior.

Even if we set that aside, rand() is using one global instance, shared by all threads. If one thread wants to call it, the processor core needs to check whether the other cores modified its state and needs to refetch that state from the main memory or other caches each time it is used. This is why you observe the drop in performance.

Use the <random> facilities for pseudo-random number generators instead. They offer much better quality random number generators, random number distributions, and the ability to create multiple independent random number generator instances. Make these thread_local, so the threads do not interfere with one another:

double aleatorio(double a, double b){
    thread_local std::mt19937 rng{/*seed*/};
    return std::uniform_real_distribution<double>{a, b}(rng);
}

Please note though that this is not using proper seeding for std::mt19937, see this question for details and that uniform_real_distribution<double>{a, b} will return a uniformly distributed number between a inclusive and b exclusive. Your original code gave a number between a and b inclusive (potential rounding errors aside). I assume that neither is particularly relevant to you.

Also note my unrelated comments under your question for other things you should improve.

walnut
  • 21,629
  • 4
  • 23
  • 59
  • Thanks @walnut, this solves my issue, the post has a typo and the second line should read: ```thread_local std::mt19937 rng{std::random_device{}()};``` – Diógenes Figueroa Dec 19 '19 at 14:37
  • The -fpermissive was me being 100% lazy, by changing ThreadIntegrate to return a void pointer (and actually return it) I can remove that flag, thanks! – Diógenes Figueroa Dec 19 '19 at 14:58