I have a queue of commands. Some of them share an attribute, say a document ID. I need to process them all in parallel, with one restriction: commands with the same attribute value must be processed in the order in which they appear in the queue.
For example, my queue is [n, a, s, j, a, l, v, g, a, f, f], where the letters are DocumentIds. I need parallel processing, but the 'a's must be processed in the order they appear in the queue, i.e. [1, 4, 8], where the numbers are the zero-based positions of the items in the queue. Apart from that, the processing order doesn't matter, as long as [8] is processed after [4], which is processed after [1] (with any number of other items processed in between).
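One way to see the constraint is to group the queue positions by key: only positions within the same group are ordered relative to each other, and items from different groups may run in any order. A small illustrative sketch using the example queue (the variable names are made up):

```csharp
using System;
using System.Linq;

// The example queue from above; each letter stands for a DocumentId.
char[] queue = { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' };

// Group the zero-based queue positions by key. GroupBy keeps the source order
// inside each group, so each array lists the positions in the order they must run.
var positionsByKey = queue
    .Select((key, position) => (key, position))
    .GroupBy(item => item.key)
    .ToDictionary(group => group.Key, group => group.Select(item => item.position).ToArray());

Console.WriteLine(string.Join(", ", positionsByKey['a'])); // 1, 4, 8
Console.WriteLine(string.Join(", ", positionsByKey['f'])); // 9, 10
```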
First, I tried locking on the DocumentId with a SemaphoreSlim per ID: when a thread takes an item to process, any other thread that takes an item with the same DocumentId blocks until the first one is done. This didn't work, because SemaphoreSlim doesn't guarantee that waiting threads are released in FIFO order.
Then I wrote a wrapper around SemaphoreSlim to enforce FIFO unblocking:
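```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class FifoAsyncLock : IDisposable
{
    private readonly SemaphoreSlim _sem = new (1, 1);
    private readonly ConcurrentQueue<TaskCompletionSource> _queue = new ();

    public async Task WaitAsync()
    {
        // Record this caller's place in line before competing for the semaphore.
        var tcsE = new TaskCompletionSource();
        _queue.Enqueue(tcsE);
        await _sem.WaitAsync();
        // The caller that wins the semaphore completes the oldest queued TCS...
        if (_queue.TryDequeue(out var tcsD))
            tcsD.SetResult();
        // ...and then waits for its own TCS to be completed.
        await tcsE.Task;
    }

    public void Release()
    {
        _sem.Release();
    }

    public void Dispose()
    {
        _sem.Dispose();
    }
}
```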
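In `FifoAsyncLock` above, the caller that wins `_sem` completes the *oldest* queued TCS, which can belong to a different caller that is still blocked in `_sem.WaitAsync()`. The winner then waits on its own TCS while still holding the semaphore, so the two callers can end up waiting on each other. A minimal sketch of a FIFO async lock that decides the order in one place instead: it drops SemaphoreSlim entirely and keeps a "held" flag and a `Queue<TaskCompletionSource>` under one ordinary `lock`, with `Release` handing ownership straight to the oldest waiter. The class name `QueuedAsyncLock` is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var fifo = new QueuedAsyncLock();
var order = new List<int>();

await fifo.WaitAsync();                     // the lock is free, so this completes immediately
var workers = Enumerable.Range(0, 5).Select(id => Worker(id)).ToList(); // each call queues up, in order 0..4
fifo.Release();                             // hand the lock to the oldest waiter
await Task.WhenAll(workers);
Console.WriteLine(string.Join(", ", order)); // 0, 1, 2, 3, 4

async Task Worker(int id)
{
    await fifo.WaitAsync();
    try { order.Add(id); }
    finally { fifo.Release(); }
}

// Hypothetical FIFO lock: the waiter queue and the "held" flag are only
// read or changed under one ordinary lock.
public sealed class QueuedAsyncLock
{
    private readonly object _gate = new();
    private readonly Queue<TaskCompletionSource> _waiters = new();
    private bool _held;

    public Task WaitAsync()
    {
        lock (_gate)
        {
            if (!_held)
            {
                _held = true;               // free: take it without waiting
                return Task.CompletedTask;
            }
            // Busy: join the end of the line. Asynchronous continuations keep the
            // next owner from running inline inside Release().
            var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(tcs);
            return tcs.Task;
        }
    }

    public void Release()
    {
        TaskCompletionSource next;
        lock (_gate)
        {
            if (!_held) throw new InvalidOperationException("The lock is not held.");
            if (_waiters.Count == 0)
            {
                _held = false;              // nobody waiting: the lock becomes free
                return;
            }
            next = _waiters.Dequeue();      // ownership passes straight to the oldest waiter
        }
        next.SetResult();                   // completed outside _gate
    }
}
```

Note that a lock like this can only preserve the order in which `WaitAsync` is called; it has no way of knowing the order in which items were taken from the command queue.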
I used it in a class that stores one FifoAsyncLock per DocumentId and also keeps count of how many users are holding or waiting for each lock. When the last user releases a lock, the lock is deleted to save memory:
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class DocIdLocker : IDisposable
{
    private readonly ConcurrentDictionary<Guid, FifoAsyncLock> _docIdLocks = new ();
    private readonly ConcurrentDictionary<Guid, int> _users = new ();
    private bool _disposed;

    public async Task<IAsyncDisposable> AquireLockAsync(Guid docId)
    {
        var userCount = _users.AddOrUpdate(docId, 1, (_, o) => o + 1);
        await _docIdLocks.GetOrAdd(docId, new FifoAsyncLock()).WaitAsync();
        return new Lock(this, docId);
    }

    private async Task Release(Guid docId)
    {
        if (!_docIdLocks.ContainsKey(docId))
            throw new KeyNotFoundException($"Key not found: '{docId}'");
        _docIdLocks[docId].Release();
        if (!_users.ContainsKey(docId))
            throw new KeyNotFoundException($"Key not found: '{docId}'");
        if (--_users[docId] == 0)
        {
            _docIdLocks.TryRemove(docId, out _);
            _users.TryRemove(docId, out _);
        }
    }

    private class Lock : IAsyncDisposable
    {
        private readonly DocIdLocker _parent;
        private readonly Guid _docId;

        public Lock(DocIdLocker parent, Guid docId)
        {
            _parent = parent;
            _docId = docId;
        }

        public ValueTask DisposeAsync() => new (_parent.Release(_docId));
    }

    public void Dispose()
    {
        if (_disposed)
            return;
        foreach (var item in _docIdLocks.Values)
            item.Dispose();
        _users.Clear();
        _disposed = true;
    }
}
```
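The bookkeeping in `DocIdLocker` is split across two `ConcurrentDictionary` instances, and each call is atomic only on its own: `--_users[docId]` is a separate read and write, and one thread can fetch a lock with `GetOrAdd` just before another thread's `TryRemove` drops it, so a later caller may create a second lock for the same DocumentId. A minimal sketch of one way around this, assuming a single global `lock` is acceptable for your throughput: keep the waiter queue and the "key is in use" state in the same dictionary entry, and create and remove entries only under that lock. `KeyedAsyncLock` and `ActiveKeyCount` are hypothetical names, and the delays stand in for real work:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

char[] queue = { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' };
var locks = new KeyedAsyncLock<char>();
var log = new List<int>();                  // queue positions in the order they finished

// Start every item from one loop, so WaitAsync is called in queue order.
await Task.WhenAll(queue.Select((key, position) => ProcessAsync(key, position)));
Console.WriteLine(string.Join(", ", log.Where(p => queue[p] == 'a'))); // 1, 4, 8
Console.WriteLine(locks.ActiveKeyCount);    // 0

async Task ProcessAsync(char key, int position)
{
    await locks.WaitAsync(key);
    try
    {
        await Task.Delay(5 * (queue.Length - position)); // hypothetical work: later items are faster
        lock (log) log.Add(position);
    }
    finally
    {
        locks.Release(key);
    }
}

// Hypothetical per-key FIFO lock. An entry exists only while its key is held;
// its queue holds the callers waiting for that key, oldest first.
public sealed class KeyedAsyncLock<TKey> where TKey : notnull
{
    private readonly Dictionary<TKey, Queue<TaskCompletionSource>> _waitersByKey = new();

    public int ActiveKeyCount
    {
        get { lock (_waitersByKey) return _waitersByKey.Count; }
    }

    public Task WaitAsync(TKey key)
    {
        lock (_waitersByKey)
        {
            if (!_waitersByKey.TryGetValue(key, out var waiters))
            {
                _waitersByKey[key] = new Queue<TaskCompletionSource>(); // key was free: hold it now
                return Task.CompletedTask;
            }
            var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
            waiters.Enqueue(tcs);
            return tcs.Task;
        }
    }

    public void Release(TKey key)
    {
        TaskCompletionSource next;
        lock (_waitersByKey)
        {
            if (!_waitersByKey.TryGetValue(key, out var waiters))
                throw new InvalidOperationException($"Key '{key}' is not held.");
            if (waiters.Count == 0)
            {
                _waitersByKey.Remove(key);  // last user: removed under the same lock that creates entries
                return;
            }
            next = waiters.Dequeue();       // hand the key to the oldest waiter
        }
        next.SetResult();
    }
}
```

As with any lock, this only preserves the order in which `WaitAsync` is called; in the demo those calls come from a single loop in queue order. If several worker threads each dequeue a command and then call `WaitAsync`, two threads can dequeue items 1 and 4 and still reach `WaitAsync` in the opposite order.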
But my tests still showed that the order of the 'a's is not preserved.
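For tests like these, the invariant can be checked directly from the order in which queue positions finished: for each key, its positions must appear in increasing order. A small sketch (the helper name `PreservesPerKeyOrder` and the two sample completion orders are made up for illustration):

```csharp
using System;
using System.Collections.Generic;

char[] queue = { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' };

// Item 4 finished before item 1, although both are 'a'.
Console.WriteLine(PreservesPerKeyOrder(queue, new[] { 0, 4, 1, 2, 3, 5, 6, 7, 8, 9, 10 })); // False
// Heavily reordered overall, but every key's items finished in queue order.
Console.WriteLine(PreservesPerKeyOrder(queue, new[] { 2, 1, 0, 4, 3, 9, 8, 10, 7, 6, 5 })); // True

// Returns true if, for every key, the items finished in the order they had in the queue.
// `completed` lists queue positions in the order their processing finished.
static bool PreservesPerKeyOrder<TKey>(IReadOnlyList<TKey> items, IEnumerable<int> completed)
    where TKey : notnull
{
    var lastPositionByKey = new Dictionary<TKey, int>();
    foreach (var position in completed)
    {
        var key = items[position];
        if (lastPositionByKey.TryGetValue(key, out var last) && last > position)
            return false;                   // a later item of this key finished before an earlier one
        lastPositionByKey[key] = position;
    }
    return true;
}
```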
I suspected that some threads take items before the lock for that DocumentId exists yet, and process them out of order. But by now the whole thing has become very hard to reason about, and I can no longer keep it straight in my head.
Is there a simple and elegant way to do this?
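One possible shape for this, sketched here as an alternative to locking rather than a fix of the classes above: let a single loop read the queue and, for each command, start a task that first awaits the previous task for the same key. Because only that loop touches the per-key "tail" dictionary, and it runs in queue order, the order is fixed at the moment an item is dequeued, while commands with different keys still run in parallel. The names and the simulated work are hypothetical, and in this minimal version the dictionary is never trimmed, so a long-running service would need to drop entries whose tail task has completed:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

char[] queue = { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' };
var log = new List<int>();                    // queue positions in the order they finished
var tailByKey = new Dictionary<char, Task>(); // last task started per key; only this loop touches it
var all = new List<Task>();

// Single dispatcher loop: items are read in queue order, and each one is chained
// onto the previous task for its key, so the per-key order is fixed right here.
for (int position = 0; position < queue.Length; position++)
{
    char key = queue[position];
    Task previous = tailByKey.TryGetValue(key, out var tail) ? tail : Task.CompletedTask;
    Task current = ProcessAfterAsync(previous, position);
    tailByKey[key] = current;
    all.Add(current);
}
await Task.WhenAll(all);
Console.WriteLine(string.Join(", ", log.Where(p => queue[p] == 'a'))); // 1, 4, 8

async Task ProcessAfterAsync(Task previous, int position)
{
    try { await previous; }
    catch { /* a failure of the previous command surfaces through its own task in `all` */ }

    await Task.Delay(5 * (queue.Length - position)); // hypothetical work: later items are faster
    lock (log) log.Add(position);
}
```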