Don't allow unhandled exceptions. An unhandled exception in a block means the block and by extension the entire pipeline is terminally broken and must be aborted. That's not a TPL Dataflow bug, that's how the overall dataflow paradigm works. Exceptions are meant to signal errors up a call stack. There's no call stack in a dataflow though.
Blocks are independent workers that communicate through messages. There's no ownership relation between linked blocks, and a faulting block doesn't mean any previous or following blocks have to abort as well. That's why PropagateCompletion is false by default.
If a source links to more than one block, the messages can easily go to the other blocks. It's also possible to change the links between blocks at runtime.
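For example, LinkTo returns an IDisposable that can be used to break a link while the pipeline is running. A minimal sketch (the block names and bodies here are mine, for illustration only):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

var source = new BufferBlock<int>();
var primary = new ActionBlock<int>(n => Console.WriteLine($"primary: {n}"));
var fallback = new ActionBlock<int>(n => Console.WriteLine($"fallback: {n}"));

// Both targets are linked; messages are offered to targets in link order,
// so primary receives everything while its link is alive
var link = source.LinkTo(primary);
source.LinkTo(fallback);

// Later, at runtime: unlink primary so new messages flow to fallback instead
link.Dispose();
```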
In a pipeline there are two different kinds of errors:
- Message errors that occur when a block/actor/worker processes a message
- Pipeline errors that invalidate the pipeline and may require aborting
There's no reason to abort the pipeline if a single message faults.
Message errors
If something goes wrong while processing a message, the actor should do something with that message and proceed with the next one. That something may be:
- Log the error and go on
- Send an "error" message to another block
- Use a Result&lt;TMessage,TError&gt; class in the entire pipeline instead of raw message types, and add any errors to the result
Retry and recovery strategies can be built on top of that, e.g. forwarding failed messages to a "retry" block or a dead-letter block.
The simplest approach is to just catch exceptions and log them:
var block = new ActionBlock<int[]>(msg =>
{
    try
    {
        ...
    }
    catch(Exception exc)
    {
        _logger.LogError(exc, "Failed to process message");
    }
});
Another option is to manually post failed messages to e.g. a dead-letter queue:
var dead = new BufferBlock<(int[] data, Exception error)>();
var block = new ActionBlock<int[]>(async msg =>
{
    try
    {
        ...
    }
    catch(Exception exc)
    {
        await dead.SendAsync((msg, exc));
        _logger.LogError(exc, "Failed to process message");
    }
});
Going even further, one could define a Result&lt;TMessage, TError&gt; class to wrap results. Downstream blocks could ignore faulted results, and the LinkTo predicate can be used to reroute error messages. I'll cheat and hard-code the error type to Exception; a better implementation would use different types for success and error:
record Result<TMessage>(TMessage? Message, Exception? Error)
{
    public bool HasError => Error != null;
}
var block1 = new TransformBlock<Result<int[]>, Result<double>>(msg =>
{
    if (msg.HasError)
    {
        // Propagate the error message
        return new Result<double>(default, msg.Error);
    }
    try
    {
        var sum = (double)msg.Message.Sum();
        if (sum % 5 == 0)
        {
            throw new Exception("Why not?");
        }
        return new Result<double>(sum, null);
    }
    catch (Exception exc)
    {
        return new Result<double>(default, exc);
    }
});
var block2 = new ActionBlock<Result<double>>(...);
block1.LinkTo(block2);
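The elided block2 body could, for example, simply skip faulted results (a sketch; the body is mine):

```csharp
var block2 = new ActionBlock<Result<double>>(msg =>
{
    if (msg.HasError)
    {
        return; // ignore faulted results
    }
    Console.WriteLine(msg.Message);
});
```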
Another option is to redirect error messages to a different block:
var errorBlock = new ActionBlock<Result<double>>(msg =>
{
    _logger.LogError(msg.Error, "Message processing failed");
});
block1.LinkTo(errorBlock, msg => msg.HasError);
block1.LinkTo(block2);
This redirects all errored messages to the error block. All other messages move on to block2.
Pipeline errors
In some cases, an error is so severe the current block can't recover, and perhaps even the entire pipeline must be cancelled/aborted. Cancellation in .NET is handled through a CancellationToken. All blocks accept a CancellationToken in their options, allowing them to be aborted.
There's no single abort strategy that's appropriate to all pipelines. Propagating cancellation forward is common but definitely not the only option.
In the simplest case, all blocks can use a token from the same source:
var pipeLineCancellation = new CancellationTokenSource();
var block1 = new TransformBlock<Result<int[]>, Result<double>>(msg =>
{
    ...
},
new ExecutionDataflowBlockOptions
{
    CancellationToken = pipeLineCancellation.Token
});
The block exception handler could request cancellation in case of a serious error:
// Wrong table name. We can't use the database
catch(SqlException exc) when (exc.Number == 208)
{
    ...
    pipeLineCancellation.Cancel();
}
This would abort all blocks that use the same CancellationTokenSource. That doesn't mean that all blocks should be connected to the same CancellationTokenSource though.
Flowing cancellation backwards
In Go pipelines, it's common to use an error channel that sends a cancellation message to the previous stage. The same can be done in C# using linked CancellationTokenSources; arguably this is even more convenient than in Go.
It's possible to create multiple linked CancellationTokenSources with CreateLinkedTokenSource. By creating sources that link backwards, a block can cancel its own source and have the cancellation flow upstream, block by block.
var cts5 = new CancellationTokenSource();
var cts4 = CancellationTokenSource.CreateLinkedTokenSource(cts5.Token);
...
var cts1 = CancellationTokenSource.CreateLinkedTokenSource(cts2.Token);
...
var block3 = new TransformBlock<Result<int[]>, Result<double>>(msg =>
{
    try
    {
        ...
    }
    catch(SqlException)
    {
        cts3.Cancel();
    }
},
new ExecutionDataflowBlockOptions
{
    CancellationToken = cts3.Token
});
This will signal cancellation backwards, block by block, without cancelling the downstream blocks.
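A minimal two-stage sketch of the same idea (block bodies and the n > 100 "fatal" condition are mine):

```csharp
using System.Threading;
using System.Threading.Tasks.Dataflow;

var cts2 = new CancellationTokenSource();
var cts1 = CancellationTokenSource.CreateLinkedTokenSource(cts2.Token);

// Upstream block: its token fires when either cts1 or cts2 is cancelled
var block1 = new TransformBlock<int, int>(n => n * 2,
    new ExecutionDataflowBlockOptions { CancellationToken = cts1.Token });

// Downstream block: cancelling cts2 aborts this block and, through the link,
// block1 as well. Cancelling cts1 would abort only block1.
var block2 = new ActionBlock<int>(n =>
{
    if (n > 100) cts2.Cancel(); // fatal condition: abort this block and everything upstream
},
new ExecutionDataflowBlockOptions { CancellationToken = cts2.Token });

block1.LinkTo(block2);
```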
Pipeline Patterns
Dataflow in .NET is a gem few people know about, so it's really hard to find good references and patterns. The concepts are similar in Go though, so one could use the patterns found in Go Concurrency Patterns: Pipelines and cancellation.
TPL Dataflow implements the processing loop and completion propagation, so one typically only needs to provide the Action or Func that processes messages. The rest of the patterns have to be implemented by hand, although .NET offers some advantages over Go:
- The done channel is essentially a CancellationTokenSource.
- Fan-in and fan-out are already handled by existing blocks, or can be handled with a relatively simple custom block that clones messages.
- CancellationTokenSources can be linked explicitly. In Go, each "stage" (essentially a block) has to propagate completion/cancellation to other stages.
- One CancellationTokenSource can be used by all stages/blocks.
- Linking allows not just easier composition but even runtime modifications to the pipeline/mesh.
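As a rough sketch of the fan-out/fan-in points above (all block names and bodies are mine):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Fan-out: BroadcastBlock offers a copy of each message to every linked target.
// The constructor takes a clone function; an int can be passed through as-is.
var broadcast = new BroadcastBlock<int>(n => n);
var worker1 = new ActionBlock<int>(n => Console.WriteLine($"worker1: {n}"));
var worker2 = new ActionBlock<int>(n => Console.WriteLine($"worker2: {n}"));
broadcast.LinkTo(worker1);
broadcast.LinkTo(worker2);

// Fan-in: multiple sources can simply link to the same target block
var merged = new BufferBlock<int>();
var sourceA = new TransformBlock<int, int>(n => n + 1);
var sourceB = new TransformBlock<int, int>(n => n - 1);
sourceA.LinkTo(merged);
sourceB.LinkTo(merged);
```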
Let's say we want to just stop processing messages after a while, even though there's no error. All that's needed is to create a CTS used by all blocks:
var pipeLineCancellation = new CancellationTokenSource();
var block1 = new TransformBlock<Result<int[]>, Result<double>>(msg =>
{
    ...
},
new ExecutionDataflowBlockOptions
{
    CancellationToken = pipeLineCancellation.Token
});
var block2 = ...;
pipeLineCancellation.Cancel();
Perhaps we want to run the pipeline for only a minute? That's easy with a timed CancellationTokenSource:
var pipeLineCancellation = new CancellationTokenSource(60000);
There are some disadvantages too, as a Dataflow block has no access to the "channels" and no control over the processing loop:
- In Go it's easy to pass data, error and done channels to each stage, simplifying error reporting and completion. In .NET the block delegates may have to access other blocks or CancellationTokenSources directly.
- In Go it's easier to use common state, e.g. to accumulate data or manage session/remote connection state. Imagine a stage/block that controls a screen scraper like Selenium: we really don't want to restart the browser on every message. Or we may want to insert data into a database using SqlBulkCopy. With an ActionBlock we'd have to create a new instance for each batch, which may or may not be a problem.
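One workaround is to close over long-lived state: a class owns the expensive resource and exposes its handler as the block delegate, so the state survives across messages. SessionWorker and its members below are hypothetical:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper that owns an expensive resource (browser, connection, ...)
class SessionWorker : IDisposable
{
    private int _processed; // per-session state that persists across messages

    public void Handle(string message)
    {
        _processed++;
        Console.WriteLine($"[{_processed}] {message}");
    }

    public void Dispose() { /* tear down the browser/connection here */ }
}

var worker = new SessionWorker();
// The block delegate is a method on the long-lived instance, so nothing
// expensive is recreated per message
var block = new ActionBlock<string>(worker.Handle);
```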