5

Data processing pipelines and transient fault handling seem to go hand in hand, so I'm interested in seeing if I can get 2 of the best libraries for these - TPL Dataflow and Polly, respectively - to play nicely together.

As a starting point, I'd like to apply a fault handling policy to an ActionBlock. Ideally I'd like to encapsulate it in a block-creating method with a signature like this:

ITargetBlock<T> CreatePollyBlock<T>(
    Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)

It would be easy enough to simply policy.Execute the action from inside an ActionBlock, but I have these 2 requirements:

  1. In the case of a retry, I don't want retrying an item to take priority over other queued up items. In other words, when you fail, you go to the back of the line.
  2. More importantly, if there's a waiting period before the retry, I don't want that to block new items from getting in. And if ExecutionDataflowBlockOptions.MaxDegreeOfParallelism is set, I don't want items waiting for retry to "count" against that max.

To satisfy these requirements, I figure I need an "inner" ActionBlock with the user-provided ExecutionDataflowBlockOptions applied, and some "outer" block that posts items to the inner block and applies any wait-and-retry logic (or whatever the policy dictates) outside the context of the inner block. Here's my first attempt:

// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
    private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();

    public T Data { get; set; }
    public Task Completion => _tcs.Task;

    public void SetCompleted() => _tcs.SetResult(0);
    public void SetFailed(Exception ex) => _tcs.SetException(ex);
}

ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
    // create a block that marks WorkItems completed, and allows
    // items to fault without faulting the entire block.
    var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
        try {
            act(wi.Data);
            wi.SetCompleted();
        }
        catch (Exception ex) {
            wi.SetFailed(ex);
        }
    }, opts);

    return new ActionBlock<T>(async x => {
        await policy.ExecuteAsync(async () => {
            var workItem = new WorkItem<T> { Data = x };
            await innerBlock.SendAsync(workItem);
            await workItem.Completion;
        });
    });
}

To test it, I created a block with a wait-and-retry policy and a dummy method that throws an exception the first 3 times it's called (app-wide). Then I fed it some data:

"a", "b", "c", "d", "e", "f"

I would expect a, b, and c to fail and go the back of the line. But I observed they hit the inner block's action in this order:

"a", "a", "a", "a", "b", "c", "d", "e", "f"

Essentially, I failed to meet my own requirements, and it's fairly easy to see why: the outer block is not letting new items in until all retries of the current item occur. A simple yet seemingly hackish solution is to add a big MaxDegreeOfParallelism value to the outer block:

return new ActionBlock<T>(async x => {
    await policy.ExecuteAsync(async () => {
        var workItem = new WorkItem<T> { Data = x };
        await innerBlock.SendAsync(workItem);
        await workItem.Completion;
    });
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });

With this change, I observed that new items do in fact get in before the retries, but I've also unleashed some chaos. Entry into the inner block has become random, though the first 3 items in are always at the end:

"a", "e", "b", "c", "d", "a", "e", "b"

So this is a little better. But ideally I'd like to see order preserved:

"a", "b", "c", "d", "e", "a", "b", "c"

This is where I'm stuck, and reasoning about it I'm wondering if it's even possible under these constraints, specifically that the internals of CreatePollyBlock can execute the policy but cannot define it. If those internals could, for example, provide the retry lambda, I think that would give me a lot more options. But that's part of the policy definition and per this design I can't do that.

Thanks in advance for any help.

