
I'm looking for an object like a ConcurrentQueue which will allow me to await the Dequeue operation if the queue is empty so I can do something like the following:

public static async Task ServiceLoop() {

    var awaitedQueue = new AwaitedQueue<int>();

    while (!cancelled) {
        var item = await awaitedQueue.Dequeue();
        Console.WriteLine(item);
    }

}

I've written the following class, but it has a race condition: if an item is added to the queue between the moment Dequeue finds the queue empty and the moment the new awaiter is enqueued, terrible things will happen.

public class AwaitedQueue<T> : IDisposable {

    ConcurrentQueue<TaskCompletionSource<T>> awaiters = new ConcurrentQueue<TaskCompletionSource<T>>();

    ConcurrentQueue<T> items = new ConcurrentQueue<T>();

    public AwaitedQueue() { }

    public void Enqueue(T item) {
        if (!awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) {
            this.items.Enqueue(item);
        } else {
            awaiter.SetResult(item);
        }
    }

    public async Task<T> Dequeue() {
        if (items.TryDequeue(out T item)) {
            return item;
        } else {
            // Race window: if Enqueue runs after the TryDequeue above fails but
            // before the awaiter is enqueued below, the item lands in items and
            // this awaiter is left waiting for a later Enqueue.
            var awaiter = new TaskCompletionSource<T>();
            awaiters.Enqueue(awaiter);
            return await awaiter.Task;
        }
    }

    public void Dispose() {
        while (awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) {
            awaiter.SetCanceled();
            // No Wait() here: waiting on a canceled task throws an AggregateException.
        }
    }
}
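
For comparison, one way the race window described above could be closed is to guard both queues with a single lock, so that "check for items" and "register an awaiter" happen as one atomic step. This is only a minimal sketch under that assumption, not a well-tested implementation; the class name `LockedAwaitedQueue` and the field name `gate` are hypothetical and do not appear in the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical variant of AwaitedQueue<T>: one lock protects both queues.
public class LockedAwaitedQueue<T>
{
    private readonly object gate = new object();
    private readonly Queue<T> items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> awaiters =
        new Queue<TaskCompletionSource<T>>();

    public void Enqueue(T item)
    {
        TaskCompletionSource<T> awaiter;
        lock (gate)
        {
            if (awaiters.Count == 0)
            {
                // Nobody is waiting, so store the item for a later Dequeue.
                items.Enqueue(item);
                return;
            }
            awaiter = awaiters.Dequeue();
        }
        // Complete the waiting task outside the lock.
        awaiter.SetResult(item);
    }

    public Task<T> Dequeue()
    {
        lock (gate)
        {
            if (items.Count > 0)
            {
                return Task.FromResult(items.Dequeue());
            }
            // Registering the awaiter under the same lock means Enqueue
            // cannot slip in between the emptiness check and this line.
            var awaiter = new TaskCompletionSource<T>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            awaiters.Enqueue(awaiter);
            return awaiter.Task;
        }
    }
}
```

The lock is held only for the queue bookkeeping. `RunContinuationsAsynchronously` keeps the code that awaits `Dequeue` from running inline on the thread that calls `Enqueue`.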

I'm sure a robust and well-tested implementation of this concept already exists, but I don't know which combination of English words I need to type into Google to find it.

Mark Schad

1 Answer


There is a modern solution for this: Channels, from the System.Threading.Channels namespace. A "Channel" is an asynchronous producer/consumer queue.

Channels also have the concept of "completion", so you can complete the channel rather than having a cancelled flag.

Usage:

public static async Task ServiceLoop() {
  var awaitedQueue = Channel.CreateUnbounded<int>();
  var queueReader = awaitedQueue.Reader;

  while (await queueReader.WaitToReadAsync())
  {
    while (queueReader.TryRead(out var item))
    {
      Console.WriteLine(item);
    }
  }
}
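
To illustrate the "completion" idea, here is a minimal sketch that pairs the reader loop above with a producer that completes the channel when it has nothing more to write. It assumes .NET Core 3.0 or later (or the System.Threading.Channels NuGet package); the names `ChannelDemo`, `ProduceAsync`, `ConsumeAsync` and `RunAsync` are hypothetical and chosen only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    // Hypothetical producer: writes `count` integers, then completes the channel.
    public static async Task ProduceAsync(ChannelWriter<int> writer, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i);
        }
        // Signals that no more items will ever be written.
        writer.Complete();
    }

    // Same reader loop as in the usage example, collecting items instead of printing.
    public static async Task<List<int>> ConsumeAsync(ChannelReader<int> reader)
    {
        var seen = new List<int>();
        // WaitToReadAsync returns false once the channel is completed and drained.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                seen.Add(item);
            }
        }
        return seen;
    }

    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        var consumer = ConsumeAsync(channel.Reader);
        await ProduceAsync(channel.Writer, 3);
        return await consumer; // ends because the writer completed
    }
}
```

Because the consumer loop ends when the writer completes, no separate cancelled flag is needed to shut the loop down.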
Stephen Cleary
    Thank you for the simple explanation and implementation example for Channels. Asynchronous reads and writes to a concurrent collection consumed by a Task are the perfect application for this concept. while (true) loops waste huge CPU resources in low-bandwidth messaging buffers. This approach uses zero CPU unless it is processing a message. – Jim Broiles Jun 27 '22 at 15:35