It's unclear what the question asks. What's clear, though, is that the ActionBlock is misused. There's no need for Task.Run, since the ActionBlock already uses one or more worker tasks. There's no need for semaphores either, since ActionBlock (like the other blocks) already supports throttling by limiting the number of worker tasks and the size of its input buffer.
The code also seems to use exceptions as a control-flow mechanism, which would be wrong even in procedural code.
Exceptions aren't meant to escape the block. Dataflow is a completely different computing paradigm from the common procedural one: there are no functions calling each other, so there's no caller to receive and handle the exception.
In a dataflow, blocks pass messages to each other in a single direction. Blocks are combined into pipelines or networks, receiving messages, processing them and passing them to any connected blocks. If an exception occurs, there's no "caller" that could receive it. Unhandled exceptions are catastrophic and bring down the pipeline: not just the block that threw, but any downstream block it's linked to with PropagateCompletion set to true. Upstream blocks never find out, though. The faulted block simply declines any further messages, so Post returns false, SendAsync completes with false, and linked upstream blocks are left holding messages they can't deliver, leading to unexpected situations.
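A small sketch of the fault propagation described above. It assumes a throwaway TransformBlock<int,int> (`middle`, hypothetical) that throws for any positive input, linked to an ActionBlock with PropagateCompletion set to true. It only illustrates the mechanics and is not the question's code.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultPropagationDemo
{
    // Returns whether the faulted block still accepts messages, plus the final
    // status of the faulting block and of the block linked downstream of it
    public static async Task<(bool PostAccepted, TaskStatus Middle, TaskStatus Downstream)> RunAsync()
    {
        // Hypothetical processing step that throws for any positive input
        var middle = new TransformBlock<int, int>(x =>
        {
            if (x > 0) throw new InvalidOperationException($"cannot process {x}");
            return x;
        });
        var downstream = new ActionBlock<int>(x => Console.WriteLine(x));

        // PropagateCompletion = true forwards the fault to the downstream block
        middle.LinkTo(downstream, new DataflowLinkOptions { PropagateCompletion = true });

        middle.Post(1); // the unhandled exception faults `middle`

        // Both awaits throw because both blocks end up faulted; swallow that here
        try { await downstream.Completion; } catch (Exception) { }
        try { await middle.Completion; } catch (Exception) { }

        // A faulted block declines every new message
        bool accepted = middle.Post(2);
        return (accepted, middle.Completion.Status, downstream.Completion.Status);
    }
}
```

Nothing in this setup tells whoever posted to `middle` that it failed, other than the `false` returned by Post.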
Throttling
Throttling with an ActionBlock is easy. For starters, by default every block uses just a single worker task. Upstream callers can be throttled by limiting the block's input buffer and using await block.SendAsync() instead of block.Post. There's no need for Task.Run, as the block already uses worker tasks:
var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 2
};
var block = new ActionBlock<Message>(processMessage, options);
...
async Task processMessage(Message msg) { ... }
That's enough to allow only two concurrent operations and to make posters wait once the block is full. If the block is at capacity, SendAsync in the following code waits until a slot becomes available:
foreach (var msg in someInputCollection)
{
    await block.SendAsync(msg);
}
That's it. The block processes 2 messages concurrently (the default is just 1) and holds at most 2 messages at a time; for an ActionBlock, BoundedCapacity counts the messages being processed as well as those still waiting. When the block is full, the posting loop waits.
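The options above can be checked with a self-contained sketch that records how many messages are in flight at once. It assumes int messages and a 20 ms Task.Delay standing in for processMessage; both are illustrative only.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottlingDemo
{
    // Sends `count` messages and returns the highest number of messages
    // that were being processed at the same time
    public static async Task<int> RunAsync(int count)
    {
        var gate = new object();
        int current = 0, peak = 0;

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
            BoundedCapacity = 2
        };

        var block = new ActionBlock<int>(async msg =>
        {
            lock (gate) { current++; peak = Math.Max(peak, current); }
            await Task.Delay(20); // simulated work, stands in for processMessage
            lock (gate) { current--; }
        }, options);

        for (int i = 0; i < count; i++)
        {
            // Waits while the block already holds BoundedCapacity messages
            await block.SendAsync(i);
        }

        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

The peak should never exceed MaxDegreeOfParallelism, no matter how many messages are sent.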
Quick & dirty rate limiting can be achieved by adding a delay in the processing method:
var block = new ActionBlock<Message>(async msg =>
{
    await Task.Delay(200);
    await processMessage(msg);
}, options);
Conditional routing
The question's code seems to use exceptions to implement control flow. That would be wrong in any library or paradigm. Since dataflows work in networks, the equivalent of control flow is conditional routing.
This is also available, through the LinkTo overloads that accept a predicate parameter deciding whether a message should be passed down a specific link.
In the question's case, assuming there's an upstream TransformBlock that produces integers, LinkTo can be used to route messages to different BufferBlocks:
var success = new BufferBlock<int>();
var failure = new BufferBlock<int>();
var block = new TransformBlock<Message, int>(...);
// Success if x >= 5
block.LinkTo(success, x => x >= 5);
// By default, everything else goes to failure
block.LinkTo(failure);
That's it. The only "trick" is that the predicates should cover all cases. Otherwise any message that no link accepts remains stuck in block's output buffer, and the messages behind it wait too. Adding a default link after all the others ensures no message is left unhandled.
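A runnable sketch of the routing above. It assumes a hypothetical TransformBlock<int,int> that doubles its input in place of TransformBlock<Message,int>, and it reads the BufferBlocks with TryReceiveAll once the source block completes.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RoutingDemo
{
    public static async Task<(IList<int> Success, IList<int> Failure)> RunAsync(IEnumerable<int> inputs)
    {
        // Hypothetical stand-in for TransformBlock<Message,int>: doubles each input
        var block = new TransformBlock<int, int>(x => x * 2);
        var success = new BufferBlock<int>();
        var failure = new BufferBlock<int>();
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        // Links are tried in the order they were created
        block.LinkTo(success, linkOptions, x => x >= 5);
        // Default link: accepts whatever the previous predicates rejected
        block.LinkTo(failure, linkOptions);

        foreach (var x in inputs) block.Post(x);
        block.Complete();

        // Completes once every output has been handed to one of the BufferBlocks
        await block.Completion;

        success.TryReceiveAll(out IList<int> s);
        failure.TryReceiveAll(out IList<int> f);
        return (s ?? new List<int>(), f ?? new List<int>());
    }
}
```

With inputs 1 to 5, success receives 6, 8 and 10 and failure receives 2 and 4. Without the default link, the first output (2) would sit at the head of block's output buffer, so block.Completion would never complete and the await would hang.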
Error Handling
Blocks shouldn't allow exceptions to escape. There are several error handling strategies that depend on what the application wants to do.
Handle and log
One option is to handle them and log them in place, the same way one would handle errors in a web application:
var block = new ActionBlock<Message>(async msg =>
{
    try
    {
        await processMessage(msg);
    }
    catch (Exception exc)
    {
        _logger.LogError(exc, ....);
    }
}, options);
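To see that handling the exception inside the delegate keeps the block alive, here is a sketch that assumes int messages, a hypothetical rule that rejects every third message, and a ConcurrentQueue standing in for _logger.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class HandleAndLogDemo
{
    public static async Task<(int Processed, int Logged, TaskStatus Status)> RunAsync()
    {
        int processed = 0;
        // Stand-in for _logger: collects the exceptions that were "logged"
        var log = new ConcurrentQueue<Exception>();

        var block = new ActionBlock<int>(async msg =>
        {
            try
            {
                await Task.Yield(); // pretend to do asynchronous work
                // Hypothetical failure: every third message is rejected
                if (msg % 3 == 0) throw new InvalidOperationException($"bad message {msg}");
                Interlocked.Increment(ref processed);
            }
            catch (Exception exc)
            {
                log.Enqueue(exc); // handled in place, so the block keeps running
            }
        });

        for (int i = 1; i <= 6; i++) block.Post(i);
        block.Complete();
        await block.Completion;

        return (processed, log.Count, block.Completion.Status);
    }
}
```

Messages 3 and 6 are logged, the other four are processed, and the block still completes normally instead of faulting.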
Post to another block
Another possibility is to post the exception, and possibly information about the incoming message, directly to another block. That block could log the error and the message, or retry the message after a delay. There may be a separate pipeline behind that block that retries messages with increasing delays before sending them to a dead-letter buffer, similar to what one would do with message queues:
var block = new ActionBlock<Message>(async msg =>
{
    try
    {
        await processMessage(msg);
    }
    catch (SomeRetriableException exc)
    {
        _retryBlock.Post(new RetryMsg(msg, exc));
    }
    catch (Exception exc)
    {
        _logger.LogError(exc, ....);
    }
}, options);
The strategy to use depends on what the application does. If the ActionBlock is used as a simple background worker, it may be enough to just log.
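One possible shape for the retry side is sketched below. Everything here is an assumption for illustration: RetryMsg carries an int instead of Message, TransientException stands in for SomeRetriableException, Process is a fake step that always fails for message 2, and the back-off is a simple linear delay.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical types: RetryMsg mirrors the answer's name, TransientException
// stands in for SomeRetriableException
public sealed record RetryMsg(int Message, Exception Error);
public sealed class TransientException : Exception
{
    public TransientException(string message) : base(message) { }
}

public static class RetryDemo
{
    public static async Task<(List<int> Done, IList<RetryMsg> DeadLetters)> RunAsync(int maxAttempts)
    {
        var done = new List<int>();
        var deadLetters = new BufferBlock<RetryMsg>();

        // Simulated processing: message 2 always fails, the rest succeed
        static Task Process(int msg) => msg == 2
            ? Task.FromException(new TransientException($"message {msg} failed"))
            : Task.CompletedTask;

        // Retries with increasing delays, then gives up and dead-letters the message
        var retryBlock = new ActionBlock<RetryMsg>(async r =>
        {
            for (int attempt = 1; attempt <= maxAttempts; attempt++)
            {
                await Task.Delay(10 * attempt);
                try
                {
                    await Process(r.Message);
                    lock (done) done.Add(r.Message);
                    return;
                }
                catch (TransientException exc)
                {
                    r = r with { Error = exc }; // keep the latest error
                }
            }
            deadLetters.Post(r);
        });

        var block = new ActionBlock<int>(async msg =>
        {
            try
            {
                await Process(msg);
                lock (done) done.Add(msg);
            }
            catch (TransientException exc)
            {
                retryBlock.Post(new RetryMsg(msg, exc));
            }
        });

        for (int i = 1; i <= 3; i++) block.Post(i);
        block.Complete();
        await block.Completion;

        // All retries were posted from inside `block`, so it is safe to complete now
        retryBlock.Complete();
        await retryBlock.Completion;

        deadLetters.TryReceiveAll(out IList<RetryMsg> dead);
        return (done, dead ?? new List<RetryMsg>());
    }
}
```

The retry loop runs inside the retry block instead of reposting to itself. That keeps completion simple: once the main block completes, no more retries can arrive, so the retry block can be completed right after it.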
Wrap and route
In a more advanced scenario, messages can be wrapped in an Envelope<> that carries the message and possibly any exception. Routing can then separate successful messages from failed ones:
class Envelope<T>
{
    public T Message { get; }
    public Exception Error { get; }

    public Envelope(T msg)
    {
        Message = msg;
    }

    public Envelope(T msg, Exception err)
    {
        Message = msg;
        Error = err;
    }
}
The block now returns an Envelope:
var block = new TransformBlock<Envelope<Message>, Envelope<int>>(env =>
{
    try
    {
        var msg = env.Message;
        ....
        return new Envelope<int>(6);
    }
    catch (Exception exc)
    {
        // The output type is Envelope<int>, so the failed envelope carries
        // the exception with a default value instead of the original Message
        return new Envelope<int>(default, exc);
    }
});
This allows routing errors to an errorBlock using conditional routing:
var errorBlock = new ActionBlock<Envelope<int>>(...);
var success = new BufferBlock<Envelope<int>>();
var failure = new BufferBlock<Envelope<int>>();
// Send errors to errorBlock
block.LinkTo(errorBlock, env => env.Error != null);
// Success if x.Message >= 5
block.LinkTo(success, x => x.Message >= 5);
// By default, everything else goes to failure
block.LinkTo(failure);
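An end-to-end sketch of the envelope routing. It assumes a hypothetical processing step that parses strings with int.Parse (so bad input throws FormatException), and it uses ActionBlocks that collect into lists as the error, success and failure targets so the whole pipeline can be awaited.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Same shape as the answer's Envelope<T>
public sealed class Envelope<T>
{
    public T Message { get; }
    public Exception Error { get; }
    public Envelope(T msg) { Message = msg; }
    public Envelope(T msg, Exception err) { Message = msg; Error = err; }
}

public static class EnvelopeDemo
{
    public static async Task<(List<int> Success, List<int> Failure, List<string> Errors)> RunAsync(IEnumerable<string> inputs)
    {
        // Hypothetical processing step: parse the text; int.Parse throws on bad input
        var block = new TransformBlock<Envelope<string>, Envelope<int>>(env =>
        {
            try
            {
                return new Envelope<int>(int.Parse(env.Message));
            }
            catch (Exception exc)
            {
                return new Envelope<int>(default, exc);
            }
        });

        var success = new List<int>();
        var failure = new List<int>();
        var errors = new List<string>();
        // Each ActionBlock runs one message at a time, so plain lists are safe here
        var errorBlock = new ActionBlock<Envelope<int>>(env => errors.Add(env.Error.GetType().Name));
        var successBlock = new ActionBlock<Envelope<int>>(env => success.Add(env.Message));
        var failureBlock = new ActionBlock<Envelope<int>>(env => failure.Add(env.Message));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        block.LinkTo(errorBlock, link, env => env.Error != null);    // errors first
        block.LinkTo(successBlock, link, env => env.Message >= 5);
        block.LinkTo(failureBlock, link);                             // default link

        foreach (var text in inputs) block.Post(new Envelope<string>(text));
        block.Complete();
        await Task.WhenAll(errorBlock.Completion, successBlock.Completion, failureBlock.Completion);
        return (success, failure, errors);
    }
}
```

With inputs "7", "x", "3" and "12", the success list gets 7 and 12, the failure list gets 3, and the error list records one FormatException. Because the output type is Envelope<int>, the failed input text itself is not kept. If the error handler needs it, the envelope would have to carry the original input as well.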