18

I have a program with a function which takes a pointer as arg, and a main. The main is creating n threads, each of them running the function on different memory areas depending on the passed arg. Threads are then joined, the main performs some data mixing between the area and creates n new threads which do the the same operation as the old ones.

To improve the program I would like to keep the threads alive, removing the long time necessary to create them. Threads should sleep when the main is working and notified when they have to come up again. At the same way the main should wait when threads are working as it did with join.

I cannot end up with a strong implementation of this, always falling in a deadlock.

Simple baseline code, any hints about how to modify this would be much appreciated

#include <thread>
#include <climits>

...

void myfunc(void * p) {
  do_something(p);
}

int main(){
  void * myp[n_threads] {a_location, another_location,...};
  std::thread mythread[n_threads];
  for (unsigned long int j=0; j < ULONG_MAX; j++) {
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i] = std::thread(myfunc, myp[i]);
    }
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i].join();
    }
    mix_data(myp); 
  }
  return 0;
}
Jonathan Wakely
  • 166,810
  • 27
  • 341
  • 521
DarioP
  • 5,377
  • 1
  • 33
  • 52
  • Where exactly does it deadlock? Is it on `mythread[i].join();`? – Maxim Egorushkin Mar 06 '13 at 16:13
  • No, this is an example of the code I have now which cannot deadlock. I cannot find a better implementation (which does not delete threads joining them) which does not deadlock. – DarioP Mar 06 '13 at 16:17
  • Are you possibly looking for [thread pools](http://stackoverflow.com/questions/3988128/c-thread-pool)? – us2012 Mar 06 '13 at 16:19
  • Pools may be a (not obvious) solution but I think it's an overkill. I am also looking for a very fast execution and simple implementation, probably mutex and conditional_variable should be the first way to try. – DarioP Mar 06 '13 at 16:25
  • There is a working example of [multi-producer multi-consumer queue](http://stackoverflow.com/a/9711916/412080) you can use with a bunch of your own worker threads. – Maxim Egorushkin Mar 06 '13 at 16:28
  • Ok, thanks for the example, but that's pretty far from what I need. First of all I don't have nor need a queue... – DarioP Mar 06 '13 at 16:51
  • 1
    @DarioP - yes, you do, honest :) – Martin James Mar 06 '13 at 17:16
  • @DarioP Put it another way - search SO for 'thread manager' and see how many attempts have failed. There are a lot, and yours will probably join, (sorry), them. Use a pool, as suggested by others. Try very hard to avoid join(), AKA 'deadlock generator'. – Martin James Mar 06 '13 at 17:19
  • @DarioP: Thread pools is not a solution that is suggested. It is the name of the thing you're trying to implement. It's like someone asking for a mode of transport with four wheels and an engine and others suggest he look at cars and he tells them that cars are overkill. – slebetman Mar 07 '13 at 03:35
  • @slebetman, not it's not. What he is trying to do has nothing to do with thread pool –  Mar 07 '13 at 09:15

4 Answers4

21

Here is a possible approach using only classes from the C++11 Standard Library. Basically, each thread you create has an associated command queue (encapsulated in std::packaged_task<> objects) which it continuously check. If the queue is empty, the thread will just wait on a condition variable (std::condition_variable).

While data races are avoided through the use of std::mutex and std::unique_lock<> RAII wrappers, the main thread can wait for a particular job to be terminated by storing the std::future<> object associated to each submitted std::packaged_tast<> and call wait() on it.

Below is a simple program that follows this design. Comments should be sufficient to explain what it does:

#include <thread>
#include <iostream>
#include <sstream>
#include <future>
#include <queue>
#include <condition_variable>
#include <mutex>

// Convenience type definition
using job = std::packaged_task<void()>;

// Some data associated to each thread.
struct thread_data
{
    int id; // Could use thread::id, but this is filled before the thread is started
    std::thread t; // The thread object
    std::queue<job> jobs; // The job queue
    std::condition_variable cv; // The condition variable to wait for threads
    std::mutex m; // Mutex used for avoiding data races
    bool stop = false; // When set, this flag tells the thread that it should exit
};

// The thread function executed by each thread
void thread_func(thread_data* pData)
{
    std::unique_lock<std::mutex> l(pData->m, std::defer_lock);
    while (true)
    {
        l.lock();

        // Wait until the queue won't be empty or stop is signaled
        pData->cv.wait(l, [pData] () {
            return (pData->stop || !pData->jobs.empty()); 
            });

        // Stop was signaled, let's exit the thread
        if (pData->stop) { return; }

        // Pop one task from the queue...
        job j = std::move(pData->jobs.front());
        pData->jobs.pop();

        l.unlock();

        // Execute the task!
        j();
    }
}

// Function that creates a simple task
job create_task(int id, int jobNumber)
{
    job j([id, jobNumber] ()
    {
        std::stringstream s;
        s << "Hello " << id << "." << jobNumber << std::endl;
        std::cout << s.str();
    });

    return j;
}

int main()
{
    const int numThreads = 4;
    const int numJobsPerThread = 10;
    std::vector<std::future<void>> futures;

    // Create all the threads (will be waiting for jobs)
    thread_data threads[numThreads];
    int tdi = 0;
    for (auto& td : threads)
    {
        td.id = tdi++;
        td.t = std::thread(thread_func, &td);
    }

    //=================================================
    // Start assigning jobs to each thread...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work do to...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();


    //=================================================
    // Here the main thread does something...

    std::cin.get();

    // ...done!
    //=================================================


    //=================================================
    // Posts some new tasks...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work do to...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();

    // Send stop signal to all threads and join them...
    for (auto& td : threads)
    {
        std::unique_lock<std::mutex> l(td.m);
        td.stop = true;
        td.cv.notify_one();
    }

    // Join all the threads
    for (auto& td : threads) { td.t.join(); }
}
Andy Prowl
  • 124,023
  • 23
  • 387
  • 451
  • Yes! Thank you very much.. this solution is not as simple as barriers, but still not to hard to understand, implement and keep under control. I had something like this in my mind when posting, now I need to find out if this is really what I want. – DarioP Mar 07 '13 at 08:14
12

The concept you want is the threadpool. This SO question deals with existing implementations.

The idea is to have a container for a number of thread instances. Each instance is associated with a function which polls a task queue, and when a task is available, pulls it and run it. Once the task is over (if it terminates, but that's another problem), the thread simply loop over to the task queue.

So you need a synchronized queue, a thread class which implements the loop on the queue, an interface for the task objects, and maybe a class to drive the whole thing (the pool class).

Alternatively, you could make a very specialized thread class for the task it has to perform (with only the memory area as a parameter for instance). This requires a notification mechanism for the threads to indicate that they are done with the current iteration.

The thread main function would be a loop on that specific task, and at the end of one iteration, the thread signals its end, and wait on condition variables to start the next loop. In essence, you would be inlining the task code within the thread, dropping the need of a queue altogether.

 using namespace std;

 // semaphore class based on C++11 features
 class semaphore {
     private:
         mutex mMutex;
         condition_variable v;
         int mV;
     public:
         semaphore(int v): mV(v){}
         void signal(int count=1){
             unique_lock lock(mMutex);
             mV+=count;
             if (mV > 0) mCond.notify_all();
         }
         void wait(int count = 1){
             unique_lock lock(mMutex);
             mV-= count;
             while (mV < 0)
                 mCond.wait(lock);
         }
 };

template <typename Task>
class TaskThread {
     thread mThread;
     Task *mTask;
     semaphore *mSemStarting, *mSemFinished;
     volatile bool mRunning;
    public:
    TaskThread(Task *task, semaphore *start, semaphore *finish): 
         mTask(task), mRunning(true), 
         mSemStart(start), mSemFinished(finish),
        mThread(&TaskThread<Task>::psrun){}
    ~TaskThread(){ mThread.join(); }

    void run(){
        do {
             (*mTask)();
             mSemFinished->signal();
             mSemStart->wait();
        } while (mRunning);
    }

   void finish() { // end the thread after the current loop
         mRunning = false;
   }
private:
    static void psrun(TaskThread<Task> *self){ self->run();}
 };

 classcMyTask {
     public:
     MyTask(){}
    void operator()(){
        // some code here
     }
 };

int main(){
    MyTask task1;
    MyTask task2;
    semaphore start(2), finished(0);
    TaskThread<MyTask> t1(&task1, &start, &finished);
    TaskThread<MyTask> t2(&task2, &start, &finished);
    for (int i = 0; i < 10; i++){
         finished.wait(2);
         start.signal(2);
    }
    t1.finish();
    t2.finish();
}

The proposed (crude) implementation above relies on the Task type which must provide the operator() (ie. a functor like class). I said you could incorporate the task code directly in the thread function body earlier, but since I don't know it, I kept it as abstract as I could. There's one condition variable for the start of threads, and one for their end, both encapsulated in semaphore instances.

Seeing the other answer proposing the use of boost::barrier, I can only support this idea: make sure to replace my semaphore class with that class if possible, the reason being that it is better to rely on well tested and maintained external code rather than a self implemented solution for the same feature set.

All in all, both approaches are valid, but the former gives up a tiny bit of performance in favor of flexibility. If the task to be performed takes a sufficiently long time, the management and queue synchronization cost becomes negligible.

Update: code fixed and tested. Replaced a simple condition variable by a semaphore.

Community
  • 1
  • 1
didierc
  • 14,572
  • 3
  • 32
  • 52
  • That's what I suggested in the comments, but apparently OP doesn't think they fit his needs. Could you address his concern that thread pools may be overkill? – us2012 Mar 06 '13 at 16:38
  • I think I more or less did. I don't provide a sample implementation because I'm using my cellphone atm, and I don't have a good enough knowledge of c++ std::threads and condition vars. to write it confidently. Edit are welcomes. – didierc Mar 06 '13 at 16:47
  • This is the second suggestion to look for a pool. I will think about this, but still there must be a solution based on mutex and condition variables. I would like to approach the problem (also) from this point of view. – DarioP Mar 06 '13 at 16:48
  • @DarioP Threadpools are based on mutexes and condition variables, they just hide the cruft behind some nice clean interface. – Stephan Dollberg Mar 06 '13 at 16:55
  • 3
    Yes - please go with the pool approach. Micro-managing threads nearly always fails, usually badly. – Martin James Mar 06 '13 at 17:24
  • provided a simple (probably broken) implementation for the cond wait setup. I would not rely too much on it, but it might give you a idea of the problem. – didierc Mar 06 '13 at 17:27
  • code updated and working. you might prefer @AndyProwl solution, if you decide not to use threadpools. – didierc Mar 07 '13 at 03:26
5

It can easily be achieved using a barrier (just a convenience wrapper over a conditional variable and a counter). It basically blocks until all N threads have reached the "barrier". It then "recycles" again. Boost provides an implementation.

void myfunc(void * p, boost::barrier& start_barrier, boost::barrier& end_barrier) {
  while (!stop_condition) // You'll need to tell them to stop somehow
  {
      start_barrier.wait ();
      do_something(p);
      end_barrier.wait ();
  }
}

int main(){
  void * myp[n_threads] {a_location, another_location,...};

  boost::barrier start_barrier (n_threads + 1); // child threads + main thread
  boost::barrier end_barrier (n_threads + 1); // child threads + main thread

  std::thread mythread[n_threads];

    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i] = std::thread(myfunc, myp[i], start_barrier, end_barrier);
    }

  start_barrier.wait (); // first unblock the threads

  for (unsigned long int j=0; j < ULONG_MAX; j++) {
    end_barrier.wait (); // mix_data must not execute before the threads are done
    mix_data(myp); 
    start_barrier.wait (); // threads must not start new iteration before mix_data is done
  }
  return 0;
}
  • This is for a scientific software which in principle should compile and run on many different machines and easily be adapted to run on clusters. For portability purposes I would like to use just the standard library or at least POSIX and so avoid boost. However this solution is really, really nice and simple! I will take this in serious consideration, thank you! – DarioP Mar 07 '13 at 07:46
  • @DarioP, barrier is dead easy to implement literally a dozen lines of code. You can copy paste it from here http://www.boost.org/doc/libs/1_53_0/boost/thread/barrier.hpp Just replace `boost::mutex` and `conditional_variable` with posix equivalent –  Mar 07 '13 at 09:14
  • Yes, it should be easy to implement the barrier class. I also improved a little bit your code. Do you think it's fine? – DarioP Mar 07 '13 at 13:11
  • @DarioP, not it's not OK. With your correction the code would have led to a deadlock –  Mar 07 '13 at 14:00
  • In fact that was what I was obtaining in a few cycles. Now I have to figure why. – DarioP Mar 07 '13 at 14:07
  • @aleguna, definetly I get a deadlock independently on how I structure the cycle, both with your and mine layout. I actually cannot see any difference between them. Maybe I am missing something or I am doing some mistakes on the program which is pretty big.. – DarioP Mar 07 '13 at 14:31
  • @DarioP: Not using boost for portability purposes seems like a bad choice - boost is extremely portable and will most certainly work on any POSIX compatible system. – Björn Pollex Mar 08 '13 at 15:33
  • @BjörnPollex, I may not be able to compile on the specific cluster if the libraries are not present. That often happens in scientific environment and I really want to avoid. – DarioP Mar 08 '13 at 19:14
