Rather than trying to make sense of which block faulted and which didn't, the blocks shouldn't allow unhandled exceptions at all. This is a very common pattern, also used in [Go pipelines](https://blog.golang.org/pipelines).
The article *Exception Handling in TPL Dataflow Networks* explains how exceptions are handled:
- When an unhandled exception is thrown, the block enters the faulted state, but only after all currently running operations have finished.
- That state is propagated to linked blocks that have `PropagateCompletion` set to `true`. That doesn't mean that downstream blocks will fault immediately though.
Awaiting a faulted block throws. The line:

```csharp
await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
```

should have thrown, unless those blocks were still busy.
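A minimal, self-contained demonstration of that behaviour (a throwaway block, not the pipeline from the question; requires the `System.Threading.Tasks.Dataflow` package):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class FaultDemo
{
    static async Task Main()
    {
        var block = new ActionBlock<int>(n =>
        {
            if (n == 2) throw new InvalidOperationException("boom");
        });

        block.Post(1);
        block.Post(2);
        block.Complete();

        try
        {
            // Awaiting the Completion task of a faulted block throws
            await block.Completion;
        }
        catch (Exception exc)
        {
            Console.WriteLine($"Faulted: {exc.Message}");
        }
    }
}
```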
The solution - don't allow unhandled exceptions
Return a `Result<T>` object instead. When making e.g. 1000 HTTP calls, it would be a bad idea to let one exception prevent the other 999 calls anyway. This is broadly similar to Railway Oriented Programming; a Dataflow pipeline is quite similar to a functional pipeline.
Each block should return a `Result<T>` class that wraps the actual result and indicates success or failure. An exception-handling block should catch any exceptions and return a faulted `Result<T>` item. The `LinkTo` method can take a predicate that allows redirecting failed results to e.g. a logging block or a null target block.
Let's say we have this simple `Result<T>`:
```csharp
class Result<T>
{
    public T Value { get; }
    public Exception Exception { get; }
    public bool Ok { get; }

    // An empty result: Ok stays false and there's no exception either
    public Result() { }

    public Result(T value)
    {
        Value = value;
        Ok = true;
    }

    public Result(Exception exc)
    {
        Exception = exc;
        Ok = false;
    }
}
```
`fetchApiResponse` could be:
```csharp
var fetchApiResponse = new TransformBlock<TA, Result<TA>>(item =>
{
    try
    {
        ...
        return new Result<TA>(ObjA);
    }
    catch (Exception exc)
    {
        return new Result<TA>(exc);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 2,
    MaxDegreeOfParallelism = 2,
    CancellationToken = cancellationToken
});
```
and the `LinkTo` code could be:
```csharp
var propagate = new DataflowLinkOptions { PropagateCompletion = true };
var nullBlock = DataflowBlock.NullTarget<Result<TA>>();

fetchApiResponse.LinkTo(nullBlock, msg => !msg.Ok);
fetchApiResponse.LinkTo(finalBlock, propagate, msg => msg.Ok);
```
In this case, bad messages are simply dumped to a null block.
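If failed results should be logged instead of discarded, the null target can be swapped for a simple logging block. A minimal sketch (the `logger` block is just an illustration):

```csharp
var logger = new ActionBlock<Result<TA>>(msg =>
    Console.WriteLine($"Request failed: {msg.Exception?.Message}"));

fetchApiResponse.LinkTo(logger, msg => !msg.Ok);
```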
There's no reason to use another buffer block, or to await all the blocks. Both `TransformBlock` and `ActionBlock` have an input buffer controlled by the `ExecutionDataflowBlockOptions` options.
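The question's `finalBlock` isn't shown here. Assuming it's an `ActionBlock`, it could look something like this sketch, where `Process` stands in for the real work:

```csharp
var finalBlock = new ActionBlock<Result<TA>>(msg =>
{
    // Only successful results arrive here, thanks to the LinkTo predicate
    Process(msg.Value);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 2,
    CancellationToken = cancellationToken
});
```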
Posting the messages and awaiting completion can be:
```csharp
await FetchData(fetchApiResponse);
fetchApiResponse.Complete();
await finalBlock.Completion;
```
The null check in `finalBlock` can be removed too, if `fetchApiResponse` returns an empty `Result` object when there's no valid result.
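A hypothetical two-line sketch of that inside `fetchApiResponse`'s delegate, assuming the fetched `response` can legitimately be missing:

```csharp
if (response == null)
    return new Result<TA>();  // empty result: Ok is false, so the predicate filters it out
```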
More complex scenarios can be handled by more complex `Result` objects.
Abrupt termination
Even when the pipeline needs to terminate immediately, there shouldn't be any unhandled exceptions. A fault may propagate downstream but won't affect the upstream blocks. They'll keep their messages in memory and keep accepting input even though the rest of the pipeline is broken.
That can definitely look like a deadlock.
The solution to this is to use a `CancellationTokenSource`, pass its token to all blocks, and signal it if the pipeline needs to be terminated.
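A minimal sketch of that wiring, with the block bodies elided:

```csharp
var cts = new CancellationTokenSource();

var options = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 2,
    MaxDegreeOfParallelism = 2,
    CancellationToken = cts.Token  // every block gets the same token
};

// ... construct fetchApiResponse, finalBlock etc. with `options` ...

// Terminate the whole pipeline, upstream blocks included:
cts.Cancel();
```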
It's common practice in Go too to use a channel like a `CancellationTokenSource` for precisely this reason, and cancel both downstream and upstream blocks. This is described in [Go Concurrency Patterns: Pipelines and cancellation](https://blog.golang.org/pipelines).
Early cancellation is useful if a block decides there's no reason to continue working, not just in case of error. In that case it can signal the `CancellationTokenSource` to stop the upstream blocks.
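For example, a downstream block could cancel the pipeline once it has seen enough results. A sketch, reusing the hypothetical `cts` and `Process` names from the previous snippets:

```csharp
int processed = 0;

var finalBlock = new ActionBlock<Result<TA>>(msg =>
{
    if (++processed > 100)
    {
        // We have enough data; stop the upstream blocks as well
        cts.Cancel();
        return;
    }
    Process(msg.Value);
}, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });
```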