Data processing pipelines and transient fault handling seem to go hand in hand, so I'm interested in seeing if I can get 2 of the best libraries for these - TPL Dataflow and Polly, respectively - to play nicely together.
As a starting point, I'd like to apply a fault-handling policy to an ActionBlock. Ideally I'd like to encapsulate it in a block-creating method with a signature like this:
    ITargetBlock<T> CreatePollyBlock<T>(
        Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)
It would be easy enough to simply policy.Execute the action from inside an ActionBlock, but I have these 2 requirements:
- In the case of a retry, I don't want retrying an item to take priority over other queued up items. In other words, when you fail, you go to the back of the line.
- More importantly, if there's a waiting period before the retry, I don't want that to block new items from getting in. And if ExecutionDataflowBlockOptions.MaxDegreeOfParallelism is set, I don't want items waiting for retry to "count" against that max.
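For contrast, the simple version that runs the policy inline might look roughly like the sketch below. The names NaivePollySketch and CreateNaivePollyBlock are hypothetical, and it assumes a synchronous Polly.Policy. With a wait-and-retry policy, the wait happens inside the block's delegate, so a retrying item keeps its slot and holds up the items behind it, which is exactly what the two requirements rule out.

```csharp
using System;
using System.Threading.Tasks.Dataflow;
using Polly;

public static class NaivePollySketch
{
    // Hypothetical helper: executes the policy inside the block's own delegate.
    // Any retry delay is spent while the item still occupies a processing slot,
    // so it counts against MaxDegreeOfParallelism and blocks queued items.
    public static ITargetBlock<T> CreateNaivePollyBlock<T>(
        Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts)
    {
        return new ActionBlock<T>(x => policy.Execute(() => act(x)), opts);
    }
}
```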
To satisfy these requirements, I figure I need an "inner" ActionBlock with the user-provided ExecutionDataflowBlockOptions applied, and some "outer" block that posts items to the inner block and applies any wait-and-retry logic (or whatever the policy dictates) outside the context of the inner block. Here's my first attempt:
    // wrapper that provides a data item with mechanism to await completion
    public class WorkItem<T>
    {
        private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();
        public T Data { get; set; }
        public Task Completion => _tcs.Task;
        public void SetCompleted() => _tcs.SetResult(0);
        public void SetFailed(Exception ex) => _tcs.SetException(ex);
    }

    ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
        // create a block that marks WorkItems completed, and allows
        // items to fault without faulting the entire block.
        var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
            try {
                act(wi.Data);
                wi.SetCompleted();
            }
            catch (Exception ex) {
                wi.SetFailed(ex);
            }
        }, opts);

        // outer block: runs the policy around each send to the inner block,
        // so a failed WorkItem surfaces as an exception the policy can handle.
        return new ActionBlock<T>(async x => {
            await policy.ExecuteAsync(async () => {
                var workItem = new WorkItem<T> { Data = x };
                await innerBlock.SendAsync(workItem);
                await workItem.Completion;
            });
        });
    }
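A harness for exercising a block like this could be sketched as follows. It is only an illustration of the kind of test described next, not the exact code used: RetryOrderDemo, FlakyAction, and FeedAsync are hypothetical names, and the dummy action simply logs each item and throws on the first 3 calls overall.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RetryOrderDemo
{
    private static int _calls; // app-wide call counter shared by all items

    // Hypothetical dummy action: logs the item, then throws on the
    // first 3 calls in total, regardless of which item triggered them.
    public static void FlakyAction(string item)
    {
        Console.WriteLine(item);
        if (Interlocked.Increment(ref _calls) <= 3)
            throw new InvalidOperationException("simulated transient fault");
    }

    // Sends the test data to the block under test and waits for it to drain.
    public static async Task FeedAsync(ITargetBlock<string> block)
    {
        foreach (var item in new[] { "a", "b", "c", "d", "e", "f" })
            await block.SendAsync(item);
        block.Complete();
        await block.Completion;
    }
}
```

The block under test would come from CreatePollyBlock<string>(RetryOrderDemo.FlakyAction, policy, new ExecutionDataflowBlockOptions()), with a policy along the lines of Policy.Handle<Exception>().WaitAndRetryAsync(3, _ => TimeSpan.FromSeconds(1)). Whether that policy is assignable to Polly.Policy depends on the Polly version, since newer releases use a separate AsyncPolicy type for async policies, so treat that part as an assumption.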
To test it, I created a block with a wait-and-retry policy and a dummy method that throws an exception the first 3 times it's called (app-wide). Then I fed it some data:
"a", "b", "c", "d", "e", "f"
I would expect a, b, and c to fail and go to the back of the line. But I observed they hit the inner block's action in this order:
"a", "a", "a", "a", "b", "c", "d", "e", "f"
Essentially, I failed to meet my own requirements, and it's fairly easy to see why: the outer block processes one item at a time by default (MaxDegreeOfParallelism defaults to 1), so it doesn't let new items in until all retries of the current item have finished. A simple yet seemingly hackish solution is to add a big MaxDegreeOfParallelism value to the outer block:
    return new ActionBlock<T>(async x => {
        await policy.ExecuteAsync(async () => {
            var workItem = new WorkItem<T> { Data = x };
            await innerBlock.SendAsync(workItem);
            await workItem.Completion;
        });
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
With this change, I observed that new items do in fact get in before the retries, but I've also unleashed some chaos. Entry into the inner block has become random, though the first 3 items in are always at the end:
"a", "e", "b", "c", "d", "a", "e", "b"
So this is a little better. But ideally I'd like to see order preserved:
"a", "b", "c", "d", "e", "a", "b", "c"
This is where I'm stuck, and reasoning about it, I'm wondering if it's even possible under these constraints, specifically that the internals of CreatePollyBlock can execute the policy but cannot define it. If those internals could, for example, provide the retry lambda, I think that would give me a lot more options. But that's part of the policy definition, and per this design I can't do that.
Thanks in advance for any help.