The below class describes a lock free stack of uint32_t
sequential values (full code here). For instance, LockFreeIndexStack stack(5);
declares a stack containing the numbers {0, 1, 2, 3, 4}
. This class has pool semantic. The capacity of the stack is fixed. Only the values originally introduced in the stack can be extracted and reinserted. So at any particular point in time any of those values can be either inside the stack or outside, but not both. A thread can only push an index that it previously got via a pop. So the correct usage is for a thread to do:
auto index = stack.pop(); // get an index from the stack, if available
if(index.isValid()) {
// do something with 'index'
stack.push(index); // return index to the stack
}
Both the push
and pop
methods are implemented with an atomic load
and a CAS
loop.
What is the correct memory order semantic I should use in the atomic operations in pop
and push
(I wrote my guess commented out)?
struct LockFreeIndexStack
{
typedef uint64_t bundle_t;
typedef uint32_t index_t;
private:
static const index_t s_null = ~index_t(0);
typedef std::atomic<bundle_t> atomic_bundle_t;
union Bundle {
Bundle(index_t index, index_t count)
{
m_value.m_index = index;
m_value.m_count = count;
}
Bundle(bundle_t bundle)
{
m_bundle = bundle;
}
struct {
index_t m_index;
index_t m_count;
} m_value;
bundle_t m_bundle;
};
public:
LockFreeIndexStack(index_t n)
: m_top(Bundle(0, 0).m_bundle)
, m_next(n, s_null)
{
for (index_t i = 1; i < n; ++i)
m_next[i - 1] = i;
}
index_t pop()
{
Bundle curtop(m_top.load()); // memory_order_acquire?
while(true) {
index_t candidate = curtop.m_value.m_index;
if (candidate != s_null) { // stack is not empty?
index_t next = m_next[candidate];
Bundle newtop(next, curtop.m_value.m_count);
// In the very remote eventuality that, between reading 'm_top' and
// the CAS operation other threads cause all the below circumstances occur simultaneously:
// - other threads execute exactly a multiple of 2^32 pop or push operations,
// so that 'm_count' assumes again the original value;
// - the value read as 'candidate' 2^32 transactions ago is again top of the stack;
// - the value 'm_next[candidate]' is no longer what it was 2^32 transactions ago
// then the stack will get corrupted
if (m_top.compare_exchange_weak(curtop.m_bundle, newtop.m_bundle)) {
return candidate;
}
}
else {
// stack was empty, no point in spinning
return s_null;
}
}
}
void push(index_t index)
{
Bundle curtop(m_top.load()); // memory_order_relaxed?
while (true) {
index_t current = curtop.m_value.m_index;
m_next[index] = current;
Bundle newtop = Bundle(index, curtop.m_value.m_count + 1);
if (m_top.compare_exchange_weak(curtop.m_bundle, newtop.m_bundle)) {
return;
}
};
}
private:
atomic_bundle_t m_top;
std::vector<index_t> m_next;
};