0

The following is a simple compiling and working code performing some random stuffs. It implements aleguna's concept of barrier. The task length of each thread is different so it is really necessary to have a strong synchronization mechanism. I will try to do a pool on the same tasks and benchmark the result, and then maybe with futures as pointed out by Andy Prowl.

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <complex>
#include <random>

const unsigned int n_threads=4; //varying this will not (almost) change the total amount of work
const unsigned int task_length=30000/n_threads;
const float task_length_variation=task_length/n_threads;
unsigned int rep=1000; //repetitions of tasks

class t_chronometer{
 private: 
  std::chrono::steady_clock::time_point _t;

 public:
  t_chronometer(): _t(std::chrono::steady_clock::now()) {;}
  void reset() {_t = std::chrono::steady_clock::now();}
  double get_now() {return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - _t).count();}
  double get_now_ms() {return 
      std::chrono::duration_cast<std::chrono::duration<double,std::milli>>(std::chrono::steady_clock::now() - _t).count();}
};

class t_barrier {
 private:
   std::mutex m_mutex;
   std::condition_variable m_cond;
   unsigned int m_threshold;
   unsigned int m_count;
   unsigned int m_generation;

 public:
   t_barrier(unsigned int count):
    m_threshold(count),
    m_count(count),
    m_generation(0) {
   }