Todd Menier
  • 37,557
  • 17
  • 150
  • 173
  • 1
    I'm wondering if you could use the `DataflowBlockOptions.TaskScheduler` to specify the `QueuedTaskScheduler` from ParallelExtensionsExtras as the scheduler for the inner blocks. Then, use the priority queue feature of `QueuedTaskScheduler` to create priority queues according the try number. Place the first attempt in a queue `priority:0`, the first retry in a queue `priority:1`, the 2nd retry in `priority:2` etc. That would place all first retries after initial tries, all 2nd retries after 1st retries etc. It's a fairly elaborate solution, and I'd be interested to hear of something simpler. – mountain traveller Sep 06 '18 at 19:48
  • Whatever the solution to preserving the order of processing of blocks at the initial attempt/prio, if time to failure of an operation is random, is there any easy way with this design to preserve order at _retries_? If "a" fails slowly but "e" fails fast, the retry of "e" is likely to precede the retry of "a" – mountain traveller Sep 06 '18 at 20:21
  • @mountaintraveller Thanks (and great library). Good point about order of retries - I do want them to be re-queued immediately upon failure, so retries won't necessarily match the order they were first attempted. I do think it's reasonable to expect the initial attempts to happen in the order they're posted though. I'll read up on QTS, but I can't help but think there's a solution here that just involves composing blocks a certain way. – Todd Menier Sep 06 '18 at 20:34
  • @ToddMenier Dataflows are similar to queues and message processing. Retrying means reposting a message to the input queue. Retries and other routing data are kept in the message's envelope. Dataflow blocks *already* have input/output queues. Instead of sending raw messages, use an `Envelope` class that contains status, retry fields. `Link` can have a predicate that can be used to direct messages to the next block, a discard block or the original block for retrying. – Panagiotis Kanavos Sep 07 '18 at 08:43
  • @ToddMenier or you can use Polly and have the recovery action repost to the original block after decrementing the retry counter. In any case, *don't* allow exceptions to propagate, encapsulate them into failure envelopes. – Panagiotis Kanavos Sep 07 '18 at 08:44
  • @ToddMenier [the answers to this question](https://stackoverflow.com/questions/17469689/retry-policy-within-itargetblocktinput) describe both options. The retry logic in svick's answer can be replaced by Polly as long as the recovery function posts to the original block. Stephen Cleary explains the second option, linking back to the original block – Panagiotis Kanavos Sep 07 '18 at 08:46
  • @PanagiotisKanavos Maybe I need more comments in my code, because it's basically doing everything you describe - using an envelope (I call it `WorkItem`), not allowing exceptions to propagate (captured in `WorkItem.Completion` task), and reposting on failure. This attempted solution was inspired, at least partially, by the 2 answers to the question you referenced, so I think we're on the same page. :) – Todd Menier Sep 07 '18 at 12:53
  • @ToddMenier you need to *remove* code. The WorkItem doesn't need anything other than the Data and a retry count so the *next* time the workitem is processed the worker function can check whether it should retry or fail.Dataflow processing is similar to message processing. It's not the *message's* job to handle routing or retrying. – Panagiotis Kanavos Sep 07 '18 at 13:05
  • @ToddMenier the policy should be called *inside* the block too. *It* should catch the exception and repost to the *original* block. You don't need two blocks. If you check svick's code he defines the block variable first, then instantiates the block, which allows him to refer to the block from inside the worker delegate – Panagiotis Kanavos Sep 07 '18 at 13:08
  • @PanagiotisKanavos Thanks for your help. It sounds like you think I need to go back to the drawing board here, and I would very much welcome an answer if you think you can solve this. Just keep in mind my constraints: the `CreatePollyBlock` method signature I'm trying to implement (specifically that a fully defined Policy is passed in from the outside world), and the 2 points about letting new items in ahead of retries and waiting items not "counting" against the block's max parallelism. Those constraints are why I introduced 2 blocks, and they don't apply to svick's answer. – Todd Menier Sep 07 '18 at 13:32
  • 1
    Thanks @PanagiotisKanavos for the very intg refs to svick's and StephenCleary's answers. If @ToddMenier is utilising limited parallelism in the processing, it could be necessary to use two separate blocks, as with a single block only, operating under constrained parallelism, there could be a risk that some of the parallelism becomes occupied by the 'wait' phase of WaitAndRetry, which is wasteful of the parallelism. Internally, Polly's WaitAndRetryAsync policy uses `Task.Delay(...)` for the wait: it depends if `Task.Delay(...)` effectively releases a parallelism slot from TPL's perspective. – mountain traveller Sep 07 '18 at 16:28
  • 1
    @ToddMenier Re "ideally see the order preserved", feels like perhaps some conceptual tension between goals. Parallel proc'g by defn opens up the poss for varying completion order by race conditions? Might be poss tho to structure the code so that output order is faithful 'enough' to input order for some given production envt. In partic case, feels like maybe races enqueuing (outer on inner) are in play, and relative durns of that versus `act` vs temporal separation of inputs could all poss influenc output in real world. Impt also to know if test bed is representative of prod in these respects. – mountain traveller Sep 07 '18 at 16:52
  • @mountaintraveller You make a good point. On one hand, if the max parallelism is 1 (the default), the expectation is that items will be processed sequentially, in the order provided. But on the other hand I'm saying retries go the the back of the line, which by definition means items could complete in any order. It still _feels_ more correct to send the first attempts to the inner block in order, but in practical terms maybe what I have here is good enough. – Todd Menier Sep 07 '18 at 19:18
  • 1
    I _could_ enhance my method signature with a `mustPreserveOrder` bit. If false, go with my current implementation. If true, it's a lot simpler - item B _can't_ start until item A has done all of its retries and waiting, so the whole thing could just be encapsulated in 1 block. false = loss of order guarantee in exchange for _much_ better throughput. – Todd Menier Sep 07 '18 at 19:23

0 Answers0