I am trying to get my head around the following design, but I cannot get a clear picture of it.

I have a number of producers submitting tasks/jobs to a queue. A consumer/worker then picks these up and completes them. For now, there is only one consumer/worker.

So far, this sounds like the standard producer/consumer pattern which could be done with a BlockingCollection.

However, some producers might want to submit a task/job and be able to wait for its completion (or submit multiple tasks/jobs and wait for some or all of them, etc.), while other producers would just "fire&forget" their tasks/jobs. (Note that this is not waiting for the queue to be empty, but waiting for a particular task/job).

How would this be done? In all the examples I have seen, producers just post data to the queue using BlockingCollection.Add().

Any help would be highly appreciated.

user1211286
  • Add an AutoResetEvent to the class declaration for the object you put in the queue. – Hans Passant Mar 28 '18 at 15:15
  • It's far better to just use [an asynchronous queue](https://stackoverflow.com/questions/25691679/best-way-in-net-to-manage-queue-of-tasks-on-a-separate-single-thread/25691829#25691829) rather than creating a bunch of threads just to have them sitting around doing nothing when they don't actually have work to do. – Servy Mar 28 '18 at 19:58
  • My apologies, but still not getting it. Without going into too much detail, this is about (socket-based) communication with another system. A thread is running anyway to maintain the socket (reconnect if closed), perform some low-level communication, etc. Producers want to queue messages for sending and some want to wait until their message has actually been delivered. Does this make sense? – user1211286 Mar 29 '18 at 08:35
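
For illustration, the first comment's suggestion (an event on each queued object) might look like the sketch below. `OutgoingMessage`, `Delivered` and `EventPerItemDemo` are hypothetical names invented here, and the socket send is only indicated by a comment, so treat this as one possible reading rather than a definitive implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical message type: the name and members are illustrative only.
public sealed class OutgoingMessage
{
    public OutgoingMessage(string payload) { Payload = payload; }

    public string Payload { get; }

    // Signaled by the consumer once this message has been handled.
    public AutoResetEvent Delivered { get; } = new AutoResetEvent(false);
}

public static class EventPerItemDemo
{
    public static bool Run()
    {
        var queue = new BlockingCollection<OutgoingMessage>();

        // Single consumer thread: takes messages and signals each one when done.
        var consumer = new Thread(() =>
        {
            foreach (var message in queue.GetConsumingEnumerable())
            {
                // Real code would send message.Payload over the socket here.
                message.Delivered.Set();
            }
        });
        consumer.Start();

        // Fire and forget: enqueue and never look at the event.
        queue.Add(new OutgoingMessage("no confirmation needed"));

        // Enqueue, then block until this particular message has been processed.
        var important = new OutgoingMessage("must arrive");
        queue.Add(important);
        bool delivered = important.Delivered.WaitOne(TimeSpan.FromSeconds(5));

        // Shut down: the consuming enumerable ends once the queue is drained.
        queue.CompleteAdding();
        consumer.Join();
        return delivered;
    }
}
```

The producer that cares blocks a thread in `WaitOne`, and the events are never disposed in this sketch; both points are part of why the answers below lean towards `Task`-based completion instead.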

1 Answer

A common approach is to wrap each work item in a TaskCompletionSource and return its Task to the caller. Producers that care about completion can await that Task; fire-and-forget producers can simply ignore it.

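using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class ProducerConsumerQueue
{
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();

    // Enqueues the work and returns a Task that completes once the consumer has run it.
    // Fire-and-forget callers can ignore the returned Task.
    public Task Produce(Action work)
    {
        // RunContinuationsAsynchronously (.NET 4.6+) keeps the callers'
        // continuations from running on the consumer thread.
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        Action action = () =>
        {
            try
            {
                work();
                tcs.SetResult(true);
            }
            catch (Exception ex)
            {
                // Surface the failure to whoever awaits the Task.
                tcs.SetException(ex);
            }
        };
        queue.Add(action);
        return tcs.Task;
    }

    // Runs on the consumer thread until the token is cancelled,
    // at which point Take throws OperationCanceledException.
    public void RunConsumer(CancellationToken token)
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();
            var action = queue.Take(token);
            action();
        }
    }
}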
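
To show how producers might use such a queue, here is a small sketch covering the cases from the question: fire and forget, waiting for one job, and waiting for any or all of several jobs. The queue class is a condensed variant of the one above, repeated only so the sketch compiles on its own; `UsageDemo` and the counter are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Condensed variant of the queue above, repeated so this sketch is self-contained.
public class ProducerConsumerQueue
{
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();

    public Task Produce(Action work)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        queue.Add(() =>
        {
            try { work(); tcs.SetResult(true); }
            catch (Exception ex) { tcs.SetException(ex); }
        });
        return tcs.Task;
    }

    public void RunConsumer(CancellationToken token)
    {
        // Take throws OperationCanceledException once the token is cancelled.
        while (true) queue.Take(token)();
    }
}

public static class UsageDemo
{
    public static async Task<int> RunAsync()
    {
        var queue = new ProducerConsumerQueue();
        using (var cts = new CancellationTokenSource())
        {
            // Dedicated consumer; cancellation ends the loop on shutdown.
            Task consumer = Task.Factory.StartNew(() =>
            {
                try { queue.RunConsumer(cts.Token); }
                catch (OperationCanceledException) { }
            }, TaskCreationOptions.LongRunning);

            int completed = 0;

            // Fire and forget: discard the returned task.
            _ = queue.Produce(() => Interlocked.Increment(ref completed));

            // Wait for one particular job.
            await queue.Produce(() => Interlocked.Increment(ref completed));

            // Wait for whichever of several jobs finishes first, then for all of them.
            Task a = queue.Produce(() => Interlocked.Increment(ref completed));
            Task b = queue.Produce(() => Interlocked.Increment(ref completed));
            await Task.WhenAny(a, b);
            await Task.WhenAll(a, b);

            cts.Cancel();
            await consumer;
            return completed;
        }
    }
}
```

Because there is a single consumer taking items in order, the fire-and-forget job has already run by the time the later jobs complete, even though nobody waited for it.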

That said, you should consider leveraging the task infrastructure provided by TPL itself, rather than coming up with your own structures. If your only requirement is having a bounded number of consumers, you could use a LimitedConcurrencyLevelTaskScheduler.
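
As a rough illustration of leaning on the TPL: LimitedConcurrencyLevelTaskScheduler is a sample class rather than part of the base class library, so the sketch below swaps in the built-in ConcurrentExclusiveSchedulerPair, whose ExclusiveScheduler runs at most one task at a time. That substitution is an assumption made here for the single-consumer case, and `SchedulerDemo` is a made-up name.

```csharp
using System;
using System.Threading.Tasks;

public static class SchedulerDemo
{
    public static async Task<int> RunAsync()
    {
        // ExclusiveScheduler never runs two of its tasks concurrently,
        // which stands in for a single consumer without a hand-written queue.
        TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        var factory = new TaskFactory(scheduler);

        // Safe without locking only because the tasks never overlap.
        int processed = 0;

        // A fire-and-forget producer would simply not keep this task.
        Task first = factory.StartNew(() => { processed++; });

        // Producers that care keep the task and wait for it.
        Task second = factory.StartNew(() => { processed++; });
        Task third = factory.StartNew(() => { processed++; });

        await second;                      // wait for one particular job
        await Task.WhenAll(first, third);  // wait for several jobs
        return processed;
    }
}
```

Unlike the BlockingCollection version, no thread is parked waiting for work; the scheduler borrows thread-pool threads only while there is something to run. That may not suit a consumer that must stay on one specific thread, such as the socket-maintenance thread described in the question.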

Douglas
  • There's no need to use a TCS. Just make the method `async`. That's virtually always preferable to using an explicit TCS when possible, and it's almost always possible. – Servy Mar 28 '18 at 19:55
  • @Servy: I don't get it. If I use async, the `Produce` method would enqueue a `Func`, but what would it return? – Douglas Mar 28 '18 at 19:59
  • I've read your other comment further above; I get what you mean. I agree. – Douglas Mar 28 '18 at 20:01
  • Maybe I am wrong, but I am sticking to the idea that producers want to put work in a queue and a consumer would process these. I don't see how "make the method async" (which one?) would address that. – user1211286 Mar 29 '18 at 08:38
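
One possible reading of the "asynchronous queue" and "make the method async" comments, sketched under assumptions: the enqueue method itself is `async`, a SemaphoreSlim lets one operation through at a time, and the Task returned by the method is what a producer waits on. `SerialTaskQueue`, `Enqueue` and the demo are names chosen here for illustration; this is not claimed to be the commenter's exact code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name: serializes asynchronous operations without a dedicated thread.
public sealed class SerialTaskQueue
{
    private readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);

    // The async method is the "produce" call; its Task completes
    // when this caller's operation has finished.
    public async Task<T> Enqueue<T>(Func<Task<T>> operation)
    {
        await gate.WaitAsync().ConfigureAwait(false);
        try { return await operation().ConfigureAwait(false); }
        finally { gate.Release(); }
    }
}

public static class SerialTaskQueueDemo
{
    public static async Task<int> RunAsync()
    {
        var queue = new SerialTaskQueue();
        int inFlight = 0, maxInFlight = 0;

        // Stand-in for "send one message"; records how many sends overlap.
        Func<Task<int>> send = async () =>
        {
            int now = Interlocked.Increment(ref inFlight);
            maxInFlight = Math.Max(maxInFlight, now);
            await Task.Delay(10);
            Interlocked.Decrement(ref inFlight);
            return now;
        };

        Task<int> first = queue.Enqueue(send);
        Task<int> second = queue.Enqueue(send);
        Task<int> third = queue.Enqueue(send);

        await first;                        // wait for one particular operation
        await Task.WhenAll(second, third);  // or for several of them
        return maxInFlight;                 // 1 if the operations never overlapped
    }
}
```

A fire-and-forget producer would call `Enqueue` and ignore the returned task. Note that this sketch makes no promise about the order in which waiting operations are admitted.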