1

it seems that I do not understand TPL Dataflow error handling.

Lets assume I have a list of items I wanna process and I use a ActionBlock for that:

var actionBlock = new ActionBlock<int[]>(async tasks =>
{
    foreach (var task in tasks)
    {
        await Task.Delay(1);

        if (task > 30)
        {
            throw new InvalidOperationException();
        }

        Console.WriteLine("{0} Completed", task);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 200,
    MaxDegreeOfParallelism = 4
});

for (i = 0; i < 10000; i++)
{
    if (!await bufferBlock.SendAsync(i))
    {
        break;
    }
}

actionBlock.Complete();

await actionBlock.Completion;

If an error occurs the block transitions to faulted state and SendAsync(...) returns false. I can just stop my loop and complete it and when I await the completion an exception is thrown. So far so good.

When I put a BufferBlock in between it does not work anymore:

bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});

for (i = 0; i < 10000; i++)
{
    if (!await bufferBlock.SendAsync(i, cts.Token))
    {
        break;
    }
}

bufferBlock.Complete();

await actionBlock.Completion;

The call to SendAsync() just "blocks" forever, because the BufferBlock never transitions to faulted state.

The only solution I found is this:

using (var cts = new CancellationTokenSource())
{
    actionBlock.Completion.ContinueWith(x =>
    {
        if (x.Status != TaskStatus.RanToCompletion)
        {
            cts.Cancel();
        }
    });

    var i = 0;

    try
    {
        for (i = 0; i < 10000; i++)
        {
            if (cts.Token.IsCancellationRequested)
            {
                break;
            }

            if (!await bufferBlock.SendAsync(i, cts.Token))
            {
                break;
            }
        }
    }
    catch (OperationCanceledException)
    {
    }

    bufferBlock.Complete();

    await actionBlock.Completion;
}

Because the state propagates I have to listen to the state of the last block in my network and when this block stops I have to stop my loop.

Is this the intended way to work with Dataflow library or is there a better solution?

