
I came across the TPL ActionBlock, which seems to be pretty handy for (throttled) parallel async operations. So far I have been using Task.WhenAll() (+ a semaphore for throttling). When it comes to exceptions, though, there seem to be big differences:

var successList = new List<int>();
var failedList = new List<int>();
try
{
    var actionBlock = new ActionBlock<int>(
        async x => await Task.Run(() =>
        {
            if (x < 5)
            {
                failedList.Add(x);
                throw new Exception(x.ToString());
            }

            successList.Add(x);
        }),
        new ExecutionDataflowBlockOptions());

    foreach (var x in Enumerable.Range(1, 10)) actionBlock.Post(x);
    actionBlock.Complete();

    await actionBlock.Completion.ConfigureAwait(false);
}
catch (Exception ex)
{
    // these assertions pass with the Task.WhenAll approach
    Console.WriteLine(ex);
    Assert.True(failedList.Count == 4);
    Assert.True(successList.Count == 6);
    return;
}

Assert.Fail();

This test fails because the ActionBlock stops as soon as an exception occurs: the first exception faults the block, and the remaining messages are never processed. I found an issue about this on GitHub: Dataflow: Add options for Task Faulting. Apparently this behavior is not configurable.

Task.WhenAll() in combination with an extension method like this:

public static async Task PreserveAllExceptions(this Task task)
{
    try
    {
        await task.ConfigureAwait(false);
    }
    catch
    {
        throw task.Exception;
    }
}

wraps all(!) exceptions in an AggregateException but continues processing:

  await Task.WhenAll(task1,task2).PreserveAllExceptions().ConfigureAwait(false);

Is there a handy way to achieve this using ActionBlock?

UPDATE: To clarify:

  1. I am not planning to use a semaphore for throttling (why would I?), since ExecutionDataflowBlockOptions already provides such an option.
  2. The code snippet is just a dummy to demonstrate the "problem"; Task.Run() is only used as a placeholder for an actual async function.
  3. What I really want to do is the following: process all messages in parallel, and do not cancel further message processing on error. After processing all messages, return, indicate that at least one error occurred, and return all errors. That is exactly how my Task.WhenAll() with AggregateException works. I know I could simply try{}catch{} within my ActionBlock and somehow store the exceptions, but I was wondering whether there is a configuration option that makes this easier. Anyhow, it's not that big of a deal to use a try/catch and collect exceptions wherever I use ActionBlock. I just find the Task.WhenAll() + PreserveAllExceptions extension cleaner for my purpose.
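The try/catch workaround described in point 3 can be sketched roughly as follows. This is a minimal sketch, not a built-in Dataflow option: the helper name `ProcessAllAsync` and its class are hypothetical, and it assumes the System.Threading.Tasks.Dataflow assembly is available. Every failure is caught inside the block and queued, and once all messages have been processed the collected exceptions are rethrown together, mimicking Task.WhenAll.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CollectingActionBlock
{
    // Hypothetical helper: processes every item with the given degree of parallelism,
    // never lets an exception fault the block, and throws one AggregateException
    // at the end if any item failed.
    public static async Task ProcessAllAsync<T>(
        IEnumerable<T> items, Func<T, Task> action, int maxDegreeOfParallelism)
    {
        var exceptions = new ConcurrentQueue<Exception>();

        var block = new ActionBlock<T>(async item =>
        {
            try
            {
                await action(item).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Record the failure instead of letting it fault the block
                exceptions.Enqueue(ex);
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        foreach (var item in items)
        {
            await block.SendAsync(item).ConfigureAwait(false);
        }

        block.Complete();
        await block.Completion.ConfigureAwait(false);

        if (!exceptions.IsEmpty)
        {
            throw new AggregateException(exceptions);
        }
    }
}
```

Throttling here comes from MaxDegreeOfParallelism rather than a semaphore; the catch-all inside the delegate is the part Dataflow does not offer as a configuration switch.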
yBother
  • As a side note, [this](https://github.com/dotnet/runtime/issues/20882) is not an issue, it is a feature request. What is requested is a glorified `On Error Resume Next`. The reason for the request is, I assume, to relieve the developer from feeling guilty for doing the error-swallowing themself. – Theodor Zoulias Oct 18 '20 at 09:58
  • Yes I am aware of that. – yBother Oct 19 '20 at 06:49
  • It's not a feature request, it's not a glorified On Error Resume Next, it's how Dataflow was designed to work from the start. Dataflow networks are *not* supposed to just die in case of exception, simply because they are *not* used for one call at a time. They're used to serve an infinite stream of messages. Error handling in that case is very different from a desktop application - but not unlike a *web* application, which doesn't crash if a single request throws – Panagiotis Kanavos Oct 19 '20 at 07:31
  • @TheodorZoulias that's not how Dataflow networks work. Or ReactiveX pipelines. Or Go channel pipelines. Or PowerShell pipelines. You're confusing one computing paradigm with another. This isn't sequential or synchronous programming. It's functional, reactive, queueing, message passing. Would you expect Azure Queues or Azure Functions in an entire account to crash if a single request failed? – Panagiotis Kanavos Oct 19 '20 at 07:34
  • @yBother there are two problems 1) using Semaphore for throttling, when the ActionBlock already offers bounded inputs and limited DOP 2) allowing exceptions to just escape. Typically, a block should handle any exceptions and e.g. discard the failed message, redirect it to an error handling block or a retry block. – Panagiotis Kanavos Oct 19 '20 at 07:38
  • @yBother what are you actually trying to do? The way you solve this depends on the actual problem. Your code could be simplified a *lot* by using the correct design - there's no need for semaphores, `Task.Run` or `Task.WhenAll` in the first place. If you only want to log failed messages, you could add a simple `catch` in the `ActionBlock` to log failures. If you want to collect outcomes, you could change the `ActionBlock` to a `TransformBlock` that returns a `Result` object with either a success or failure message, and link it to a BufferBlock – Panagiotis Kanavos Oct 19 '20 at 07:43
  • @yBother the `Result` object could contain additional information from the input message. If you only want the failure messages you could use the `predicate` argument in `LinkTo` to send success objects to a `NullBlock` and failures to the `BufferBlock`. For more complex scenarios you could use blocks that retry the operation a couple of times before dropping the message, or sending it to a dead-letter block – Panagiotis Kanavos Oct 19 '20 at 07:45
  • @PanagiotisKanavos I am referring to the [github thread](https://github.com/dotnet/runtime/issues/20882) linked by the OP. Someone asked for a `ContinueOnTaskFault` option to be added in TPL Dataflow. Their request was rejected. It looks pretty similar to the [`On Error Resume Next`](https://learn.microsoft.com/en-us/office/vba/language/reference/user-interface-help/on-error-statement) VBA statement to me. – Theodor Zoulias Oct 19 '20 at 07:46
  • @TheodorZoulias because it's not needed. – Panagiotis Kanavos Oct 19 '20 at 07:47
  • @yBother what are you trying to do? Your code seems to be using exceptions for control flow. That's wrong no matter the library. What is `x`? – Panagiotis Kanavos Oct 19 '20 at 08:16
  • @PanagiotisKanavos apparently some people (at least one) need it. Otherwise they wouldn't have requested it. – Theodor Zoulias Oct 20 '20 at 06:33
  • Where do I use semaphore for ActionBlock? I don't! – yBother Oct 20 '20 at 08:27
  • As a side note, the `PreserveAllExceptions` method does not do what it says. It just creates an `AggregateException` hierarchy that is one level deeper. If all you want is to observe all exceptions captured by `Task.WhenAll`, you can get them through the `Exception` property of the returned task. You just need to store it into a variable before awaiting it: `var whenAll = Task.WhenAll(task1, task2); await whenAll;` – Theodor Zoulias Oct 20 '20 at 09:12
  • Yes but this is just not as handy. The extension makes this much easier. That's the whole point. I also don't agree that PreserveAllExceptions is a bad name. – yBother Oct 20 '20 at 11:43
  • Yeap, the `PreserveAllExceptions` is certainly a neat trick, that fixes the practically one and only case where the default behavior of `await` [falls short](https://stackoverflow.com/questions/18314961/i-want-await-to-throw-aggregateexception-not-just-the-first-exception). Personally I would be hesitant to include it in my API as an extension method, because it is essentially a one-trick pony. – Theodor Zoulias Oct 20 '20 at 18:19
  • yBother I just posted a `ForEachAsync` implementation [here](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations/64455549#64455549), that you might find interesting because it allows controlling both the degree of parallelism and the behavior in case of exceptions. Setting the `onErrorContinue` to `true` makes the method behave similar to `Task.WhenAll`. – Theodor Zoulias Oct 21 '20 at 02:16
  • cool thx! interesting approach. it's not a public api but only team-internal.. – yBother Oct 21 '20 at 10:19
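The difference discussed in these comments can be checked with a small sketch that observes all exceptions from Task.WhenAll in the two ways mentioned: through the question's PreserveAllExceptions extension, and by keeping a reference to the WhenAll task. The class names and the `Fail` helper are made up for illustration. Both approaches report both exceptions; they differ only in where the full list is read from.

```csharp
using System;
using System.Threading.Tasks;

public static class PreserveExceptionsExtensions
{
    // The question's extension: rethrows the task's AggregateException so that
    // `await` surfaces all inner exceptions instead of only the first one.
    public static async Task PreserveAllExceptions(this Task task)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            throw task.Exception;
        }
    }
}

public static class WhenAllDemo
{
    // Hypothetical operation that fails asynchronously.
    static async Task Fail(string message)
    {
        await Task.Yield();
        throw new InvalidOperationException(message);
    }

    // Approach 1: await through the extension and catch the AggregateException.
    public static async Task<int> CountWithExtensionAsync()
    {
        try
        {
            await Task.WhenAll(Fail("a"), Fail("b")).PreserveAllExceptions().ConfigureAwait(false);
        }
        catch (AggregateException ex)
        {
            return ex.InnerExceptions.Count;
        }
        return 0;
    }

    // Approach 2: keep the WhenAll task and read its Exception property.
    public static async Task<int> CountWithVariableAsync()
    {
        var whenAll = Task.WhenAll(Fail("a"), Fail("b"));
        try
        {
            await whenAll.ConfigureAwait(false);
        }
        catch
        {
            // `await` rethrew only the first exception; the task still holds all of them.
            return whenAll.Exception.InnerExceptions.Count;
        }
        return 0;
    }
}
```

Both methods return 2.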

2 Answers


It's unclear what the question asks. What is clear, though, is that the ActionBlock is misused. There's no need for Task.Run, since the ActionBlock already uses one or more worker tasks. There's no need for semaphores either, since ActionBlock (like the other blocks) already supports throttling by limiting the number of worker tasks and the size of its input queue.

The code also seems to use exceptions as a control-flow mechanism, which would be wrong in procedural code as well.

Exceptions aren't meant to escape the block. Dataflow is a completely different computing paradigm from the common procedural paradigm - there are no functions calling each other, so there's no caller to receive and handle the exception.

In a dataflow, blocks pass messages to each other in a single direction. Blocks are combined into pipelines or networks, receiving messages, processing them and passing them to any connected blocks. If an exception occurs, there's no "caller" that could receive it. Unhandled exceptions are catastrophic and bring down the entire pipeline: not just a single block, but any downstream block it's linked to with PropagateCompletion set to true. Upstream blocks will never know about this, though, which leads to unexpected situations.
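The downstream effect can be observed with a small sketch (an illustration with made-up names, not code from the question): a TransformBlock whose delegate throws on the first message faults, and because it is linked with PropagateCompletion = true, the downstream ActionBlock is faulted too without ever receiving a message.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultPropagationDemo
{
    // Returns whether the downstream block faulted and how many messages it received.
    public static async Task<(bool DownstreamFaulted, int Received)> RunAsync()
    {
        var received = 0;

        // Upstream block throws an unhandled exception for message 1.
        var upstream = new TransformBlock<int, int>(x =>
        {
            if (x == 1) throw new InvalidOperationException("boom");
            return x * 10;
        });

        var downstream = new ActionBlock<int>(_ => received++);

        // PropagateCompletion forwards faults as well as normal completion.
        upstream.LinkTo(downstream, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= 5; i++) upstream.Post(i);
        upstream.Complete();

        try
        {
            await downstream.Completion.ConfigureAwait(false);
        }
        catch (Exception)
        {
            // Expected: the upstream fault surfaces here.
        }

        return (downstream.Completion.IsFaulted, received);
    }
}
```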

Throttling

Throttling with an ActionBlock is easy. For starters, all blocks use just a single worker task by default. One can throttle upstream callers by limiting the block's input buffer (BoundedCapacity) and using await block.SendAsync() instead of block.Post(). There's no need for Task.Run, as the block already uses worker tasks:

var options=new ExecutionDataflowBlockOptions 
{ 
    MaxDegreeOfParallelism=2, 
    BoundedCapacity=2
};
var block =new ActionBlock<Message>(processMessage,options);

...
async Task processMessage(Message msg) { ...}

That's enough to allow only two concurrent operations, and to stop posters once the block already holds two messages (for an ActionBlock, messages currently being processed count toward BoundedCapacity too). If the block is at capacity, SendAsync in the following code will wait until a slot becomes available:

foreach(var msg in someInputCollection)
{
    await block.SendAsync(msg);
}

That's it. The block will process 2 messages concurrently (the default is just 1) and hold at most 2 messages at a time. If it is at capacity, the posting loop will wait.
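A self-contained version of this throttling setup can be sketched as below. It assumes int messages instead of Message, and `ProcessMessage` is a hypothetical delay-based stand-in for the real work; the counters exist only to observe how many calls run at once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottlingDemo
{
    static int _current;
    static int _maxObserved;
    static int _processed;

    // Stand-in for processMessage: records how many calls run at the same time.
    static async Task ProcessMessage(int msg)
    {
        var now = Interlocked.Increment(ref _current);

        // Lock-free update of the highest concurrency seen so far.
        int seen;
        while (now > (seen = Volatile.Read(ref _maxObserved)))
        {
            Interlocked.CompareExchange(ref _maxObserved, now, seen);
        }

        await Task.Delay(20);
        Interlocked.Decrement(ref _current);
        Interlocked.Increment(ref _processed);
    }

    public static async Task<(int MaxConcurrency, int Processed)> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
            BoundedCapacity = 2
        };
        var block = new ActionBlock<int>(ProcessMessage, options);

        for (var msg = 0; msg < 10; msg++)
        {
            // Waits asynchronously while the block is full, instead of being declined like Post.
            await block.SendAsync(msg);
        }

        block.Complete();
        await block.Completion;
        return (_maxObserved, _processed);
    }
}
```

The observed concurrency never exceeds 2, and all 10 messages are processed because SendAsync waits rather than dropping anything.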

Quick & dirty rate limiting can be achieved by adding a delay in the processing method:

var block = new ActionBlock<Message>(async msg =>
{
    await Task.Delay(200);
    await processMessage(msg);
}, options);

Conditional routing

The question's code seems to be using exceptions to implement control flow. That would be wrong in any library or paradigm. Since dataflows work in networks, the equivalent of control flow is conditional routing.

This is also available, through the LinkTo overloads that accept a predicate parameter that decides whether a message should be passed down a specific link.

In the question's case, assuming there's an upstream TransformBlock that produces integers, LinkTo can be used to route messages to different BufferBlocks:

var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();

var block=new TransformBlock<Message,int>(...);
//Success if x>=5
block.LinkTo(success,x=>x>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);

That's it. The only "trick" is that the predicates should cover all options; otherwise, messages will remain stuck in the block's output buffer. It helps to add a default link after all the others to ensure no messages are left unhandled.
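A runnable version of this routing example might look like the sketch below. It assumes the upstream TransformBlock simply passes the integers 1..10 through, and it waits for the TransformBlock's own Completion (which only finishes once every output message has been accepted by a target) before draining the buffers with TryReceiveAll.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RoutingDemo
{
    public static async Task<(int Success, int Failure)> RunAsync()
    {
        var success = new BufferBlock<int>();
        var failure = new BufferBlock<int>();

        // Stand-in for the upstream TransformBlock<Message,int>.
        var block = new TransformBlock<int, int>(x => x);

        // Success if x >= 5
        block.LinkTo(success, x => x >= 5);
        // Default link: everything else goes to failure, so nothing gets stuck
        block.LinkTo(failure);

        for (var x = 1; x <= 10; x++) block.Post(x);
        block.Complete();

        // Completes only after every output message has been handed to a target.
        await block.Completion;

        success.TryReceiveAll(out IList<int> successItems);
        failure.TryReceiveAll(out IList<int> failureItems);
        return (successItems?.Count ?? 0, failureItems?.Count ?? 0);
    }
}
```

Without the second LinkTo, messages 1..4 would have nowhere to go and block.Completion would never complete.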

Error Handling

Blocks shouldn't allow exceptions to escape. There are several error handling strategies that depend on what the application wants to do.

Handle and log

One option is to handle them and log them in place, the same way one would handle errors in a web application:

var block = new ActionBlock<Message>(async msg => { try { await processMessage(msg); } catch (Exception exc) { _logger.LogError(exc, ....); } }, options);

Post to another block

Another possibility is to post the exception, and possibly information about the incoming message, directly to another block. That block could log the error and the message, or retry the message after a delay. There may be a different pipeline behind that block that retries messages with increasing delays before sending them to a dead-letter buffer, similar to what one would do with message queues:

var block = new ActionBlock<Message>(async msg =>
{
    try
    {
        await processMessage(msg);
    }
    catch (SomeRetriableException exc)
    {
        _retryBlock.Post(new RetryMsg(msg, exc));
    }
    catch (Exception exc)
    {
        _logger.LogError(exc, ....);
    }
}, options);

The strategy to use depends on what the application does. If the ActionBlock is used as a simple background worker, it may be enough to just log.
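The "post to another block" strategy can be sketched as follows. The `FailedMessage` class, the error BufferBlock and the failure rule (x < 5) are assumptions for illustration only; a real pipeline might link the error block to a retry or dead-letter block instead of draining it with TryReceiveAll.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical container for a failed input and the exception it caused.
public sealed class FailedMessage
{
    public FailedMessage(int message, Exception error)
    {
        Message = message;
        Error = error;
    }

    public int Message { get; }
    public Exception Error { get; }
}

public static class ErrorBlockDemo
{
    public static async Task<IList<FailedMessage>> RunAsync()
    {
        var errorBlock = new BufferBlock<FailedMessage>();

        var block = new ActionBlock<int>(async x =>
        {
            try
            {
                await Task.Yield(); // stand-in for real async work
                if (x < 5) throw new InvalidOperationException($"bad message {x}");
            }
            catch (Exception exc)
            {
                // Hand the failure to another block instead of faulting this one.
                errorBlock.Post(new FailedMessage(x, exc));
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (var x = 1; x <= 10; x++) block.Post(x);
        block.Complete();
        await block.Completion;

        // All processing is done, so every failure is already in errorBlock.
        return errorBlock.TryReceiveAll(out var failures) ? failures : new List<FailedMessage>();
    }
}
```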

Wrap and route

In a more advanced scenario, messages can be wrapped in an Envelope<> that carries the message and possibly any exceptions. Routing can then be used to separate success from failure messages:

class Envelope<T>
{
    public T Message{get;}
    public Exception Error {get;}
    
    public Envelope (T msg)
    {
        Message=msg;
    }
    
    public Envelope(T msg,Exception err)
    {
        Message=msg;
        Error=err;
    }
}

The block now returns an Envelope:

var block = new TransformBlock<Envelope<Message>, Envelope<int>>(env =>
{
    try
    {
        var msg = env.Message;
        ....
        return new Envelope<int>(6);
    }
    catch (Exception exc)
    {
        // No int result exists on failure, so this envelope carries only the error
        return new Envelope<int>(default, exc);
    }
});

This allows routing errors to an errorBlock using conditional routing:

var errorBlock = new ActionBlock<Envelope<int>>(...);

var success = new BufferBlock<Envelope<int>>();
var failure = new BufferBlock<Envelope<int>>();

//Send errors to `errorBlock`
block.LinkTo(errorBlock, env => env.Error != null);

//Success if x>=5
block.LinkTo(success, env => env.Message >= 5);
//By default, everything else goes to Failure
block.LinkTo(failure);
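Putting the envelope pieces together, a self-contained sketch could look like the following. It assumes int inputs instead of Message, repeats an Envelope<T> equivalent to the class above, and uses a made-up rule (inputs below 3 throw, inputs below 5 are ordinary failures) purely so that all three links receive something. Because input and output are both int here, the error envelope can keep the original input.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Envelope<T>
{
    public T Message { get; }
    public Exception Error { get; }

    public Envelope(T msg) => Message = msg;

    public Envelope(T msg, Exception err)
    {
        Message = msg;
        Error = err;
    }
}

public static class EnvelopeRoutingDemo
{
    public static async Task<(int Errors, int Success, int Failure)> RunAsync()
    {
        var block = new TransformBlock<int, Envelope<int>>(x =>
        {
            try
            {
                // Hypothetical processing step: inputs below 3 throw.
                if (x < 3) throw new InvalidOperationException($"cannot process {x}");
                return new Envelope<int>(x);
            }
            catch (Exception exc)
            {
                // Keep the input next to the exception so it can be logged or retried.
                return new Envelope<int>(x, exc);
            }
        });

        var errors = new BufferBlock<Envelope<int>>();
        var success = new BufferBlock<Envelope<int>>();
        var failure = new BufferBlock<Envelope<int>>();

        // Links are tried in the order they were added.
        block.LinkTo(errors, env => env.Error != null);
        block.LinkTo(success, env => env.Message >= 5);
        block.LinkTo(failure); // default link

        for (var x = 1; x <= 10; x++) block.Post(x);
        block.Complete();
        await block.Completion;

        return (Count(errors), Count(success), Count(failure));
    }

    static int Count<T>(BufferBlock<T> buffer) =>
        buffer.TryReceiveAll(out var items) ? items.Count : 0;
}
```

With inputs 1..10 this yields 2 errors, 6 successes and 2 failures.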
Panagiotis Kanavos

There is no easy way to aggregate all exceptions and propagate them through the Completion property of the ActionBlock. Unfortunately the TPL Dataflow components are not easily extensible. You could do it if you really wanted to, by encapsulating an ActionBlock inside a custom block and customizing the Completion of this block. For example:

public class MyActionBlock<TInput> : ITargetBlock<TInput>
{
    private readonly ActionBlock<TInput> _actionBlock;
    private readonly ConcurrentQueue<Exception> _exceptions;

    //...

    public Task Completion
    {
        get
        {
            return _actionBlock.Completion.ContinueWith(t =>
            {
                if (_exceptions.Count > 0)
                    throw new AggregateException(_exceptions);
            });
        }
    }
}

...but this simple code will not propagate cancellation, nor will it propagate any exceptions passed through the Fault method of the IDataflowBlock interface. So you'd have to spend a considerable amount of effort to make it work correctly in all cases, and it's doubtful that the investment will pay off.

Theodor Zoulias