   bool wait() {
      std::unique_lock<std::mutex> lock(m_mutex);
      unsigned int gen = m_generation;

      if (--m_count == 0)
      {
          m_generation++;
          m_count = m_threshold;
          m_cond.notify_all();
          return true;
      }

      while (gen == m_generation)
          m_cond.wait(lock);
      return false;
   }
};


using namespace std;

void do_something(complex<double> * c, unsigned int max) {
  complex<double> a(1.,0.);
  complex<double> b(1.,0.);
  for (unsigned int i = 0; i<max; i++) {
    a *= polar(1.,2.*M_PI*i/max);
    b *= polar(1.,4.*M_PI*i/max);
    *(c)+=a+b;
  }
}

bool done=false;
void task(complex<double> * c, unsigned int max, t_barrier* start_barrier, t_barrier* end_barrier) {
  while (!done) {
    start_barrier->wait ();
    do_something(c,max);
    end_barrier->wait ();
  }
  cout << "task finished" << endl;
}

int main() {
  t_chronometer t;

  std::default_random_engine gen;
  std::normal_distribution<double> dis(.0,1000.0);

  complex<double> cpx[n_threads];
  for (unsigned int i=0; i < n_threads; i++) {
    cpx[i] = complex<double>(dis(gen), dis(gen));
  }

  t_barrier start_barrier (n_threads + 1); // child threads + main thread
  t_barrier end_barrier (n_threads + 1); // child threads + main thread

  std::thread mythread[n_threads];
  unsigned long int sum=0;
  for (unsigned int i=0; i < n_threads; i++) {
    unsigned int max = task_length +  i * task_length_variation;
    cout << i+1 << "th task length: " << max << endl;
    mythread[i] = std::thread(task, &cpx[i], max, &start_barrier, &end_barrier);
    sum+=max;
  }
  cout << "total task length " << sum << endl;

  complex<double> c(0,0);
  for (unsigned long int j=1; j < rep+1; j++) {
    start_barrier.wait (); //give to the threads the missing call to start
    if (j==rep) done=true;
    end_barrier.wait (); //wait for the call from each tread
    if (j%100==0) cout << "cycle: " << j << endl;
    for (unsigned int i=0; i<n_threads; i++) {
      c+=cpx[i];
    }
  }
  for (unsigned int i=0; i < n_threads; i++) {
    mythread[i].join();
  }
  cout << "result: " << c << " it took: " << t.get_now() << " s." << endl;
  return 0;
}
DarioP
  • 5,377
  • 1
  • 33
  • 52
  • I would like to say that the barrier solution doesn't provide a big performance boost.. for the present implementation joining the threads and creating new ones uses almost the same time than notifying them with a cv. – DarioP Mar 18 '13 at 15:52