The LockFreeQueueMPMC should solve the MPMC problem without locks, but during the runtime there is memory corruption. LockFreeDispatchStackMPMC does solve the MPMC problem without locks and is used as a basis for the LockFreeCacheMPMC allocator. Both of these implementations pass stress testing.
The LockFreeQueueMPMC Enqueue performs the same operation as Lock FreeDispatchStackMPMC Send. This adds a new node to the list. The Dequeue operation is more complicated. Only one pointer can ever be cmpexg at a time so there is no solution with a Tail pointer. Inorder to remove a node from the list, the list needs to be walked and the last node removed. This changes the time for Dequeue from O(1) to O(N), but lock-free.
The LockFreeDispatchStackMPMC is an out-of-order MPMC lock-free solution. Messages that arrived earliest get serviced first. This uses a stack instead of a queue, for some problems this is not acceptable as the messages must be ordered. This shows a 40%+ performance improvement over the Queue, if your messages can be unordered.
template<class T>
struct Node
{
std::atomic<int> Next;
T *Data;
};
template<class T>
class LockFreeDispatchStackMPMC
{
public:
LockFreeDispatchStackMPMC()
{
Head = NULL;
}
~LockFreeDispatchStackMPMC(){
}
void Send(T *item)
{
Node<T> * new_node = Cache.Malloc();
new_node->Data=item;
bool done = false;
while(!done)
{
auto head = Head.load();
new_node->Next.store( head);
if( Head.compare_exchange_weak(head,new_node))
{
done = true;
}
}
}
T *Recieve()
{
T *returnValue = NULL;
bool done = false;
while(!done)
{
auto head = Head.load();
if(head == NULL)
{
done=true;
}
else
{
Node<T> * curr = head;
Node<T> *next = curr->Next.load();
if(Head.compare_exchange_weak(head,next))
{
done = true;
returnValue = curr->Data;
curr->Next =NULL;
Cache.Free(curr);
}
}
}
return returnValue;
}
public:
std::atomic<Node<T> *> Head;
private:
LockFreeMemCache<Node<T> > Cache;
};
This is a Cache of Objects container based on using two lists, usable as a cross-thread pool to store objects. This also allows reads from the graveyard as the objects have not been destroyed, writes should not be allowed. This is a problem in the queue algorithm. This is MPMC safe as well, even if every Node has to be allocated one at a time.
#define GROW_BY_SIZE 4
template<class T>
class LockFreeCacheMPMC
{
public:
LockFreeCacheMPMC()
{
Head=NULL;
FreeStack=NULL;
AddSomeCache();
}
~LockFreeCacheMPMC()
{
Node<T> *node ,*prev;
bool done = false;
node = Head;
prev = NULL;
while(!done)
{
prev = node;
if(node == NULL)
{
done = true;
}
else
{
node = node->Next.load();
delete prev->Data;
delete prev;
}
}
done = false;
node = FreeStack;
prev = NULL;
while(!done)
{
prev = node;
if(node == NULL)
{
done = true;
}
else
{
node = node->Next.load();
delete prev;
}
}
}
T *Malloc()
{
T *returnValue = NULL;
returnValue=Pop();
while(returnValue==NULL)
{
AddSomeCache();
returnValue=Pop();
}
return returnValue;
}
void Free(T *ptr)
{
Push(ptr);
}
private:
void AddSomeCache()
{
for(int i=0; i < GROW_BY_SIZE; i++)
{
T *tmp = new T();
Push(tmp);
}
}
private:
void Push(T *item)
{
Node<T> * new_node = PopNode(true);
new_node->Data=item;
bool done = false;
while(!done)
{
Node<T>* head = Head.load();
new_node->Next.store(head);
if(Head.compare_exchange_weak(head,new_node))
{
done = true;
}
}
}
T *Pop()
{
T *returnValue = NULL;
bool done = false;
while(!done)
{
Node<T> * curr= Head.load();
if(curr == NULL)
{
done=true;
}
else
{
Node<T> *next = curr->Next.load();
if(Head.compare_exchange_weak(curr,next))
{
done = true;
returnValue = curr->Data;
PushNode(curr);
}
}
}
return returnValue;
}
void PushNode(Node<T> *item)
{
item->Next = NULL;
item->Data = NULL;
bool done = false;
while(!done)
{
Node<T>* fs = FreeStack.load();
item->Next.store(fs);
if(FreeStack.compare_exchange_weak(fs,item))
{
done = true;
}
}
}
Node<T> *PopNode(bool Alloc)
{
Node<T> *returnValue = NULL;
bool done = false;
while(!done)
{
Node<T> *fs = FreeStack.load();
if(fs == NULL)
{
done=true;
}
else
{
Node<T> *next = fs->Next.load();
if(FreeStack.compare_exchange_weak(fs,next))
{
done = true;
returnValue = fs;
}
}
}
if ((returnValue == NULL) &&Alloc )
{
returnValue =new Node<T>();
returnValue->Data = NULL;
returnValue->Next = NULL;
}
return returnValue;
}
std::atomic<Node<T> *> Head;
std::atomic<Node<T> *>FreeStack;
};
This is the problem class. It will run for awhile, but corruption happens. The problem is in the Dequeue method. Nodes get trimmed off the list one at a time. Its possible for a node to get trimmed out from underneath you at every step. This results in nodes that have been trimmed off and need to be 'deleted', but there are still active threads reading from the node. The algorithm should prevent any writes to a dead node as atomically a next pointer is either pointing at a node or null, but using a Cache Pool to store the nodes allows for safe reads from the graveyard.
template<class T>
class LockFreeQueueMPMC
{
public:
LockFreeQueueMPMC()
{
Head=NULL;
}
~LockFreeQueueMPMC(){
}
void Enqueue(T *item)
{
Node<T> * new_node = Cache.Malloc();
new_node->Data=item;
bool done = false;
while(!done)
{
auto head = Head.load();
new_node->Next.store(head);
if(Head.compare_exchange_weak(head,new_node))
{
done = true;
}
}
}
T *Dequeue()
{
T *returnValue=NULL;
bool done = false;
while(!done)
{
Node<T> *head = Head.load();
if(head == NULL)
{
done = true;
}
else
{
Node<T> * prev, *curr;
prev = NULL;
curr = head;
bool found = false;
while(!found)
{
if(curr == NULL)
{
break;
}
Node<T> * next = curr->Next.load();
if(next == NULL)
{
found=true;
break;
}
prev = curr;
curr = next;
}
if(found)
{
if(prev == NULL)
{
if(Head.compare_exchange_weak(head,NULL))
{
done = true;
}
}
else
{
if(prev->Next.compare_exchange_weak(curr,NULL))
{
done = true;
}
}
if(done)
{
returnValue = curr->Data;
Cache.Free(curr);
}
}
}
}
return returnValue;
}
private:
std::atomic<Node<T> *> Head;
LockFreeMemCache<Node<T> > Cache;
};
The problem is located in the Dequeue method, there is a step that corrupts, but only rarely.