How could I use DataflowBlockOptions.CancellationToken?

If I create an instance of BufferBlock like this:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

then, given consumer and producer methods that use the queue, how can I use its CancellationToken to handle cancellation?

E.g. in the producer method, how can I check the cancellation token? I haven't found any property that exposes the token.

EDIT: Sample of produce/consume methods:

private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        await queue.SendAsync(value);
    }

    queue.Complete();
}

private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        ret.Add(await queue.ReceiveAsync());
    }

    return ret;
}

Code to call it:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

// Start the producer and consumer.
var values = Enumerable.Range(0, 10);
Produce(queue, values);
var consumer = Consume(queue);

// Wait for everything to complete.
await Task.WhenAll(consumer, queue.Completion);

EDIT2:

If I call _cts.Cancel(), the Produce method does not cancel and finishes without interruption.

eXavier
  • what do you mean by `how I can use`? If you want to cancel the operation you can call `.Cancel` on the `_cts` object (assuming it's a `CancellationTokenSource`). Of course you can always check whether cancellation has been requested with `IsCancellationRequested` – Random Dev Apr 20 '15 at 13:25
  • I can pass a cancellation token into the `BufferBlock` constructor (through `DataflowBlockOptions`). Then, if I pass this instance of `BufferBlock` into some library's produce method, that method has no way to access the token from the `BufferBlock`. So what is the use of this? – eXavier Apr 20 '15 at 13:42
  • can you give a complete example? In that case you can just give the token to the Task you are going to create in the end as well [see here for an introduction](http://blogs.msdn.com/b/dotnet/archive/2012/06/06/async-in-4-5-enabling-progress-and-cancellation-in-async-apis.aspx) – Random Dev Apr 20 '15 at 14:08
  • @CarstenKönig see my edited question. I'm specifically interested in how could I use the token passed in `DataflowBlockOptions.CancellationToken` or what is the purpose of passing it into the constructor of `BufferBlock` – eXavier Apr 20 '15 at 14:12

2 Answers


If you want to cancel the producing process, you should pass the token to it, like this:

    private static async Task Produce(
        BufferBlock<int> queue, 
        IEnumerable<int> values,
        CancellationToken token
        )
    {
        foreach (var value in values)
        {
            await queue.SendAsync(value, token);
            Console.WriteLine(value);
        }

        queue.Complete();
    }

    private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
    {
        var ret = new List<int>();
        while (await queue.OutputAvailableAsync())
        {
            ret.Add(await queue.ReceiveAsync());
        }

        return ret;
    }

    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();

        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = cts.Token });

        // Start the producer and consumer.
        var values = Enumerable.Range(0, 100);
        Produce(queue, values, cts.Token);
        var consumer = Consume(queue);

        cts.Cancel();

        try
        {
            Task.WaitAll(consumer, queue.Completion);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }

        foreach (var i in consumer.Result)
        {
            Console.WriteLine(i);
        }

        Console.ReadKey();
    }
gabba
  • If you modify `Produce` to output the value in the loop, you can see that cancelling the operation does not prevent the producer from sending to the queue. My expectation was that the line `await queue.SendAsync(i);` would throw a TaskCanceledException. Moreover, Produce() cannot access the CancellationToken from the `queue` to check its state and cancel itself. From this, it seems better to pass the cancellation token directly to the Produce/Consume methods rather than relying on DataflowBlockOptions. Still, I'm not sure if I'm missing something. – eXavier Apr 21 '15 at 11:25
  • @FilipHurta Perhaps SendAsync does not check for cancellation because the sending side and the receiving side should share as few resources as possible, or for performance reasons – gabba Apr 21 '15 at 15:34
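
The point raised in these comments can be sketched as a producer that both receives the token explicitly and checks the result of SendAsync. This is only a minimal sketch: the class name `ProducerSketch` and the method `ProduceAsync` are hypothetical names chosen for illustration, and it assumes the caller is happy for the producer to stop quietly on cancellation instead of rethrowing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ProducerSketch
{
    // Hypothetical helper: returns how many values the block accepted.
    public static async Task<int> ProduceAsync(
        ITargetBlock<int> queue,
        IEnumerable<int> values,
        CancellationToken token)
    {
        int sent = 0;
        try
        {
            foreach (var value in values)
            {
                // SendAsync returns false when the block declines the message,
                // for example because the block has been completed or canceled.
                bool accepted = await queue.SendAsync(value, token);
                if (!accepted) break;
                sent++;
            }
        }
        catch (OperationCanceledException)
        {
            // The token passed to SendAsync was canceled; stop producing.
        }
        finally
        {
            // Completing a block that is already completed or canceled is harmless.
            queue.Complete();
        }
        return sent;
    }
}
```

With this shape, the producer stops either because its own token was canceled or because the block started rejecting messages, so it does not need to read the token back from the block.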

Normally you use the CancellationToken option in order to control the cancellation of a dataflow block, using an external CancellationTokenSource. Canceling the block (assuming that it's a TransformBlock) has the following immediate effects:

  1. The block stops accepting incoming messages. Invoking its Post returns false, meaning that the offered message is rejected.
  2. The messages that are currently stored in the block's internal input buffer are immediately discarded. These messages are lost. They will not be processed or propagated.

If the block is not currently processing any messages, the following effects will also follow immediately. Otherwise they will follow when the processing of all currently processed messages is completed:

  1. All the processed messages that are currently stored in this block's output buffer are discarded. The last processed messages (the messages that were in the middle of processing when the cancellation occurred) will not be propagated to linked blocks downstream.
  2. Any pending asynchronous SendAsync operations targeting the block, that were in-flight when the cancellation occurred, will complete with a result of false (meaning "not accepted").
  3. The Task that represents the Completion of the block transitions to the Canceled state. In other words this task's IsCanceled property becomes true.
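
Some of these effects can be observed with a small experiment. The following is a minimal sketch, not a definitive test: it uses a bounded BufferBlock<int> instead of a TransformBlock so that a SendAsync call is left pending, and the class name `CancelEffectsSketch` and method `RunAsync` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CancelEffectsSketch
{
    // Hypothetical helper, not part of TPL Dataflow.
    public static async Task<(bool PendingAccepted, bool PostAccepted, bool IsCanceled)> RunAsync()
    {
        var cts = new CancellationTokenSource();
        var block = new BufferBlock<int>(new DataflowBlockOptions
        {
            BoundedCapacity = 1,
            CancellationToken = cts.Token
        });

        block.Post(1);                               // fills the block to its capacity
        Task<bool> pendingSend = block.SendAsync(2); // stays pending while the block is full

        cts.Cancel();

        // The pending SendAsync is expected to finish without throwing;
        // its result reports whether the message was accepted.
        bool pendingAccepted = await pendingSend;

        try { await block.Completion; }
        catch (OperationCanceledException) { }       // awaiting a canceled Completion throws

        bool postAccepted = block.Post(3);           // offered after the cancellation
        return (pendingAccepted, postAccepted, block.Completion.IsCanceled);
    }
}
```

Awaiting `CancelEffectsSketch.RunAsync()` from any async method should, if the effects listed above hold, report that neither the pending nor the later message was accepted and that the Completion task is in the Canceled state. This also suggests why a producer that ignores the result of SendAsync appears to run to the end without interruption.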

You can achieve all but the last effect directly, without using the CancellationToken option, by invoking the block's Fault method. This method is accessible through the IDataflowBlock interface that all blocks implement. You can use it like this:

((IDataflowBlock)block).Fault(new OperationCanceledException());

The difference is that the Completion task will now become Faulted instead of Canceled. This difference may or may not be important, depending on the situation. If you just await the Completion, which is how this property is normally used, in both cases an OperationCanceledException will be thrown. So if you don't need to do anything fancy with the Completion property, and you also want to avoid configuring the CancellationToken for some reason, you could consider this trick as an option.
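
As a rough illustration of that difference, the sketch below faults an empty BufferBlock<int> and then inspects its Completion task. The names `FaultSketch` and `RunAsync` are hypothetical, and the choice of a BufferBlock is only an assumption made to keep the example short.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultSketch
{
    // Hypothetical helper, not part of TPL Dataflow.
    public static async Task<(TaskStatus Status, bool CaughtOce)> RunAsync()
    {
        var block = new BufferBlock<int>();

        // Fault is an explicit interface implementation, hence the cast.
        ((IDataflowBlock)block).Fault(new OperationCanceledException());

        bool caughtOce = false;
        try
        {
            await block.Completion;
        }
        catch (OperationCanceledException)
        {
            // await unwraps the stored exception, so the catch block looks
            // the same as it would for a block canceled through its token.
            caughtOce = true;
        }

        // The status is what tells the two cases apart.
        return (block.Completion.Status, caughtOce);
    }
}
```

Code that only awaits the Completion task cannot tell the two cases apart, while code that inspects `Status` or `IsCanceled` can.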


Update: Behavior when the cancellation occurs after the Complete method has been invoked, in other words when the block is already in its completion phase, but has not completed yet:

  1. If the block is a processing block, like a TransformBlock, all of the above will happen just the same. The block will transition soon to the Canceled state.
  2. If the block is a non-processing block, like a BufferBlock<T>, the discarding of the output buffer (the first effect in the second list above) will not happen. The output buffer of a BufferBlock<T> is not emptied when the cancellation happens after the invocation of the Complete method. See this GitHub issue for a demonstration of this behavior. Please take into consideration that the Complete method may be invoked not only manually, but also automatically, if the block has been linked as the target of a source block with the PropagateCompletion configuration enabled. You may want to check out this question to understand fully the implications of this behavior. Long story short, canceling all the blocks of a dataflow pipeline that contains a BufferBlock<T> does not guarantee that the pipeline will terminate.

Side note: When both the Complete and Fault methods are invoked, whichever was invoked first prevails regarding the final status of the block. If Complete was invoked first, the block will complete with status RanToCompletion. If Fault was invoked first, the block will complete with status Faulted. Faulting a completed block still has an effect though: it empties its internal input buffer.
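
The ordering rule from the side note can be checked with a short sketch. It assumes empty BufferBlock<int> instances purely for brevity, and `OrderSketch` and `RunAsync` are hypothetical names used only here.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderSketch
{
    // Hypothetical helper, not part of TPL Dataflow.
    public static async Task<(TaskStatus CompleteFirst, TaskStatus FaultFirst)> RunAsync()
    {
        // Complete first, then Fault.
        var a = new BufferBlock<int>();
        a.Complete();
        ((IDataflowBlock)a).Fault(new InvalidOperationException("too late"));
        try { await a.Completion; } catch (Exception) { }

        // Fault first, then Complete.
        var b = new BufferBlock<int>();
        ((IDataflowBlock)b).Fault(new InvalidOperationException("first"));
        b.Complete();
        try { await b.Completion; } catch (Exception) { }

        // Whichever call came first is expected to decide the final status.
        return (a.Completion.Status, b.Completion.Status);
    }
}
```

If the rule holds, the first status is RanToCompletion and the second is Faulted.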

Theodor Zoulias