
I have multiple `std::thread`s, but only one of them should perform a given task (say, a `printf`), similar to `#pragma omp single`.

I tried to modify a semaphore implementation to do this, but it doesn't work as I expected.

#ifndef SEMAPHORE_H
#define SEMAPHORE_H

#include <mutex>
#include <condition_variable>
using namespace std;

class semaphore {
private:
    mutex mtx;
    condition_variable cv;
    int count, countMax;

public:
    semaphore(int count_ = 0):count(count_), countMax(count_){;}
    void notify()
    {
        unique_lock<mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }
    void notifyAll()
    {
        unique_lock<mutex> lck(mtx);
        count = countMax;
        cv.notify_all();
    }

    bool wait()
    {
        unique_lock<mutex> lck(mtx);
        if (--count == 0) {
            return true;
        } else {
            cv.wait(lck, [this]() { return count > 0; });
            return false;
        }
    }
};

#endif // SEMAPHORE_H

And the main program:

#include <iostream>
#include <vector>
#include <thread>
#include "semaphore.h"

semaphore sem(2);
int sum = 0;
std::mutex sumMutex;
int sumPrintAndReturn(int i)
{
    {
        std::lock_guard<std::mutex> lock(sumMutex);
        sum += i;
    }
    if (sem.wait()) {
        std::cout << "Sum (ONCE): " << sum << std::endl;
        sem.notifyAll();
    }
    std::cout << "Sum (EVERY): " << sum << std::endl;
    return sum;
}

int main()
{
    std::vector<std::thread> threads;
    for (int i = 0; i < 2; i++) {
        threads.push_back(std::thread(sumPrintAndReturn, i));
    }
    for (auto& thread: threads)
        thread.join();
    return 0;
}

The problem is that the threads print different values for the final sum:

Sum (EVERY): 0
Sum (ONCE): 1
Sum (EVERY): 1

So why am I talking about `omp single`? Here is an OpenMP example and the output I expect.

#include <cstdio>
#include <omp.h>

int main()
{
    int sum = 0;
    int global_i = 0;
    #pragma omp parallel num_threads(2)
    {
        int i;
        #pragma omp critical
        i = global_i++;
        #pragma omp atomic
        sum += i;
        // Make sure every thread has added its value before one of them prints.
        #pragma omp barrier
        #pragma omp single
        printf("Sum (ONCE): %d\n", sum);
        printf("Sum (EVERY): %d\n", sum);
    }
}

And output:

Sum (ONCE): 1
Sum (EVERY): 1
Sum (EVERY): 1

I can't answer my own question, so I'll post the final, working variant here:

#ifndef SEMAPHORE_H
#define SEMAPHORE_H

#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional>

class semaphore {
private:
    std::mutex mtx;
    std::condition_variable cv;
    std::atomic<int> count;
    const int countMax;
    bool flag;

    void releaseAll()
    {
        std::unique_lock<std::mutex> lck(mtx);
        flag = true;
        cv.notify_all();
        cv.wait(lck, [this]() { return !flag; });
    }

    bool wait()
    {
        std::unique_lock<std::mutex> lck(mtx);
        if (--count == 0) {
            count++;
            return false;
        }
        else {
            cv.wait(lck, [this]() { return flag; });
            count++;
            if (count == countMax) {
                flag = false;
                cv.notify_all();
            }
            cv.wait(lck, [this]() { return !flag; });
            return true;
        }
    }

public:
    semaphore(int count_ = 0) :count(count_), countMax(count_), flag(false){ }
    void runOnce(std::function<void()> func) {
        if (!wait()) {
            func();
            releaseAll();
        }
    }


};

#endif // SEMAPHORE_H
user28464
  • Er, isn't that the expected behaviour? The mutex ensures that only one thread will attempt to write to `cout` at a time, but it does not prevent the waiting threads accessing the resource once the mutex is released, no? – Rook May 11 '14 at 15:42
  • Which mutex? `sumMutex` is not related to `cout` and I don't care whether `semaphore` uses mutex, I need reverse semaphore – user28464 May 11 '14 at 15:46
  • I remain confused. You have two threads: one runs `sumPrintAndReturn(0)`, which adds 0 to `sum`, and the other runs `sumPrintAndReturn(1)`, which adds 1 to `sum`. The expected total value of `sum` on exit is `1`, which is what you have, so where's the problem? Don't assume that `cout` will sensibly order messages sent from multiple threads. – Rook May 11 '14 at 16:12
  • I expect "Sum (EVERY): 1" to be printed twice. All threads should stop at `if (sem.wait()) {`; only one of them should enter the block and execute it, and all the others should wait in the meantime. When that single thread is done (say, with `std::cout << "Sum (ONCE): " << sum << std::endl;`), it should unblock the other threads (and itself, but it wasn't blocked, so that isn't needed) so that they continue execution (say, with `std::cout << "Sum (EVERY): " << sum << std::endl;`). – user28464 May 11 '14 at 16:19
  • Aha, I see a simple error: `cv.wait` in `sem.wait` will always return true for 2 threads! On the first call to `sem.wait`, `count` is decremented from 2 to 1. Because it is not zero, `sem.wait` does not return true. Because it is greater than zero, `cv.wait` returns immediately without blocking. – Rook May 11 '14 at 16:26

1 Answer

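The problem is in the implementation of your `wait` function, specifically in how it uses the condition variable. The predicate `[this]() { return count > 0; }` always returns true when the semaphore is given an initial count of 2 and you run two threads: the first call decrements `count` from 2 to 1, which is not zero, so the function takes the `else` branch, and because `count` is still greater than zero, `cv.wait` returns immediately without blocking.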

I've made a small change below: a new boolean variable, `flag`, holds the "success" status and is set when the final "winning" thread calls `sem.wait()`. I make no guarantees as to the safety or functionality of this code; it merely works for me ;) (VS2013 Express).

class semaphore {
private:
  mutex mtx;
  condition_variable cv;
  int count, countMax;
  bool flag;

public:
  semaphore(int count_ = 0) :count(count_), countMax(count_), flag(false){ ; }
  void notify()
  {
    unique_lock<mutex> lck(mtx);
    ++count;
    cv.notify_one();
  }
  void notifyAll()
  {
    unique_lock<mutex> lck(mtx);
    count = countMax;
    cv.notify_all();
  }

  bool wait()
  {
    unique_lock<mutex> lck(mtx);
    if (--count == 0) {
      flag = true;
      return true;
    }
    else {
      cv.wait(lck, [this]() { return flag; });
      return false;
    }
  }
};

Sample output:

Sum (ONCE): 1
Sum (EVERY): 1
Sum (EVERY): 1

Note: the original code in the question was changed since I answered this question. My original answer is kept below for completeness.


This function looks very suspicious:

bool wait()
{
    unique_lock<mutex> lck(mtx);
    if (--count == 0) {
        return true;
    } else {
        cv.wait(lck, [this]() { return count > 0; });
    }
}

There's only one return statement! The `else` branch falls off the end of the function without returning a value. Keeping an eye on your compiler warnings would catch that.

$ g++ --std=c++11 -Wall semaphore.cpp
semaphore.cpp: In function ‘int sumPrintAndReturn(int)’:
semaphore.cpp:19:1: warning: no return statement in function returning non-void [-Wreturn-type]
In file included from semaphore.cpp:4:0:
semaphore.h: In member function ‘bool semaphore::wait()’:
semaphore.h:37:5: warning: control reaches end of non-void function [-Wreturn-type]
Rook
  • Sorry, that was a mistake I made while simplifying the example. Fixed, and added output showing the problem. – user28464 May 11 '14 at 16:04
  • Thanks! It seems to work. I also want to point out to others that in this version `flag` is never reset to `false`, so the semaphore should be used only once. – user28464 May 11 '14 at 17:04
  • I also want to point out that setting `flag = true;` would be better inside the `notifyAll` method because (if I'm not mistaken) a notification can be emitted spuriously, and the other threads could then continue (because `flag == true`) even before e.g. `std::cout << "Sum (ONCE): " << sum << std::endl;` has run. – user28464 May 11 '14 at 17:22
  • @user28464 I see you've un-accepted this as a valid answer... could you share what the problem is? – Rook May 15 '14 at 15:07
  • Because the final version is mine, in the first post. It has better synchronization. – user28464 Jun 01 '14 at 09:00