
I'm having the hardest time trying to wrap my head around how to allow threads to signal each other.

My design:

The main function creates a single master thread that coordinates a bunch of worker threads. The main function also creates the workers, because the worker threads spawn and exit at intervals programmed in main. The master thread needs to be able to signal individual worker threads and broadcast to all of them (pthread_cond_broadcast), and the worker threads have to signal the master back (pthread_cond_signal). Since each thread needs a pthread_mutex_t and a pthread_cond_t, I made a Worker class and a Master class with these members. This is where I am stuck. C++ does not allow you to pass a member function as the pthread_create(...) start routine, so I had to make a static handler inside the class and pass a pointer to the object itself, which the handler casts back (reinterpret_cast) to get at the class data...

void Worker::start() {
    pthread_create(&thread, NULL, &Worker::run, this);
}

void* Worker::run(void *ptr) {
    Worker* data = reinterpret_cast<Worker*>(ptr);
    // ... wait/signal using data's mutex and condition variable ...
    return NULL;
}

The problem I have with this (probably wrong) setup is that when I pass an array of Worker pointers to the Master thread, it seems to signal a different Worker instance than the one the thread is waiting on, as if the cast had made some sort of copy. I tried static_cast instead and got the same behavior.

I just need some sort of design where the Master and workers can pthread_cond_wait(...) and pthread_cond_signal(...) each other.

Edit 1

Added:

private:
    Worker(const Worker&);

Still not working.

Casey
user622469
  • Your startup looks fine - `reinterpret_cast` and `static_cast` should both do the same thing in this case since you're casting from `void*`. I think the likely problem is that you have `pthread_mutex_t` member variables that you are copy/moving around after initialization, which is strictly forbidden. Have you disabled copy construction in your `Master/Worker` classes? – Casey Jul 19 '13 at 21:15
  • I tried to, but `private: Worker(const Worker &); Worker operator=(const Worker &);` gives me `error: ‘Worker Worker::operator=(const Worker&)’ is private` – user622469 Jul 19 '13 at 21:28
  • The copy constructor should not be invoked if you are passing an array of pointers to the master thread. – jxh Jul 19 '13 at 21:51
  • @user622469 Isn't that what you expect? The point of making it private is precisely for this error to occur. The error shows (at compile time) that the assignment operator for Worker is being called at that location. You need to get around that. – maditya Jul 19 '13 at 21:52
  • Typically passing/returning by value or storing directly in a container is the gotcha. – Casey Jul 19 '13 at 22:31

2 Answers


Edit: Fixed the potential race in all versions:

1./1b. employ a semaphore built from (mutex + condition variable + counter), as outlined in "C++0x has no semaphores? How to synchronize threads?"
2. uses a 'reverse' wait to ensure that a signal got acknowledged (ACK-ed) by the intended worker

I'd really suggest using the C++11 <thread> and <condition_variable> facilities to achieve this.

I have two (and a half) demonstrations. Each assumes you have 1 master that drives 10 workers. Each worker awaits a signal before it does its work.

We'll use std::condition_variable (which works in conjunction with a std::mutex) to do the signaling. The difference between the first and second versions is the way in which the signaling is done:

  • 1. Notifying any worker, one at a time:
  • 1b. With a worker struct
  • 2. Notifying all threads, coordinating which recipient worker is to respond

1. Notifying any worker, one at a time:

This is the simplest to do, because there's little coordination going on:

#include <vector>
#include <thread>
#include <mutex>
#include <algorithm>
#include <iostream>
#include <condition_variable>
#include <chrono>   // chrono::milliseconds
#include <cstdlib>  // rand

using namespace std;

class semaphore 
{ // see https://stackoverflow.com/questions/4792449/c0x-has-no-semaphores-how-to-synchronize-threads
    std::mutex mx;
    std::condition_variable cv;
    unsigned long count;
public:
    semaphore() : count() {} 
    void notify();
    void wait();
};

static void run(int id, struct master& m);

struct master
{
    mutable semaphore sem;

    master()
    {
        for (int i = 0; i<10; ++i)
            threads.emplace_back(run, i, ref(*this));
    }

    ~master() {
        for(auto& th : threads) if (th.joinable()) th.join(); 
        std::cout << "done\n";
    }

    void drive()
    {
        // do wakeups
        for (unsigned i = 0; i<threads.size(); ++i)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%100));
            sem.notify();
        }
    }

  private:
    vector<thread> threads;
};

static void run(int id, master& m)
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "\n";
    }
}

int main()
{
    master instance;
    instance.drive();
}

/// semaphore members
void semaphore::notify()
{
    lock_guard<mutex> lk(mx);
    ++count;
    cv.notify_one();
}

void semaphore::wait()
{
    unique_lock<mutex> lk(mx);
    while(!count)
        cv.wait(lk);
    --count;
}

1b. With a worker struct

Note that if you have worker classes where worker::run is a non-static member function, you can do the same with minor modifications:

struct worker
{
    worker(int id) : id(id) {}

    void run(master& m) const;

    int id;
};

// ...
struct master
{
    // ...

    master()
    {
        for (int i = 0; i<10; ++i)
            workers.emplace_back(i);

        for (auto& w: workers)
            threads.emplace_back(&worker::run, ref(w), ref(*this));
    }

// ...

void worker::run(master& m) const
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "\n";
    }
}

A caveat

  • cv.wait() can suffer spurious wake-ups, in which the thread returns from the wait although the condition variable wasn't actually notified (e.g. in the event of OS signal handlers). This can happen with condition variables on any platform.

The following approach fixes this:

2. Notifying all threads, coordinating which recipient worker is to respond

Use a flag to signal which thread was intended to receive the signal:

struct master
{
    mutable mutex mx;
    mutable condition_variable cv;
    int signaled_id;               // ADDED

    master() : signaled_id(-1)
    {

Let's pretend that the driver got a lot more interesting and wants to signal all workers in a specific (random) order:

    void drive()
    {
        // generate random wakeup order
        // (needs <numeric> for iota and <random> for shuffle/mt19937;
        //  random_shuffle was removed in C++17)
        vector<int> wakeups(10);
        iota(begin(wakeups), end(wakeups), 0);
        shuffle(begin(wakeups), end(wakeups), mt19937{random_device{}()});

        // do wakeups
        for (int id : wakeups)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%1000));
            signal(id);
        }
    }

  private:
    void signal(int id)                // ADDED id
    {
        unique_lock<mutex> lk(mx);

        std::cout << "signaling " << id << "\n";

        signaled_id = id;              // ADDED put it in the shared field
        cv.notify_all();

        cv.wait(lk, [&] { return signaled_id == -1; });
    }

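Now all we have to do is make sure that the receiving thread checks that its id matches:

unique_lock<mutex> lk(m.mx);  // hold the lock while checking/resetting signaled_id
m.cv.wait(lk, [&] { return m.signaled_id == id; });
m.signaled_id = -1;           // ACK: lets the master's reverse wait complete
m.cv.notify_all();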

This puts an end to spurious wake-ups.

Full code listings/live demos:

Community
sehe
  • I think your `signal()` scheme with a `signaled_id` has a race condition in a situation where the master thread can signal a second thread before the last one signaled gets a chance to read `signaled_id`. It's a situation that might not be seen in a test scenario, but will almost certainly be seen in real world runs if you don't take specific measures to prevent it. Similarly if a thread doesn't check `signaled_id` before calling `condition_variable::wait()` (while holding the lock), it'll miss its signal. – Michael Burr Jul 20 '13 at 00:31
  • @MichaelBurr this cannot occur due to the way `wait` locks/unlocks the mutex across the wait. See [`std::condition_variable::wait`](http://en.cppreference.com/w/cpp/thread/condition_variable/wait) under 1). _(The second may be an issue depending on the application. You could easily fix it by locking the mutex before/during thread creation.)_ – sehe Jul 20 '13 at 00:41
  • since you call `notify_all()` I assume that more than one waiting thread might be released. While the targeted thread is waiting to acquire the mutex, it would be possible for the main thread to come along, acquire the mutex and change `signaled_id`. There's no guarantee of FIFO behavior for threads waiting on a mutex. Strictly speaking, even if only one thread is waiting, the main thread could reacquire the mutex before the unblocked waiting thread has an opportunity to acquire the mutex and return from the `wait()` call. – Michael Burr Jul 20 '13 at 03:51
  • Mmm. I'm not so sure anymore. Indeed there's no explicit guarantees. I may have rediscovered why everyone sane uses queues and/or semaphores for tasks like this. _In fairness, I have clearly overcomplicated the requirements relative to the OP, so I might just simplify but let me fix it now_ – sehe Jul 20 '13 at 10:33
  • @MichaelBurr I've fixed all three variants now. The first two with a proper semaphore. The second one provisionally by reverse-waiting until the worker `ACK`-s the signal. _Of course, a proper queue solution to this would be more flexible, but I'm already **waaaay** beyond the scope of the question_.. Did I miss anything else? – sehe Jul 20 '13 at 10:56

It is not clear what your exact circumstances are, but it seems like you are using a container to hold your "Worker" instances that are created in main, and passing them to your "Master". If this is the case, there are a few remedies available to you. You need to pick one that is appropriate to your implementation.

  • Pass a reference to the container in main to the Master.
  • Change the container to hold (smart) pointers to Workers.
  • Make the container part of "Master" itself, so that it doesn't need to be passed to it.
  • Implement a proper destructor, copy constructor, and assignment operator for your Worker class (in other words, obey the Rule of Three).

Technically speaking, since pthread_create() is a C API, the function pointer that is passed to it needs to have C linkage (extern "C"). You can't make a method of a C++ class have C linkage, so you should define an external function:

extern "C" { static void * worker_run (void *arg); }

class Worker { //...
};

static void * worker_run (void *arg) {
    return Worker::run(arg);
}
Community
jxh