Edit: this is not a duplicate of any question whose answers allow mutex locking in post(). Please read carefully: I need a lock-free post(). Please don't mark this as a duplicate unless you have a real answer.
A semaphore (like the one in Linux) is a useful building block that is found neither in the C++ standard nor (currently) in Boost. I'm mainly talking about semaphores between threads of a single process, running under a preemptive scheduler.
I'm specifically interested in them being non-blocking (i.e. lock-free) unless the caller actually needs to block. That is, post() and try_wait() should always be lock-free, and a wait() call should be lock-free if its invocation strongly happens after enough post()s have returned. Also, a blocking wait() should be put to sleep by the scheduler rather than spin-locked. What if I also want a wait_for with a timeout: how much does that complicate the implementation further, while still avoiding starvation?
Are there reasons why semaphores are not in the standard?
Edit 3: So, I wasn't aware that there's a proposal to the standard, P0514R4, that deals with exactly these issues and has solutions to all the problems raised here, short of specifically adding an std::semaphore: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0514r4.pdf
Also, Boost doesn't have these. Specifically, the semaphores in Boost.Interprocess are spin-locked.
What libraries support something like that?
Is it possible to implement one over the Windows API and other widespread systems?
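On Linux at least, a raw futex gives exactly the required split. Below is a minimal sketch (names are mine, not a production implementation) of the classic futex-based counting semaphore along the lines of Ulrich Drepper's "Futexes Are Tricky": post_one() is a single atomic increment plus at most one non-blocking FUTEX_WAKE syscall, and wait_one() is put to sleep by the scheduler via FUTEX_WAIT rather than spinning:

```cpp
#include <atomic>
#include <cstdint>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

class futex_semaphore {
    std::atomic<std::uint32_t> count_{0};

    long futex(int op, std::uint32_t val) {
        return syscall(SYS_futex, reinterpret_cast<std::uint32_t*>(&count_),
                       op, val, nullptr, nullptr, 0);
    }
public:
    void post_one() {
        count_.fetch_add(1, std::memory_order_release);
        futex(FUTEX_WAKE, 1); // wake at most one sleeper; never blocks
    }
    bool try_wait_one() {
        std::uint32_t c = count_.load(std::memory_order_relaxed);
        while (c > 0) {
            if (count_.compare_exchange_weak(c, c - 1,
                                             std::memory_order_acquire,
                                             std::memory_order_relaxed))
                return true;
        }
        return false;
    }
    void wait_one() {
        for (;;) {
            if (try_wait_one())
                return;
            // Sleep only while the kernel still sees count_ == 0; otherwise
            // FUTEX_WAIT returns immediately and we retry the fast path.
            futex(FUTEX_WAIT, 0);
        }
    }
};
```

This sketch issues a wake syscall on every post; eliding that when no one is waiting requires encoding a waiter count into the futex word, which is where the real subtlety starts. On Windows, WaitOnAddress/WakeByAddressSingle play the same role as FUTEX_WAIT/FUTEX_WAKE.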
Edit: It is not possible to implement this lock-free using atomics + mutex + condition_variable alone: either post() must block on the mutex, or wait() must spin. If you want a lock-free post(), you cannot lock a mutex in post(). I want to run under a possibly preemptive scheduler, and I don't want post() blocked by other threads that took the mutex and then got preempted. So this is not a duplicate of questions like C++0x has no semaphores? How to synchronize threads?
Edit 2: The following example implementation demonstrates the best that can be done with atomics + mutex + condvar, AFAIK. post() and wait() perform one lock-free atomic RMW, and lock the mutex only if they must.
However, post() is not lock-free. Worse, it might be blocked by a wait() that locked the mutex and then got preempted.
For simplicity, I only implemented post_one() and wait_one_for(Duration), instead of post(int) and wait_for(int, Duration). I'm also assuming no spurious wakeups, which is not promised by the standard.
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <boost/scope_exit.hpp>

class semaphore // provides acquire/release memory ordering for the user
{
private:
    using mutex_t = std::mutex;
    using unique_lock_t = std::unique_lock<mutex_t>;
    using condvar_t = std::condition_variable;
    using counter_t = int;

    std::atomic<counter_t> atomic_count_{ 0 };
    mutex_t mutex_;
    condvar_t condvar_;
    counter_t posts_notified_pending_{ 0 };
    counter_t posts_unnotified_pending_{ 0 };
    counter_t waiters_running_{ 0 };
    counter_t waiters_aborted_pending_{ 0 };

public:
    void post_one()
    {
        counter_t start_count = atomic_count_.fetch_add(+1, std::memory_order_acq_rel);
        if (start_count < 0) { // a waiter may be blocked - take the slow path
            unique_lock_t lock(mutex_);
            if (0 < waiters_running_) {
                ++posts_notified_pending_;
                condvar_.notify_one();
            }
            else if (0 == waiters_aborted_pending_) {
                ++posts_unnotified_pending_;
            }
            else {
                --waiters_aborted_pending_;
            }
        }
    }

    template<typename Duration>
    bool wait_one_for(Duration timeout)
    {
        counter_t start_count = atomic_count_.fetch_add(-1, std::memory_order_acq_rel);
        if (start_count > 0) {
            return true; // fast path: a post was already pending
        }
        unique_lock_t a_lock(mutex_);
        ++waiters_running_;
        BOOST_SCOPE_EXIT(this_) {
            --this_->waiters_running_;
        } BOOST_SCOPE_EXIT_END
        if ((0 == posts_notified_pending_) && (0 < posts_unnotified_pending_)) {
            --posts_unnotified_pending_;
            return true;
        }
        switch (condvar_.wait_for(a_lock, timeout)) {
        case std::cv_status::no_timeout:
            --posts_notified_pending_;
            return true;
        case std::cv_status::timeout: {
            counter_t abort_count = atomic_count_.fetch_add(+1, std::memory_order_acq_rel);
            if (abort_count >= 0) {
                // A post() already increased the negative atomic_count_ and
                // will try to notify; let it know we aborted.
                ++waiters_aborted_pending_;
            }
            return false;
        }
        default:
            assert(false);
            return false;
        }
    }

    bool try_wait_one()
    {
        counter_t count = atomic_count_.load(std::memory_order_acquire);
        while (true) {
            if (count <= 0) {
                return false;
            }
            if (atomic_count_.compare_exchange_weak(count, count - 1,
                                                    std::memory_order_acq_rel,
                                                    std::memory_order_relaxed)) {
                return true;
            }
        }
    }
};