I'm looking for an object like a ConcurrentQueue which will allow me to await the Dequeue operation if the queue is empty so I can do something like the following:
public static async Task ServiceLoop() {
var awaitedQueue = new AwaitedQueue<int>();
while (!cancelled) {
var item = await awaitableQueue.Dequeue();
Console.WriteLine(item);
}
}
I've written the following class, but if an item is added to the queue between the time Dequeue is called and a new awaiter is Enqueued, terrible things will happen.
public class AwaitedQueue<T> : IDisposable {
ConcurrentQueue<TaskCompletionSource<T>> awaiters = new ConcurrentQueue<TaskCompletionSource<T>>();
ConcurrentQueue<T> items = new ConcurrentQueue<T>();
public AwaitedQueue() { }
public void Enqueue(T item) {
if (!awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) {
this.items.Enqueue(item);
} else {
awaiter.SetResult(item);
}
}
public async Task<T> Dequeue() {
if (items.TryDequeue(out T item)) {
return item;
} else {
// If an item is enqueued between this call to create a new TaskCompletionSource.
var awaiter = new TaskCompletionSource<T>();
// And this call to actually enqueue, I believe it will cause me problems.
awaiters.Enqueue(awaiter);
return await awaiter.Task;
}
}
public void Dispose() {
while (awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) {
awaiter.SetCanceled();
awaiter.Task.Wait();
}
}
}
I'm sure a robust and well-tested implementation of this concept already exists, but I don't know which combination of English words I need to type into Google to find it.