151

Is it true that C++0x will come without semaphores? There are already some questions on Stack Overflow regarding the use of semaphores. I use them (POSIX semaphores) all the time to let a thread wait for some event in another thread:

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

If I did that with a mutex:

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomething1();

  event1.unlock();

  ...
}

Problem: it's ugly, and it's not guaranteed that thread1 locks the mutex first. (Given that the same thread should lock and unlock a mutex, you also can't lock event1 before thread0 and thread1 have started.)

Since Boost doesn't have semaphores either, what is the simplest way to achieve the above?

Whymarrh
tauran

12 Answers

217

You can easily build one from a mutex and a condition variable:

#include <mutex>
#include <condition_variable>

class semaphore {
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void release() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void acquire() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_acquire() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};
Maxim Egorushkin
  • 106
    someone should submit a proposal to the standards committee –  Jun 06 '12 at 23:36
  • Not sure it would warrant inclusions in the standard. Firstly, it can be implemented using existing synchronization primitives. Secondly, it can be implemented in a number of ways. – Maxim Egorushkin Jun 11 '12 at 20:55
  • 9
    A point here that puzzled me initially is the lock in wait. One might ask how a thread can get past notify if the lock is held by wait. The somewhat obscurely documented answer is that condition_variable.wait releases the lock while it waits, allowing another thread to get past notify in an atomic fashion. At least that's how I understand it –  Jun 14 '12 at 06:53
  • 2
    http://pubs.opengroup.org/onlinepubs/009696799/functions/pthread_cond_signal.html: _The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal()._ – Maxim Egorushkin Jun 14 '12 at 18:16
  • 36
    It was *deliberately* excluded from Boost on the basis that a semaphore is too much rope for programmers to hang themselves with. Condition variables supposedly are more manageable. I see their point but feel a bit patronized. I assume that the same logic applies to C++11 -- programmers are expected to write their programs in a way that "naturally" uses condvars or other approved synchronization techniques. Supplying a semaphore would run against that regardless of whether it's implemented on top of condvar or natively. – Steve Jessop Aug 31 '12 at 15:31
  • 1
    I assume the `count_` member should be initialized to something other than 0 (i.e., greater than 0), or am I missing something? – Dan Nissenbaum Nov 16 '12 at 07:05
  • 7
    Note - See http://en.wikipedia.org/wiki/Spurious_wakeup for the rationale behind the `while(!count_)` loop. – Dan Nissenbaum Nov 16 '12 at 07:49
  • 1
    @DanNissenbaum: `count_` is initialized to 0 because the semaphore is created as closed. – Maxim Egorushkin Nov 16 '12 at 09:18
  • 2
    Not sure if this is the best solution. Any access to the variable requires locking, even when there is no contention. The OS provides underlying primitives more efficient than this, with atomic CAS, that would not lock each time there is contention. – xryl669 Jan 15 '14 at 17:59
  • 1
    @xryl669 You should be aware that mutexes are optimized for non-contended case, [at least on Linux](http://man7.org/linux/man-pages/man7/futex.7.html). Basically, you have two choices: use the standard synchronization primitives that employ many known optimizations or invent your own wheel. – Maxim Egorushkin Apr 17 '14 at 08:23
  • 1
    @Maxim Exactly. I think the solution should involve using OS's semaphore instead of reinventing the wheel that are already the best implementation available. – xryl669 Apr 17 '14 at 17:18
  • @xryl669 On Linux semaphores involve syscalls for all operations. Whereas operations on mutexes do not involve syscalls for non-contended scenario. Basically, there is little reason to use a semaphore over a mutex + condition. – Maxim Egorushkin Jul 24 '14 at 10:12
  • 5
    @Maxim I'm sorry, I don't think you're right. sem_wait and sem_post only syscall on contention too (check https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/sem_wait.c ) so the code here ends up duplicating the libc implementation, with potentially bugs. If you intend portability on any system, it might be a solution, but if you only need Posix compatibility, use Posix semaphore. – xryl669 Aug 05 '14 at 15:58
  • 1
    Please note this is only a valid "thread semaphore" to be used across threads of the same process, not like a posix semaphore that is able to be used across different processes. – Jorge González Lorenzo Nov 15 '15 at 17:33
  • @JorgeGonzálezLorenzo Replace `boost::` with `boost::interprocess::` to make it inter-process. – Maxim Egorushkin Nov 16 '15 at 09:49
  • The two instances named `lock` make it a bit confusing when you are invoking `condition_.wait(lock)`. I will have to give a good read to the condition variable spec... – gatopeich Feb 12 '16 at 15:36
  • As a side note to this, I tried adding a timeout to this implementation using chrono, but realized that the semaphore count then needs extra care in case the timeout occurs OR if the wait is satisfied. My version seems to work with a reset() method, but it would be interesting to see if someone else has a nice variant of this semaphore with a timeout feature. – Larswad Oct 28 '17 at 19:58
  • 1
    in `notify()`, shouldn't `condition_.notify_one();` be executed outside of the locked mutex scope, as the waked-up thread could otherwise not immediately grab lock on it? – moala Jan 05 '18 at 11:01
  • 1
    @moala Notifying a condition variable with mutex lock ensures predictable scheduling (fairness). See documentation for `pthread_cond_signal` and `std::condition_variable::notify_one`. – Maxim Egorushkin Jan 05 '18 at 18:36
  • So what about replacing the while loop with a lambda? Forgive me the change, I should've put `this` in the capture list for accessing `count_`. – jaques-sam Dec 19 '19 at 13:24
  • @DrumM _So what about replacing the while loop with a lambda?_ You tell me. I prefer the loop because it is clear what is going on, unlike with lambda. – Maxim Egorushkin Dec 19 '19 at 13:27
  • 2
    @user90843 After 8 years, now it is - `std::counting_semaphore` of C++20. – Paul Dec 07 '20 at 09:27
  • 1
    @moala It's more efficient to notify while still holding the mutex. Since that *can't* wake any threads, it's extremely efficient. If you notify after releasing the mutex, you have two operations that might wake threads. – David Schwartz Apr 25 '21 at 05:11
  • Can you give an example on how to use this class ? – Rajeshwar May 27 '21 at 03:58
  • @Rajeshwar You may like to post another question with your problem. – Maxim Egorushkin Jun 10 '21 at 17:21
  • 1
    @DavidSchwartz _notify while still holding the mutex... can't wake any threads_ - but then releasing the mutex still has to wake a waiter up. Let's see: * Notifying one waiter while holding the mutex - a waiter wakes up on notify and blocks on re-acquiring the mutex. To ensure fairness, this notified waiter must be put in the head of the waiting list for that mutex. * Notifying one waiter after releasing the mutex - a waiter wakes up on notify and re-acquires the mutex. Other threads waiting on the same mutex may have got ahead, defeating fairness. – Maxim Egorushkin Jun 10 '21 at 18:09
  • 1
    @MaximEgorushkin "*Notifying one waiter while holding the mutex - a waiter wakes up on notify and blocks on re-acquiring the mutex.*" Some implementations might do that, but it's inefficient. You shouldn't write your code based on what bad implementations might do, unless you know for a fact you are using a bad implementation. (Real implementations don't wake the thread since they know it can't make forward progress.) – David Schwartz Jun 10 '21 at 18:12
  • 1
    @MaximEgorushkin "*To ensure fairness, this notified waiter must be put in the head of the waiting list for that mutex.*" Why? Some other thread might have waited longer. There could be another thread that has been waiting for the mutex longer. I see no reason threads that blocked on the condition variable should always get priority over threads that didn't. That seems unfair to me. In any event, an implementation that wants to do this can move the thread to the head of the wait queue for the mutex when it receives a signal on the c.v. if that's what it wants to do. – David Schwartz Jun 10 '21 at 18:14
  • You should not use `unsigned long count_` – daohu527 Sep 16 '21 at 01:48
  • @dahohu527 Without explanation your comment doesn't make any sense. – Maxim Egorushkin Sep 16 '21 at 16:19
  • @MaximEgorushkin I thought the count would be less than 0, which would cause an overflow error, but in fact it won’t – daohu527 Sep 17 '21 at 06:55
  • @dahohu527 Even if it were, `unsigned` arithmetic wraps and that is well defined. Also notice that `count_` is only ever tested for _equality_ with 0 and not relative ordering. – Maxim Egorushkin Sep 17 '21 at 16:31
  • What if someone repeatedly calls the release method ? shouldn't we limit the Increment? same for the decrement of count_? – Mahmoud Hosseinipour Oct 18 '21 at 11:13
  • 1
    @MahmoudHosseinipour Feel free to add debugging code for your favourite programming mistakes. – Maxim Egorushkin Oct 18 '21 at 11:30
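On the timeout question raised in the comments above: one possible approach is to use the predicate overload of `wait_for`, which also shows the lambda form discussed there. This is only a sketch under assumptions: the class name `timed_semaphore` and the member `try_acquire_for` are made up for illustration and are not part of the answer.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Sketch only: same counting scheme as the answer, plus a timed acquire.
class timed_semaphore {
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Starts closed, like the class in the answer.

public:
    void release() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is re-checked after every wake-up, spurious or not.
        condition_.wait(lock, [this] { return count_ != 0; });
        --count_;
    }

    // Returns true if a unit was acquired, false if the timeout expired first.
    template <class Rep, class Period>
    bool try_acquire_for(const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_for with a predicate returns the predicate's final value, so
        // the count is only decremented when a unit is really available.
        if (!condition_.wait_for(lock, timeout, [this] { return count_ != 0; }))
            return false;
        --count_;
        return true;
    }
};
```

Because the count is only changed after the predicate has been seen to hold under the lock, a timeout leaves the count untouched and no separate reset step should be needed.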
115

Based on Maxim Yegorushkin's answer, I tried to make the example in C++11 style.

#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
Melebius
Tsuneo Yoshioka
  • 39
    You can make wait() also a three-liner: `cv.wait(lck, [this]() { return count > 0; });` – Domi Dec 06 '13 at 13:14
  • 2
    Adding another class in the spirit of lock_guard is helpful, too. In RAII fashion, the constructor, which takes the semaphore as a reference, calls the semaphore's wait() call, and the destructor calls its notify() call. This prevents exceptions from failing to release the semaphore. – Jim Hunziker Oct 24 '14 at 16:58
  • isn't there a dead-lock, if say N threads called wait() and count==0, then cv.notify_one(); is never called, since the mtx hasn't released? – M.G. May 18 '15 at 21:26
  • 1
    @Marcello The waiting threads don't hold the lock. The whole point of condition variables is to provide an atomic "unlock and wait" operation. – David Schwartz Oct 19 '15 at 21:39
  • 5
    You should release lock before calling notify_one() to avoid immediately blocking the wakeup... see here: https://en.cppreference.com/w/cpp/thread/condition_variable/notify_all – kylefinn Dec 20 '19 at 18:31
  • This code doesn't seem like it has ever been tested... The default constructor argument of `0` makes no sense (should be `1` imo) and the lock is not unlocked in `notify` – lucidbrot May 13 '22 at 16:59
  • 1
    @kylefinn @lucidbrot, I guess "unlock first vs notify first" seems to be quite debatable, see @MaximEgorushkin and @DavidSchwartz's comments in the accepted answer or the notes on [`notify_one`] in cppreference.com. Also, I asked a [question](https://stackoverflow.com/q/76697620/10027592) regarding this. – starriet Jul 16 '23 at 10:38
  • @lucidbrot And the value for the `count_` you mentioned is up to the user. Indeed C++20's [counting_semaphore and binary_semaphore](https://en.cppreference.com/w/cpp/thread/counting_semaphore) allow users to select the initial value. (I haven't tested this answer's code though.) – starriet Jul 16 '23 at 10:42
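For readers who want to compare the two orderings debated in these comments, here is a minimal sketch of the "unlock first, then notify" variant. The class name `UnlockFirstSemaphore` and the extra `try_wait` member are assumptions added for illustration; which ordering performs better depends on the implementation, as the comments on the accepted answer discuss.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical variant of the Semaphore in this answer; the name is made up.
class UnlockFirstSemaphore {
public:
    explicit UnlockFirstSemaphore(int initial = 0) : count(initial) {}

    void notify() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            ++count;
        } // mtx is released here, before the notification is sent.
        cv.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return count > 0; });
        --count;
    }

    // Non-blocking attempt, added only to make the sketch easy to exercise.
    bool try_wait() {
        std::lock_guard<std::mutex> lock(mtx);
        if (count == 0)
            return false;
        --count;
        return true;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
```

One caveat with this ordering: if a woken thread is allowed to destroy the semaphore as soon as `wait()` returns, `cv.notify_one()` could run on an object that no longer exists, so the semaphore has to outlive every `notify()` call.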
42

I decided to write the most robust/generic C++11 semaphore I could, in the style of the standard as much as I could (note using semaphore = ..., you normally would just use the name semaphore similar to normally using string not basic_string):

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
    using native_handle_type = typename CondVar::native_handle_type;

    explicit basic_semaphore(size_t count = 0);
    basic_semaphore(const basic_semaphore&) = delete;
    basic_semaphore(basic_semaphore&&) = delete;
    basic_semaphore& operator=(const basic_semaphore&) = delete;
    basic_semaphore& operator=(basic_semaphore&&) = delete;

    void notify();
    void wait();
    bool try_wait();
    template<class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& d);
    template<class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& t);

    native_handle_type native_handle();

private:
    Mutex   mMutex;
    CondVar mCv;
    size_t  mCount;
};

using semaphore = basic_semaphore<std::mutex, std::condition_variable>;

template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
    : mCount{count}
{}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
    std::lock_guard<Mutex> lock{mMutex};
    ++mCount;
    mCv.notify_one();
}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
    std::unique_lock<Mutex> lock{mMutex};
    mCv.wait(lock, [&]{ return mCount > 0; });
    --mCount;
}

template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
    std::lock_guard<Mutex> lock{mMutex};

    if (mCount > 0) {
        --mCount;
        return true;
    }

    return false;
}

template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
    return mCv.native_handle();
}
David
  • This works, with a minor edit. The `wait_for` and `wait_until` method calls with the predicate return a boolean value (not a `std::cv_status`). – jdknight Mar 04 '15 at 20:17
  • 1
    sorry to nit-pick so late in the game. `std::size_t` is unsigned so decrementing it below zero is UB, and it will always be `>= 0`. IMHO `count` should be an `int`. – Richard Hodges Nov 29 '15 at 18:44
  • 3
    @RichardHodges there's no way to decrement below zero so there's no problem, and what would a negative count on a semaphore mean? That doesn't even make sense IMO. – David Dec 01 '15 at 15:39
  • Apologies. It seems I misread `> 0` and mistook it for `>= 0`. – Richard Hodges Dec 01 '15 at 16:03
  • 1
    @David What if a thread had to wait for others to initialize things? For instance, 1 reader thread to wait for 4 threads, I would call the semaphore constructor with -3 to make the reader thread wait until all the other threads made a post. I guess there are other ways to do that, but isn't it reasonable? I think it's in fact the question the OP is asking but with more "thread1"s. – jmmut Jun 20 '16 at 13:48
  • @jmmut I think that would be a very strange way to do things, but maybe negative counts do 'make sense'. On the fence about changing anything to support them though. – David Jun 20 '16 at 15:47
  • 4
    @RichardHodges to be very pedantic, decrementing an unsigned integer type below 0 is not UB. – jcai Jun 02 '17 at 19:29
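The "one thread waits for N others" scenario from the comments can also be handled without negative counts by counting down to zero instead. The following is a rough sketch of that idea; `countdown_latch` and its members are hypothetical names chosen for illustration and are not part of this answer.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper: blocks waiters until count_down() has been
// called the number of times given to the constructor.
class countdown_latch {
public:
    explicit countdown_latch(std::size_t count) : mCount{count} {}

    // Called once by each thread that has finished its initialization.
    void count_down() {
        std::lock_guard<std::mutex> lock{mMutex};
        if (mCount > 0 && --mCount == 0)
            mCv.notify_all(); // The last arrival releases every waiter.
    }

    // Called by the thread that must wait for all the others.
    void wait() {
        std::unique_lock<std::mutex> lock{mMutex};
        mCv.wait(lock, [this] { return mCount == 0; });
    }

    // Non-blocking check: true once the count has reached zero.
    bool try_wait() {
        std::lock_guard<std::mutex> lock{mMutex};
        return mCount == 0;
    }

private:
    std::mutex mMutex;
    std::condition_variable mCv;
    std::size_t mCount;
};
```

For the example in the comments, the reader thread would construct the latch with 4 and call `wait()`, while each of the four other threads calls `count_down()` once. C++20 offers `std::latch` for the same purpose.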
17

C++20 finally has semaphores - std::counting_semaphore<max_count>.

These have (at least) the following methods:

  • acquire() (blocking)
  • try_acquire() (non-blocking, returns immediately)
  • try_acquire_for() (blocks for at most the given duration)
  • try_acquire_until() (blocks until the given time point at the latest)
  • release()

You can read these CppCon 2019 presentation slides, or watch the video. There's also the official proposal P0514R4, but it may not be up-to-date with actual C++20.

einpoklum
15

In accordance with POSIX semaphores, I would add

class semaphore
{
    ...
    bool trywait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if(count_)
        {
            --count_;
            return true;
        }
        else
        {
            return false;
        }
    }
};

And I much prefer using a synchronisation mechanism at a convenient level of abstraction, rather than always copy pasting a stitched-together version using more basic operators.

Michael Zillich
9

You can also check out cpp11-on-multicore - it has a portable and optimal semaphore implementation.

The repository also contains other threading goodies that complement c++11 threading.

onqtam
8

You can work with mutex and condition variables. You gain exclusive access with the mutex, check whether you want to continue or need to wait for the other end. If you need to wait, you wait in a condition. When the other thread determines that you can continue, it signals the condition.

There is a short example in the boost::thread library that you can most probably just copy (the C++0x and boost thread libs are very similar).
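The pattern described above can be sketched roughly as follows, using the standard library rather than Boost. The class name `Event` and its members are assumptions made for illustration. The stored flag is what lets a thread that calls `wait()` after the signal return immediately instead of blocking.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical one-shot event: wait() blocks until post() has been called.
class Event {
public:
    // Called by the signalling thread (thread1 in the question).
    void post() {
        std::lock_guard<std::mutex> lock(mutex_);
        signalled_ = true;        // Record the state so a late waiter does not block.
        condition_.notify_all();  // Wake every thread that is currently waiting.
    }

    // Called by the waiting thread (thread0 in the question).
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form re-checks the flag after every wake-up,
        // which also covers spurious wake-ups.
        condition_.wait(lock, [this] { return signalled_; });
    }

    // Non-blocking query of the current state.
    bool is_set() {
        std::lock_guard<std::mutex> lock(mutex_);
        return signalled_;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    bool signalled_ = false;
};
```

With this, thread0 calls `event1.wait()` after `doSomething0()` and thread1 calls `event1.post()` after `doSomething1()`, and the order in which the two threads reach those calls does not matter.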

David Rodríguez - dribeas
  • Condition signals only to waiting threads, or not? So if thread0 is not there waiting when thread1 signals it will be blocked later? Plus: I don't need the additional lock that comes with the condition - it's overhead. – tauran Jan 25 '11 at 10:49
  • Yes, condition only signals waiting threads. The common pattern is having a variable with the state and a condition in case you need to wait. Think on a producer/consumer, there will be a count on the items in the buffer, the producer locks, adds the element, increments the count and signals. The consumer locks, checks the counter and if non-zero consumes, while if zero waits in the condition. – David Rodríguez - dribeas Jan 25 '11 at 11:13
  • 2
    You can simulate a semaphore this way: Initialize a variable with the value that you would give the semaphore, then `wait()` is translated to "lock, check count if non-zero decrement and continue; if zero wait on condition" while `post` would be "lock, increment counter, signal if it was 0" – David Rodríguez - dribeas Jan 25 '11 at 11:17
  • Yes, sounds good. I wonder if posix semaphores are implemented the same way. – tauran Jan 25 '11 at 11:25
  • @tauran: I don't know for sure (and it might depend which Posix OS), but I think unlikely. Semaphores traditionally are a "lower-level" synchronization primitive than mutexes and condition variables, and in principle can be made more efficient than they would be if implemented on top of a condvar. So, more likely in a given OS is that all user-level synch primitives are built on top of some common tools that interact with the scheduler. – Steve Jessop Aug 31 '12 at 15:28
  • @tauran I doubt it. You cannot synchronise different process with this method, or with any of the proposed answers above. And that is one of the main uses of POSIX semaphores. – Jorge González Lorenzo Nov 15 '15 at 17:29
3

An RAII semaphore wrapper can also be useful in threads:

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

    ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Usage example in a multithreaded app:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();
slasla
2

I found that shared_ptr and weak_ptr, along with a list, did the job I needed. My issue was that I had several clients wanting to interact with a host's internal data. Typically, the host updates the data on its own; however, if a client requests it, the host needs to stop updating until no clients are accessing the host data. At the same time, a client could ask for exclusive access, so that no other clients, nor the host, could modify that host data.

How I did this was, I created a struct:

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

Each client would have a member of such:

UpdateLock::ptr m_myLock;

Then the host would have a weak_ptr member for exclusivity, and a list of weak_ptrs for non-exclusive locks:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

There is a function to enable locking, and another function to check if the host is locked:

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

I test for locks in LockUpdate, IsUpdateLocked, and periodically in the host's Update routine. Testing for a lock is as simple as checking whether the weak_ptr has expired. After removing any expired entries from the m_locks list (I only do this during the host update), I can check whether the list is empty. At the same time, I get automatic unlocking when a client resets the shared_ptr it is holding, which also happens automatically when a client is destroyed.

The overall effect is that, since clients rarely need exclusivity (typically reserved for additions and deletions only), most of the time a request to LockUpdate( false ), that is to say non-exclusive, succeeds so long as (! m_exclusiveLock). And a LockUpdate( true ), a request for exclusivity, succeeds only when both (! m_exclusiveLock) and (m_locks.empty()).

A queue could be added to mediate between exclusive and non-exclusive locks; however, I have had no collisions thus far, so I intend to wait until that happens before adding the solution (mostly so I have a real-world test condition).

So far this is working well for my needs; I can imagine the need to expand this, and some issues that might arise over expanded use, however, this was quick to implement, and required very little custom code.
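To make the scheme above more concrete, here is a rough sketch of how the two functions might look. The `Host` class, the `PruneExpired` helper and the function bodies are assumptions, since the answer only gives the declarations. The sketch also prunes expired entries inside `LockUpdate` to stay self-contained, and it is not thread-safe on its own: calls from several threads would need an external mutex.

```cpp
#include <list>
#include <memory>

struct UpdateLock {
    typedef std::shared_ptr<UpdateLock> ptr;
};

// Hypothetical host class; only the two functions named in the answer
// are sketched, and their bodies are guesses at the described behaviour.
class Host {
public:
    // Returns a lock token on success, or an empty pointer on failure.
    UpdateLock::ptr LockUpdate(bool exclusive) {
        PruneExpired();
        if (!m_exclusiveLock.expired())
            return nullptr;                  // Someone holds exclusivity.
        if (exclusive && !m_locks.empty())
            return nullptr;                  // Non-exclusive holders remain.

        UpdateLock::ptr lock = std::make_shared<UpdateLock>();
        if (exclusive)
            m_exclusiveLock = lock;
        else
            m_locks.push_back(lock);
        return lock;
    }

    // exclusive == true:  is an exclusive lock currently held?
    // exclusive == false: is any lock currently held at all?
    bool IsUpdateLocked(bool exclusive) const {
        if (!m_exclusiveLock.expired())
            return true;
        if (exclusive)
            return false;
        for (const auto& weak : m_locks)
            if (!weak.expired())
                return true;
        return false;
    }

private:
    // Drops list entries whose owning client has released its shared_ptr.
    void PruneExpired() {
        m_locks.remove_if([](const std::weak_ptr<UpdateLock>& weak) {
            return weak.expired();
        });
    }

    std::weak_ptr<UpdateLock> m_exclusiveLock;
    std::list<std::weak_ptr<UpdateLock>> m_locks;
};
```

A client keeps the returned pointer in its `m_myLock` member for as long as it needs access; resetting or destroying that pointer releases the lock without any explicit unlock call.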

Kit10
-1

This is an old question, but I would like to offer another solution. It seems you need not a semaphore but an event, like Windows Events. Very efficient events can be implemented as follows:

#ifdef _MSC_VER
  #include <concrt.h>
#else
  // pthread implementation
  #include <cstddef>
  #include <cstdint>
  #include <cstdlib>
  #include <mutex>
  #include <shared_mutex>

namespace Concurrency
{
const unsigned int COOPERATIVE_TIMEOUT_INFINITE = (unsigned int)-1;
const size_t COOPERATIVE_WAIT_TIMEOUT = SIZE_MAX;

class event
{
public:
    event();
    ~event();

    size_t wait(unsigned int timeout = COOPERATIVE_TIMEOUT_INFINITE);
    void set();
    void reset();
    static size_t wait_for_multiple(event** _PPEvents, size_t _Count, bool _FWaitAll, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);

    static const unsigned int timeout_infinite = COOPERATIVE_TIMEOUT_INFINITE;
    
private:
    int d;
    std::shared_mutex guard;
};

};

namespace concurrency = Concurrency;

#include <unistd.h>
#include <errno.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

#include <chrono>

#include "../HandleHolder.h"

typedef CommonHolder<int, close> fd_holder;

namespace Concurrency
{
    int watch(int ep_fd, int fd)
    {
        epoll_event ep_event;
        ep_event.events = EPOLLIN;
        ep_event.data.fd = fd;

        return epoll_ctl(ep_fd, EPOLL_CTL_ADD, fd, &ep_event);
    }

    event::event()
        : d(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
    {
    }

    event::~event()
    {
        std::unique_lock<std::shared_mutex> lock(guard);
        close(d);
        d = -1;
    }

    size_t event::wait(unsigned int timeout)
    {
        fd_holder ep_fd(epoll_create1(EPOLL_CLOEXEC));
        {
            std::shared_lock<std::shared_mutex> lock(guard);
            if (d == -1 || watch(ep_fd.GetHandle(), d) < 0)
                return COOPERATIVE_WAIT_TIMEOUT;
        }

        epoll_event ep_event;
        return epoll_wait(ep_fd.GetHandle(), &ep_event, 1, timeout) == 1 && (ep_event.events & EPOLLIN) ? 0 : COOPERATIVE_WAIT_TIMEOUT;
    }

    void event::set()
    {
        uint64_t count = 1;
        write(d, &count, sizeof(count));
    }

    void event::reset()
    {
        uint64_t count;
        read(d, &count, sizeof(count));
    }

    size_t event::wait_for_multiple(event** _PPEvents, size_t _Count, bool _FWaitAll, unsigned int _Timeout)
    {
        if (_FWaitAll) // not implemented
            std::abort();

        const auto deadline = _Timeout != COOPERATIVE_TIMEOUT_INFINITE ? std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() + _Timeout : COOPERATIVE_TIMEOUT_INFINITE;

        fd_holder ep_fd(epoll_create1(EPOLL_CLOEXEC));
        int fds[_Count];
        for (int i = 0; i < _Count; ++i)
        {
            std::shared_lock<std::shared_mutex> lock(_PPEvents[i]->guard);
            fds[i] = _PPEvents[i]->d;
            if (fds[i] != -1 && watch(ep_fd.GetHandle(), fds[i]) < 0)
                fds[i] = -1;
        }

        epoll_event ep_events[_Count];

        // The epoll_wait call may be interrupted by a signal. Wait out the whole timeout, the same as on Windows
        int res = 0;
        while (true)
        {
            res = epoll_wait(ep_fd.GetHandle(), &ep_events[0], _Count, _Timeout);
            if (res == -1 && errno == EINTR && std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() < deadline)
                continue;
            break;
        }

        for (int i = 0; i < _Count; ++i)
        {
            if (fds[i] == -1)
                continue;

            for (int j = 0; j < res; ++j)
                if (ep_events[j].data.fd == fds[i] && (ep_events[j].events & EPOLLIN))
                    return i;
        }

        return COOPERATIVE_WAIT_TIMEOUT;
    }
};
#endif

And then just use concurrency::event

Andrey
-2

Different from other answers, I propose a new version which:

  1. Unblocks all waiting threads before being deleted. In this case, deleting the semaphore will wake up all waiting threads, and the semaphore destructor will exit only after everybody has woken up.
  2. Has a parameter to the wait() call to automatically unblock the calling thread after the timeout in milliseconds has passed.
  3. Has an option on the constructor to limit the available resource count to the count the semaphore was initialized with. This way, calling notify() too many times will not increase how many resources the semaphore has.

#include <stdio.h>
#include <chrono>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

std::recursive_mutex g_sync_mutex;
#define sync(x) do { \
        std::unique_lock<std::recursive_mutex> lock(g_sync_mutex); \
        x; \
    } while (false);

class Semaphore {
    int _count;
    bool _limit;
    int _all_resources;
    int _wakedup;
    std::mutex _mutex;
    std::condition_variable_any _condition_variable;

public:
    /**
     * count - how many resources this semaphore holds
     * limit - limit notify() calls only up to the count value (available resources)
     */
    Semaphore (int count, bool limit)
        : _count(count),
        _limit(limit),
        _all_resources(count),
        _wakedup(count)
    {
    }

    /**
     * Unblock all waiting threads before destructing the semaphore (to avoid their segfault later)
     */
    virtual ~Semaphore () {
        std::unique_lock<std::mutex> lock(_mutex);
        _wakeup(lock);
    }

    void _wakeup(std::unique_lock<std::mutex>& lock) {
        int lastwakeup = 0;

        while( _wakedup < _all_resources ) {
            lock.unlock();
            notify();
            lock.lock();
            // avoids 100% CPU usage if someone is not waking up properly
            if (lastwakeup == _wakedup) {
                std::this_thread::sleep_for( std::chrono::milliseconds(10) );
            }
            lastwakeup = _wakedup;
        }
    }

    // Mutex and condition variables are not movable and there is no need for smart pointers yet
    Semaphore(const Semaphore&) = delete;
    Semaphore& operator =(const Semaphore&) = delete;
    Semaphore(const Semaphore&&) = delete;
    Semaphore& operator =(const Semaphore&&) = delete;

    /**
     * Release one acquired resource.
     */
    void notify()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling notify(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        _count++;
        if (_limit && _count > _all_resources) {
            _count = _all_resources;
        }
        _condition_variable.notify_one();
    }

    /**
     * This function never blocks!
     * Return false if it would block when acquiring the lock. Otherwise acquires the lock and return true.
     */
    bool try_acquire() {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling try_acquire(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        if(_count <= 0) {
            return false;
        }
        _count--;
        return true;
    }

    /**
     * Return true if the timeout expired, otherwise return false.
     * timeout - how many milliseconds to wait before automatically unlocking the wait() call.
     */
    bool wait(int timeout = 0) {
        std::unique_lock<std::mutex> lock(_mutex);
        // sync(std::cerr << getTime() << "Calling wait(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
        _count--;
        _wakedup--;
        try {
            std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();

            while(_count < 0) {
                if (timeout < 1) {
                    _condition_variable.wait(lock);
                }
                else {
                    std::cv_status status = _condition_variable.wait_until(lock, timenow + std::chrono::milliseconds(timeout));

                    if ( std::cv_status::timeout == status) {
                        _count++;
                        _wakedup++;
                        return true;
                    }
                }
            }
        }
        catch (...) {
            _count++;
            _wakedup++;
            throw;
        }
        _wakedup++;
        return false;
    }

    /**
     * Return true if calling wait() will block the calling thread
     */
    bool locked() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count <= 0;
    }

    /**
     * Return true if the semaphore has at least all resources available (as many as when it was created)
     */
    bool freed() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count >= _all_resources;
    }

    /**
     * Return how many resources are available:
     * - 0 means no free resources, and calling wait() will block the calling thread
     * - a negative value means there are threads blocked in wait()
     * - a positive value means there are no threads waiting
     */
    int count() {
        std::unique_lock<std::mutex> lock(_mutex);
        return _count;
    }

    /**
     * Wake everybody who is waiting and reset the semaphore to its initial value.
     */
    void reset() {
        std::unique_lock<std::mutex> lock(_mutex);
        if(_count < 0) {
            _wakeup(lock);
        }
        _count = _all_resources;
    }
};
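
The timed branch of `wait()` above can also be expressed with the predicate overload of `std::condition_variable::wait_for`, which re-checks the condition after spurious wake-ups and takes a relative timeout, so no deadline has to be computed from `system_clock`. A minimal sketch, assuming a hypothetical `TimedSemaphore` class (not the `Semaphore` above) whose count never goes negative:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical class for illustration only; it is not the Semaphore defined above.
class TimedSemaphore {
public:
    explicit TimedSemaphore(int initial) : count_(initial) {
        assert(initial >= 0);
    }

    // Make one resource available and wake a single waiter.
    void notify() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        condition_.notify_one();
    }

    // Wait up to `timeout` for a resource.
    // Returns true if a resource was acquired, false if the timeout expired.
    bool wait_for(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate overload loops internally, so spurious wake-ups are handled.
        if (!condition_.wait_for(lock, timeout, [this] { return count_ > 0; })) {
            return false;
        }
        --count_;
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    int count_;
};
```

Note that the return convention here is the opposite of `wait()` above, which returns true when the timeout expired.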

Utility to print the current timestamp:

std::string getTime() {
    char buffer[20];
#if defined( WIN32 )
    SYSTEMTIME wlocaltime;
    GetLocalTime(&wlocaltime);
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
    std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
    // Milliseconds elapsed within the current second
    auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( now.time_since_epoch() ) % 1000;
    time_t theTime = std::chrono::system_clock::to_time_t( now );
    struct tm* aTime = localtime( &theTime ); // localtime() is not thread-safe
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, static_cast<long>( milliseconds.count() ));
#endif
    return buffer;
}

Example program using this semaphore:

// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
    std::cerr << getTime() << "Creating Semaphore" << std::endl;
    Semaphore* semaphore = new Semaphore(1, false);
    semaphore->wait(1000);
    semaphore->wait(1000);
    std::cerr << getTime() << "Auto Unlocking Semaphore wait" << std::endl;

    std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
    delete semaphore;

    // Note: despite the message, the run takes about 6 seconds (1 s wait timeout + 5 s sleep)
    std::cerr << getTime() << "Exiting after 10 seconds..." << std::endl;
    return 0;
}

Example output:

11:03:01.012 Creating Semaphore
11:03:02.012 Auto Unlocking Semaphore wait
11:03:07.012 Exiting after 10 seconds...

Extra function which uses an EventLoop to unlock the semaphore after some time:

std::shared_ptr<std::atomic<bool>> autowait(Semaphore* semaphore, int timeout, EventLoop<std::function<void()>>& eventloop, const char* source) {
    std::shared_ptr<std::atomic<bool>> waiting(std::make_shared<std::atomic<bool>>(true));
    sync(std::cerr << getTime() << "autowait '" << source << "'..." << std::endl);

    if (semaphore->try_acquire()) {
        eventloop.enqueue( timeout, [waiting, source, semaphore]{
            if ( (*waiting).load() ) {
                sync(std::cerr << getTime() << "Timeout '" << source << "'..." << std::endl);
                semaphore->notify();
            }
        } );
    }
    else {
        semaphore->wait(timeout);
    }
    return waiting;
}

Semaphore semaphore(1, false);
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);
std::shared_ptr<std::atomic<bool>> waiting_something = autowait(&semaphore, 45000, *eventloop, "waiting_something");
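
The `waiting` flag returned by `autowait` presumably lets the caller cancel the pending timeout by storing `false` into it before the callback runs. Since `EventLoop` is not shown here, the sketch below illustrates the same cancellation pattern with a plain `std::thread`. `DelayedCall` and `call_later` are hypothetical names, and a real event loop would avoid spawning one thread per timer:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical handle: `active` cancels the call, `worker` must be joined by the caller.
struct DelayedCall {
    std::shared_ptr<std::atomic<bool>> active;
    std::thread worker;
};

// Hypothetical helper: run `callback` after `delay`, unless `active` was cleared in the meantime.
DelayedCall call_later(std::chrono::milliseconds delay, std::function<void()> callback) {
    assert(callback);
    auto active = std::make_shared<std::atomic<bool>>(true);
    std::thread worker([active, delay, callback] {
        std::this_thread::sleep_for(delay);
        // Same check as in the autowait lambda: only fire if nobody cancelled.
        if (active->load()) {
            callback();
        }
    });
    return DelayedCall{active, std::move(worker)};
}
```

Storing `false` into `active` before the delay elapses suppresses the callback; the flag is shared through a `std::shared_ptr` so it stays alive for whichever side finishes last.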
Evandro Coan
  • 8,560
  • 11
  • 83
  • 144
-5

In case someone is interested in an atomic version, here is an implementation. Its performance is expected to be better than that of the mutex & condition variable version.

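#include <atomic>

class semaphore_atomic
{
public:
    // Make one resource available. Nobody is woken up: waiters poll the counter.
    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    // Busy-wait (spin) until a resource is available, then take it.
    void wait() {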
        while (true) {
            int count = count_.load(std::memory_order_relaxed);
            if (count > 0) {
                if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    bool try_wait() {
        int count = count_.load(std::memory_order_relaxed);
        if (count > 0) {
            if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }
private:
    std::atomic_int count_{0};
};
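
One possible refinement, sketched under the assumption that giving up the time slice is acceptable: call `std::this_thread::yield()` whenever no resource is available, so a waiting thread does not monopolize a core. `yielding_semaphore` is a hypothetical name, and this is still a polling loop, not a blocking semaphore:

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical variant of the class above that yields while it waits.
class yielding_semaphore {
public:
    explicit yielding_semaphore(int initial = 0) : count_(initial) {
        assert(initial >= 0);
    }

    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    void wait() {
        int count = count_.load(std::memory_order_relaxed);
        while (true) {
            if (count > 0) {
                // On failure, compare_exchange_weak stores the current value into `count`.
                if (count_.compare_exchange_weak(count, count - 1,
                        std::memory_order_acquire, std::memory_order_relaxed)) {
                    return;
                }
            } else {
                // Nothing available: let other threads run before polling again.
                std::this_thread::yield();
                count = count_.load(std::memory_order_relaxed);
            }
        }
    }

private:
    std::atomic<int> count_;
};
```

Whether this performs better than a mutex and condition variable depends on the platform and the contention pattern, so it would need to be measured.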
Jeffery
  • 75
  • 1
  • 6
  • 5
    I would expect the performance to be *much* worse. This code makes almost literally every possible mistake. As just the most obvious example, suppose the `wait` code has to loop several times. When it finally unblocks, it will take the mother of all mispredicted branches as the CPU's loop prediction will certainly predict it will loop again. I could list many more issues with this code. – David Schwartz Feb 13 '17 at 04:33
  • 2
    Here's another obvious performance killer: The `wait` loop will consume CPU microexecution resources as it spins. Suppose it's in the same physical core as the thread that's supposed to `notify` it -- it will slow that thread down terribly. – David Schwartz Feb 13 '17 at 04:34
  • 2
    And here's just one more: On x86 CPUs (the most popular CPUs today), a compare_exchange_weak operation is always a write operation, even if it fails (it writes back the same value it read if the compare fails). So suppose two cores are both in a `wait` loop for the same semaphore. They're both writing at full speed to the *same* cache line, which can slow other cores to a crawl by saturating inter-core buses. – David Schwartz Feb 13 '17 at 04:35
  • @DavidSchwartz Glad to see your comments. Not sure I understand the '...CPU's loop prediction...' part. Agreed on the 2nd one. Apparently your 3rd case can happen, but compared to a mutex, which causes a user-mode to kernel-mode switch and a system call, the inter-core sync is not worse. – Jeffery Feb 13 '17 at 21:49
  • @DavidSchwartz Why not share your lock-free version implementation? Tks, – Jeffery Feb 13 '17 at 21:51
  • You have a `while (true)` loop whose performance you only care about when you leave the loop. But since the loop will be taken many times, the CPU's branch prediction will predict that it will be taken the last time when it isn't taken. This will cause a mispredicted branch that blows out the pipelines right at the time performance is most critical. As for posting my implementation, it will depend heavily on the platform -- it's impossible to write platform-independent lock free code that performs well because of all the issues I mentioned above. (Locks work better for portable code.) – David Schwartz Feb 13 '17 at 22:05
  • 2
    I just looked at your code more closely. It does a read loop not a write loop. So it doesn't have the problem of two threads blocked in `wait` saturating inter-core resources. So it doesn't make almost every possible mistake -- that's a big, common one that you didn't make. – David Schwartz Feb 14 '17 at 18:42
  • @DavidSchwartz Glad to hear. I believe those performance-critical apps need this lock-free semaphore. See if any more comments or examples come out. – Jeffery Feb 15 '17 at 00:13
  • @DavidSchwartz I tested a condition_variable based semaphore vs. a mostly lock-free version by a well-regarded someone whose name I can't quite recall - something like "Presser." The condition_variable version was the clear winner in VC++. I suspect the same would be true on Linux. That's because the VC++ and Linux versions of std::mutex are optimized for non-locking when there is no contention. – Jive Dadson Jan 29 '18 at 09:53
  • 1
    There is no such thing as a lock free semaphore. The whole idea of being lock free is not to write code without using mutexes, but to write code where a thread never blocks at all. In this case the semaphore's very essence is to block threads that call the wait() function! – Carlo Wood Feb 21 '18 at 01:32