Consider the following example code, in which thread A pushes functions onto a queue and thread B executes them after popping them from the queue:
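```cpp
#include <atomic>
#include <cstdint>
#include <functional>
#include <utility>

// `queue` is the concurrent queue described below (declaration omitted).
std::atomic<uint32_t> itemCount{0};

// Executed by thread A
void run(std::function<void()> function) {
    if (queue.push(std::move(function))) {
        itemCount.fetch_add(1, std::memory_order_acq_rel);
        itemCount.notify_one();  // wake thread B if it is waiting
    }
}

// Executed by thread B
void threadMain() {
    std::function<void()> function;
    while (true) {
        if (queue.pop(function)) {
            itemCount.fetch_sub(1, std::memory_order_acq_rel);
            function();
        } else {
            // Block until itemCount is no longer 0
            itemCount.wait(0, std::memory_order_acquire);
        }
    }
}
```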
where `queue` is a concurrent queue with a `push` and a `pop` function, each returning a `bool` that indicates whether the operation succeeded: `push` returns `false` if the queue is full, and `pop` returns `false` if it is empty.
Now I am wondering whether the code is thread-safe under all circumstances.
Suppose thread B's `pop` fails and B is about to call `std::atomic<T>::wait`. Meanwhile, thread A pushes a new element while B checks the initial wait condition. Because `itemCount` has not changed yet, B sees the expected value 0 and decides to block.

Immediately afterwards, thread A increments the counter and tries to notify one waiting thread, although thread B is not actually waiting internally yet. Thread B then blocks on the atomic and, because the notification was lost, never wakes up, even though there is an element in the queue. It stays blocked until another element is pushed and the resulting notification wakes B.
I wasn't able to reproduce this situation, since the timing is nearly impossible to hit manually.
Is this a real concern, or can it not happen? What (preferably atomic) alternatives exist to handle such rare situations?
EDIT: The queue is non-blocking and uses only atomic operations.
The reason I'm asking is that I don't understand how an atomic `wait` operation can be implemented. Although the standard says the whole operation (load, predicate check, and wait) is atomic, the implementation I'm using defines `std::atomic<T>::wait` roughly as follows:
```cpp
void wait(const _TVal _Expected, const memory_order _Order = memory_order_seq_cst) const noexcept {
    _Atomic_wait_direct(this, _Atomic_reinterpret_as<long>(_Expected), _Order);
}
```
where `_Atomic_wait_direct` is defined as
```cpp
template <class _Ty, class _Value_type>
void _Atomic_wait_direct(
    const _Atomic_storage<_Ty>* const _This, _Value_type _Expected_bytes, const memory_order _Order) noexcept {
    const auto _Storage_ptr = _STD addressof(_This->_Storage);
    for (;;) {
        const _Value_type _Observed_bytes = _Atomic_reinterpret_as<_Value_type>(_This->load(_Order));
        if (_Expected_bytes != _Observed_bytes) {
            return;
        }
        __std_atomic_wait_direct(_Storage_ptr, &_Expected_bytes, sizeof(_Value_type), _Atomic_wait_no_timeout);
    }
}
```
The code clearly performs an atomic load with the specified memory order to check the atomic's current value. However, I don't see how the whole operation can be considered atomic, since the comparison happens separately, right before the call to `__std_atomic_wait_direct`.
With condition variables, the predicate is protected by a mutex, but how is the atomic protected here?
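In the snippet above, `__std_atomic_wait_direct` receives `&_Expected_bytes`, so the lower layer is at least given the expected value to compare against. As a simplified, hypothetical model (not MSVC's actual code), the condition-variable idea can close the "load, compare, then block" window for an atomic counter. The waiter repeats the comparison while holding a mutex and releases it only as part of blocking. The notifier passes through the same mutex after its store, so the store cannot land between the waiter's final comparison and its block. The names `ModelWaitableCounter` and `wait_then_add_once` are made up for this sketch.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Simplified, hypothetical model of an atomic wait/notify pair.
// It is NOT the MSVC implementation; it only shows one way the
// "load, compare, then block" window can be closed.
class ModelWaitableCounter {
public:
    void add(std::uint32_t n) {
        value_.fetch_add(n, std::memory_order_seq_cst);
        // Passing through the mutex (even with an empty critical section)
        // means this notify cannot slip in between a waiter's final check
        // and its block: the waiter holds the mutex across that step.
        { std::lock_guard<std::mutex> lock(mutex_); }
        cv_.notify_one();
    }

    void wait(std::uint32_t old) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The comparison is made under the mutex; cv_.wait releases the
        // mutex and starts blocking as one atomic step.
        cv_.wait(lock, [&] { return value_.load(std::memory_order_seq_cst) != old; });
    }

    std::uint32_t load() const { return value_.load(std::memory_order_seq_cst); }

private:
    std::atomic<std::uint32_t> value_{0};
    std::mutex mutex_;
    std::condition_variable cv_;
};

// Usage: one waiter, one notifier; terminates whichever thread runs first.
inline std::uint32_t wait_then_add_once() {
    ModelWaitableCounter counter;
    std::thread waiter([&] { counter.wait(0); });
    counter.add(1);
    waiter.join();
    return counter.load();
}
```

If `add` runs first, the mutex hand-off makes the new value visible to the waiter's check, so `wait` returns at once. If the waiter checks first, the notifier can only acquire the mutex after the waiter is already blocked, so `notify_one` reaches it.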