7

I have one process generating work and a second process with a BlockingCollection<> that consumes that work. When I close my program, I need my consumer to stop consuming work, but I still need to quickly log the work that was pending but hadn't been consumed.

Right now, my consumer spawns a thread that has a foreach (<object> in BlockingCollection.GetConsumingEnumerable()) loop. When I stop my program, my producer calls Consumer.BlockingCollection.CompleteAdding(). What I find is that my consumer continues to process everything in the queue.

Googling the issues tells me that I need to use a CancellationToken. So I tried that out:

private void Process () { // This method runs in a separate thread
    try {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCancelledException) {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}

My producer has:

private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();

When I try this, I get no indication that the OperationCancelledException ever happened.

This question tries to explain the use of a cancellation token, but it seems like it's not using it correctly. (Argument: if it works, then it's "correct enough".) And this question would appear to be an exact duplicate of my question but with no example. (Same here.)

So to reiterate: How do I properly use a CancellationToken on BlockingCollection.GetConsumingEnumerable() with the caveat that I need to process the remaining items in the queue after it gets cancelled using a different method?

(I think my problem is centered around the proper use of the CancellationToken. None of my testing indicates that the process is actually being cancelled. (StopFlag.IsCancellationRequested always equals false.))

Community
  • 1
  • 1
Jason
  • 156
  • 1
  • 11
  • The `GetConsumingEnumerable` will wait for more items to be added. Inside the `catch` use the `foreach` on the collection itself. – Dustin Kingen Nov 11 '13 at 17:29
  • I'd love to try that, Romoku! How do I get to it? I used the simple ctor: `public BlockingCollection OrderQueue = new BlockingCollection ();` – Jason Nov 11 '13 at 17:35
  • You need to check the cancellation token. CancellationToken.IsCancellationRequeste In your case if true then still process but log rather than process. http://msdn.microsoft.com/en-us/library/dd997289.aspx – paparazzo Nov 11 '13 at 17:38

3 Answers3

5

When you pass in the CancellationToken to GetConsumingEnumerable it won't throw an exception of cancellation is requested, it'll just stop spitting out items. Rather than catching the exception just check the token:

foreach (var item in BlockingCollection.
    GetConsumingEnumerable(CancellationToken))
{
    //consume item
}
if (CancellationToken.IsCancellationRequested)
    foreach (var item in BlockingCollection)
    {
        //log item
    }

Also note that, if cancellation is requested, and it's possible that CompletedAdding hasn't been called then you should just iterate the collection, not call GetConsumingEnumerable. If you know that the producer will complete adding when the operation is cancelled then that's not a problem.

Servy
  • 202,030
  • 26
  • 332
  • 449
  • I tried the `if()` statement both outside the loop (like you have it) and inside the loop. Neither one worked. I'm not even getting notification that the cancellation flag is doing anything. – Jason Nov 11 '13 at 17:49
  • @Jason Then your token isn't being canceled, and that's your problem. Ensure that the token is actually being cancelled before you finish processing the items. – Servy Nov 11 '13 at 17:50
  • @Jason Based on your usage code, you're cancelling *just* before you finish adding. There's a good chance that both operations will be finished by the time the consuming code runs again, and if your consumer is faster than your producer (therefore it has no items left) it could very easily finish consuming all items before the cancellation token indicates it should be canceled. You should be working your example such that it's *impossible* for the consumer to finish consuming normally before being cancelled if that's the case you want to test. – Servy Nov 11 '13 at 17:53
  • I am. I have Sleep() calls simulating work. Each step of "processing" logs the item processed and the remaining number of items in the queue. (`BlockingCollection.Count`) My original code showed the count winding down to zero. Subsequent tests show the count simply stopping, but I **never** get my message of "Operation was cancelled" – Jason Nov 11 '13 at 17:57
  • @Jason So you've verified that the collection isn't empty, but `IsCancellationRequested` is `false`, *after the end of the `foreach` loop*? I find that very odd. If that's the case, can you provide a complete code example to replicate the problem? – Servy Nov 11 '13 at 17:59
  • 3
    GetConsumingEnumerable WILL throw an exception when cancellation is requested – Darragh Apr 22 '15 at 22:55
3

My problem was in how I was trying to cancel the operation. Instead of having my producer owning the CancellationTokenSource, I put it all in the consumer.

public class cProducer {
    private cConsumer myConsumer = new cConsumer ();

    public void onStart () {
        myConsumer.OnStart ();
    }

    public void onStop () {
        myConsumer.OnStop ();
    }

    public void OnOrderReceived (cOrder newOrder) {
        myConsumer.orderQueue.Add (cOrder);
    }
}

public class cConsumer {
    private CancellationTokenSource stopFlag;
    public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder> ();
    private Task processingTask;

    public void OnStart () {
        stopFlag = new CancellationTokenSource ();
        processingTask = Task.Factory.StartNew (() => Process ());
    }

    public void OnStop () {
        stopFlag.Cancel ();
        orderQueue.CompleteAdding ();
        processingTask.Wait ();
    }

    private void Process () {
        try {
            foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) {
                // process
            }
        }
        catch (OperationCanceledException) {
            foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable ()) {
                // log it
            }
        }
    }
}
Jason
  • 156
  • 1
  • 11
  • Why do you have a GetConsumingEnumerable call in your OperationCanceledException handler? And furthermore, why doesn't this block the processingTask.Wait() call from finishing. – priehl Aug 07 '17 at 16:13
  • https://msdn.microsoft.com/en-us/library/dd287086(v=vs.110).aspx says "After a collection has been marked as complete for adding, adding to the collection is not permitted and attempts to remove from the collection will not wait when the collection is empty." So i suppose that makes sense now, but you can also enumerate it as you would any IEnumerable in the catch block. – priehl Aug 07 '17 at 17:12
0

I had the exact same problem. The BlockingCollection seemed to be deadlocked when I cancelled the procedure. The OperationCanceledException was not propagated to the calling method. I figured out that my Producer did not take the cancellation token in consideration and was therefore waiting for the queue to be consumed. All I had to do was to provide the cancellation token in the Add() method. To translate this into Jason's solution above, all I did was this:

public void OnOrderReceived (cOrder newOrder, CancellationToken cancellationToken) 
{
    myConsumer.orderQueue.Add (cOrder, cancellationToken);
}

The Process() method does not need any try-catch clause. However you need to throw if the process is cancelled:

private void Process () 
{
    foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) 
    {
        // process
        stopFlag.Token.ThrowIfCancellationRequested();
    }
}