SebastianStehle
  • 2,409
  • 1
  • 22
  • 32
  • 2
    Here is relevant question: [TPL Dataflow exception in transform block with bounded capacity](https://stackoverflow.com/questions/21603428/tpl-dataflow-exception-in-transform-block-with-bounded-capacity/). Btw my answer to that question addresses poorly this thorny problem. This GitHub issue shows another dimension of the problem: [No way to cancel completing dataflow blocks](https://github.com/dotnet/runtime/issues/52348). – Theodor Zoulias Sep 15 '21 at 19:32
  • That's not the proper way to handle errors in a pipeline, nor are errors supposed to flow backwards. Blocks aren't function calls and a link doesn't represent any kind of ownership. This isn't a TPL Dataflow issue. The same happens in Go channels. – Panagiotis Kanavos Sep 16 '21 at 08:39

1 Answers1

4

Don't allow unhandled exceptions. An unhandled exception in a block means the block and by extension the entire pipeline is terminally broken and must be aborted. That's not a TPL Dataflow bug, that's how the overall dataflow paradigm works. Exceptions are meant to signal errors up a call stack. There's no call stack in a dataflow though.

Blocks are independent workers that communicate through messages. There's no ownership relation between linked blocks and a faulting block doesn't mean any previous or following blocks should have to abort as well. That's why PropagateCompletion is false by default.

If a source links to more than one blocks the messages can easily go to the other blocks. It's also possible to change the links between blocks at runtime.

In a pipeline there are two different kinds of errors:

  • Message errors that occur when a block/actor/worker processes a message
  • Pipeline errors that invalidate the pipeline and may require aborting

There's no reason to abort the pipeline if a single message faults.

Message errors

If something goes wrong while processing a message, the actor should do something with that message and proceed with the next one. That something may be:

  • Log the error and go on
  • Send an "error" message to another block
  • Use a Result<TMessage,TError> class in the entire pipeline instead of using raw message types, and add any errors to the result

Retry and recovery strategies can be built on top of that, eg forwarding any failed messages to a "retry" block or dead message block

The simplest way would be to just catch the exceptions and log them :

var block = new ActionBlock<int[]>(msg =>
{
    try
    {
        ...
    }
    catch(Exception exc)
    {
        _logger.LogError(exc);
    }
});

Another option is to manually post to e.g. a dead-letter queue :

var dead = new BufferBlock<(int[] data, Exception error)>();

var block = new ActionBlock<int[]>(msg =>
{
    try
    {
        ...
    }
    catch(Exception exc)
    {
        await _dead.SendAsync(msg, exc);
        _logger.LogError(exc);
    }
});

Going even further, one could define a Result<TMessage, TError> class to wrap results. Downstream blocks could ignore faulted results. The LinkTo predicate can also be used to reroute error messages. I'll cheat and hard-code the error to Exception. A better implementation would use different types for success and error :

record Result<TMessage>(TMessage? Message, Exception? Error)
{
    public bool HasError => error != null;
}


var block1 = new TransformBlock<Result<int[]>, Result<double>>(msg =>
{
    if (msg.HasError)
    {
        // Propagate the error message
        return new Result<double>(default, msg.Error);
    }
    try
    {
       var sum = (double)msg.Message.Sum();
       if (sum % 5 == 0)
       {
           throw new Exception("Why not?");
       }
       return new Result(sum, null);
    }
    catch (Exception exc)
    {
        return new Result(null, exc); 
    }
});

var block2 = new ActionBlock<Result<double>>(...);

block1.LinkTo(block2);

Another option is to redirect error messages to a different block:


var errorBlock = new ActionBlock<Result<int[]>>(msg =>
{
    _logger.LogError(msg.Error);
});

block1.LinkTo(errorBlock, msg => msg.HasError);
block1.LinkTo(block2);

This redirects all errored messages to the error block. All other messages move on to block2

Pipeline errors

In some cases, an error is so severe the current block can't recover and perhaps even the entire pipeline must be cancelled/aborted. Cancellation in .NET is handled through a CancellationToken. All blocks accept a CancellationToken to allow aborting.

There's no single abort strategy that's appropriate to all pipelines. Propagating cancellation forward is common but definitely not the only option.

In the simplest case,

var pipeLineCancellation = new CancellationTokenSource();

var block1 = new TransformBlock<Result<int[]>, Result<double>>(msg =>
{
    ...
}, 
new ExecutionDataflowBlockOptions
{
   CancellationToken = pipeLineCancellation.Token
});

The block exception handler could request cancellation in case of a serious error:

    // Wrong table name. We can't use the database
    catch(SqlException exc) when (exc.Number == 208)
    {
       ...
       pipeLineCancellation.Cancel();
    }

This would abort all blocks that use the same CancellationTokenSource. That doesn't mean that all blocks should be connected to the same CancellationTokenSource though.

Flowing cancellation backwards

In Go pipelines, it's common to use an error channel that sends a cancellation message to the previous block. The same can be done in C# using linked CancellationTokenSources. One could even say this is even better than Go.

It's possible to create multiple linked CancellationTokenSources with CreateLinkedTokenSource. By creating sources that link backwards we can have a block signal cancellation for its own source and have the cancellation flow to the root.

var cts5 = new CancellationTokenSource();
var cts4 = CancellationTokenSource.CreateLinkedTokenSource(cts5.Token);
...
var cts1 = CancellationTokenSource.CreateLinkedTokenSource(cts2.Token);

...
var block3 = new TransformBlock<Result<int[]>, Result<double>>(msg =>
{
    ...
    catch(SqlException) 
    {
        cts3.Cancel();
    }
}, 
new ExecutionDataflowBlockOptions
{
   CancellationToken = cts3.Token
});

This will signal cancellation backwards, block by block, without cancelling the downstream blocks.

Pipeline Patterns

Dataflow in .NET is a gem few people know about, so it's really hard to find good references and patterns. The concepts are similar in Go though, so one could use the patterns found in Go Concurrency Patterns: Pipelines and cancellation.

The TPL Dataflow implements the processing loop and completion propagation so one typically only needs to provide the Action or Func that processes messages. The rest of the patterns have to be implemented, although .NET offers some advantages over Go.

  • The done channel is essentially a CancellationTokenSource.
  • Fan-in, fan-out are already handled through existing blocks, or can be handled using a relatively simple custom block that clones messages
  • CancellationTokenSources can be linked explicitly. In Go each "stage" (essentially a block) has to propagate completion/cancellation to other stages
  • One CancellationTokenSource can be used by all stages/blocks.
  • Linking allows not just easier composition but even runtime modifications to the pipeline/mesh.

Let's say we want to just stop processing messages after a while, even though there's no error. All that's needed is to create a CTS used by all blocks:

var pipeLineCancellation = new CancellationTokenSource();


var block1 = new TransformBlock<Result<int[]>, Result<double>>(msg =>
{
    ...
}, 
new ExecutionDataflowBlockOptions
{
   CancellationToken = pipeLineCancellation.Token
});
var block2 = ...;

pipeLineCancellation.Cancel();

Perhaps we want to run the pipeline for only a minute? Easy with

var pipeLineCancellation = new CancellationTokenSource(60000);

There are some disadvantages too, as a Dataflow block has no access to the "channels" or control over the loop

  • In Go it's easy to pass data, error and done channels to each stage, simplifying the error reporting and completion. In .NET the block delegates may have to access other blocks or CTSs directly.
  • In Go it's easier to use common state to eg accumulate data, or manage session/remote connection state. Imagine stage/block that controls a screen scraper like Selenium. We really don't want to restart the browser on every message.

Or we may want to insert data into a database using SqlBulkCopy. With an ActionBlock we'd have to create a new instance for each batch, which may or may not be a problem.

MarredCheese
  • 17,541
  • 8
  • 92
  • 91
Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236