
I am using the TPL Dataflow pipeline design together with Stephen Cleary's Try library. In short, it wraps a value/exception and floats it down the pipeline. So even items that have thrown exceptions inside their processing methods have Status == RanToCompletion at the end, when I `await resultsBlock.Completion;`. So I need another way to register faulted items. Here is a small sample:

var downloadBlock = new TransformBlock<int, Try<int>>(construct => Try.Create(() =>
{
    //SomeProcessingMethod();
    return 1;
}));
var processBlock = new TransformBlock<Try<int>, Try<int>>(construct => construct.Map(value =>
{
    //SomeProcessingMethod();
    return 1;
}));
var resultsBlock = new ActionBlock<Try<int>>(construct =>
{
    if (construct.IsException)
    {
        var exception = construct.Exception;
        switch (exception)
        {
            case GoogleApiException gex:
                //_notificationService.NotifyUser("OMG, my dear sir, I think I messed something up:/"
                //Register that this item was faulted, so we know that we need to retry it.
                break;
            default:
                break;
        }
    }
});

One solution would be to create a `List<int> FaultedItems` where I would insert all faulted items in my exception-handling block; then, after `await resultsBlock.Completion;`, I could check whether the list is non-empty and create a new pipeline for the faulted items. My question is: if I use a `List<int>`, am I at risk of running into thread-safety problems if I decide to play with the `MaxDegreeOfParallelism` settings, and would I be better off using some concurrent collection? Or is this approach flawed in some other way?
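For illustration, the concurrent-collection alternative would look roughly like this (a sketch, not tested; the exceptions are queued here rather than the item ids, because a faulted `Try<int>` no longer carries the original input value):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

// Sketch: a thread-safe alternative to List<int>. Enqueuing on a
// ConcurrentQueue is safe even with MaxDegreeOfParallelism > 1,
// whereas List<int>.Add would need a lock around every call.
var faultedItems = new ConcurrentQueue<Exception>();

var resultsBlock = new ActionBlock<Try<int>>(construct =>
{
    if (construct.IsException)
        faultedItems.Enqueue(construct.Exception);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// After awaiting resultsBlock.Completion, faultedItems can be inspected
// and a new pipeline created for the retries.
```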

Theodor Zoulias
niks
  • You misunderstood Stephen Cleary's solution. The *data* contains faults, not the blocks themselves. The pipeline still works, so in the end the blocks should complete successfully. If you don't want to use `Result` objects (which make your code a *lot* simpler), you can redirect fault messages to BufferBlocks (not Lists), or even to ActionBlocks that write to a logger – Panagiotis Kanavos Jun 02 '20 at 15:06
  • `LinkTo` accepts a predicate that can be used to direct messages either to the next step in the pipeline or to some other block, like a BufferBlock, a logging block, or even a NullBlock. Be careful though, because messages that aren't matched by any predicate will stay in their block's output buffer, essentially blocking the pipeline. – Panagiotis Kanavos Jun 02 '20 at 15:08
  • @Panagiotis Thank you for the answer! I am totally confused right now. I do understand that the data flowing down the pipe contains the faults, and I would gladly put it to use to make my code as simple as possible. So you mean that from my `resultsBlock` I should redirect messages to some BufferBlock if they contain a fault? I am sorry if this sounds dumb to you. This stuff is something new to me. – niks Jun 02 '20 at 15:36
  • Take a look at this: [Retry policy within ITargetBlock](https://stackoverflow.com/questions/17469689/retry-policy-within-itargetblocktinput). Implementing a dataflow block with retry functionality is quite tricky, because some must-have options generate inherent difficulties (`EnsureOrdered`, `BoundedCapacity`), and is also not obvious how to enforce a specific delay between repeated attempts for the same item. It is doable though. – Theodor Zoulias Jun 02 '20 at 15:53
  • @Theodor Thank you for answering. I will examine the link you have provided. Seems quite a lot to grasp. – niks Jun 02 '20 at 16:18
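For reference, the redirection suggested in the comments could be sketched like this (the wiring and block names are assumptions, based on the sample blocks from the question):

```csharp
using System.Threading.Tasks.Dataflow;

// Sketch of the comments' suggestion: route faulted Try messages to a
// separate block via LinkTo predicates, instead of collecting them in a list.
var faultedBlock = new BufferBlock<Try<int>>();

// Faulted messages go to the buffer; successful ones continue downstream.
processBlock.LinkTo(faultedBlock, construct => construct.IsException);
processBlock.LinkTo(resultsBlock,
    new DataflowLinkOptions { PropagateCompletion = true },
    construct => !construct.IsException);
// Every message matches one of the two predicates, so nothing is left
// stranded in processBlock's output buffer.
```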

1 Answer


I converted a retry-block implementation from an answer to a similar question, to work with Stephen Cleary's `Try` types as input and output. The method `CreateRetryTransformBlock` returns a `TransformBlock<Try<TInput>, Try<TOutput>>`, and the method `CreateRetryActionBlock` returns something that is practically an `ActionBlock<Try<TInput>>`.

Three more options are available on top of the standard execution options: `MaxAttemptsPerItem`, `MinimumRetryDelay`, and `MaxRetriesTotal`.

public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The minimum delay duration before retrying an item.</summary>
    public TimeSpan MinimumRetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static TransformBlock<Try<TInput>, Try<TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.MinimumRetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MinimumRetryDelay));

    var internalCTS = CancellationTokenSource
        .CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken);

    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    var exceptionsCount = 0;
    SemaphoreSlim semaphore;
    if (maxDOP == DataflowBlockOptions.Unbounded)
    {
        semaphore = new SemaphoreSlim(Int32.MaxValue);
    }
    else
    {
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);

        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;
    }

    var block = new TransformBlock<Try<TInput>, Try<TOutput>>(async item =>
    {
        // Continue on captured context after every await
        if (item.IsException) return Try<TOutput>.FromException(item.Exception);
        var result1 = await ProcessOnceAsync(item);
        if (result1.IsValue) return result1; // item.IsException was handled above
        for (int i = 2; i <= maxAttemptsPerItem; i++)
        {
            await Task.Delay(retryDelay, internalCTS.Token);
            var result = await ProcessOnceAsync(item);
            if (result.IsValue) return result;
        }
        return result1; // Return the first-attempt exception
    }, dataflowBlockOptions);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value

    _ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),
        TaskScheduler.Default);

    return block;

    async Task<Try<TOutput>> ProcessOnceAsync(Try<TInput> item)
    {
        await semaphore.WaitAsync(internalCTS.Token);
        try
        {
            var result = await item.Map(transform);
            if (item.IsValue && result.IsException)
            {
                ObserveNewException(result.Exception);
            }
            return result;
        }
        finally
        {
            semaphore.Release();
        }
    }

    void ObserveNewException(Exception ex)
    {
        if (maxRetriesTotal == -1) return;
        uint newCount = (uint)Interlocked.Increment(ref exceptionsCount);
        if (newCount <= (uint)maxRetriesTotal) return;
        if (newCount == (uint)maxRetriesTotal + 1)
        {
            internalCTS.Cancel(); // The block has failed
            throw new RetryLimitException($"The max retry limit " +
                $"({maxRetriesTotal}) has been reached.", ex);
        }
        throw new OperationCanceledException();
    }
}

public static ITargetBlock<Try<TInput>> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    var nullTarget = DataflowBlock.NullTarget<Try<object>>();
    block.LinkTo(nullTarget);
    return block;
}

Usage example:

var downloadBlock = CreateRetryTransformBlock(async (int construct) =>
{
    int result = await DownloadAsync(construct);
    return result;
}, new RetryExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10,
    MaxAttemptsPerItem = 3,
    MaxRetriesTotal = 100,
    MinimumRetryDelay = TimeSpan.FromSeconds(10)
});

var processBlock = new TransformBlock<Try<int>, Try<int>>(
    construct => construct.Map(async value =>
{
    return await ProcessAsync(value);
}));

downloadBlock.LinkTo(processBlock,
    new DataflowLinkOptions() { PropagateCompletion = true });
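A terminal block with the same retry options could be wired similarly (a sketch; `SaveAsync` is a placeholder for whatever side effect ends the pipeline):

```csharp
// Sketch: a retrying terminal block. SaveAsync is a hypothetical method
// representing the final side effect (e.g. writing to a database).
var saveBlock = CreateRetryActionBlock<int>(async value =>
{
    await SaveAsync(value);
}, new RetryExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10,
    MaxAttemptsPerItem = 3,
    MinimumRetryDelay = TimeSpan.FromSeconds(10)
});

processBlock.LinkTo(saveBlock,
    new DataflowLinkOptions() { PropagateCompletion = true });
```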

To keep things simple, when an item has been retried the maximum number of times, the exception preserved is the first one that occurred; the subsequent exceptions are lost. In most cases the lost exceptions will be of the same type as the first one anyway.

Caution: The above implementation does not have an efficient input queue. If you feed this block with millions of items, the memory usage will explode.

Theodor Zoulias
  • Hi Theodor! Thank you for the answer. I will most definitely get back to you with some questions when I dig into it! – niks Jun 03 '20 at 08:47
  • @niks I updated my answer with a somewhat simpler implementation. The idea is to use a single `SemaphoreSlim` for controlling the concurrency for both the initial attempts and the retries, instead of using internally two linked blocks. – Theodor Zoulias Jun 03 '20 at 15:19
  • Hmm, is this approach in any way inferior to the previous one? – niks Jun 04 '20 at 06:41
  • @niks I think that both approaches are roughly equivalent. Both are also a bit experimental. I am quite confident for their robustness and efficiency for normal usage though. – Theodor Zoulias Jun 04 '20 at 07:00
  • I find this code pretty hard to debug (my lack of understanding), and maybe this is a false assumption, but can it be that even if I set `MaxDegreeOfParallelism = 1`, if one item is faulted and has entered its retry loop, other items are being processed simultaneously while the faulted one has not completed its retry cycle yet? – niks Jun 04 '20 at 08:06
  • @niks yeap, with `MaxDegreeOfParallelism = 1` only one item will be actively processed at any moment. If an item fails, it will wait for the `RetryDelay` period and then will ask for the semaphore again. It may have to wait for a long time before acquiring it, because of all other items that may have asked for it, either for their first attempt or for a retry. The `RetryDelay` is just the minimum duration between retries. The average delay may be much longer. The `SemaphoreSlim` class maintains internally a FIFO queue with its awaiters, so the processing of all items will happen with fairness. – Theodor Zoulias Jun 04 '20 at 08:49
  • So, just to make clear: if I push `var data = new List(){a,b,c,d}` into the pipeline and "a" fails, it could happen that while I am awaiting `retryDelay`, `SemaphoreSlim` allows another entry, let's say "b", to proceed, and `SemaphoreSlim` puts the first item "a" at the end of the line? – niks Jun 04 '20 at 10:24
  • @niks yes, the "a" will be retried only after "b", "c" and "d" have all been attempted once. By default though, since the `EnsureOrdered` option is `true` by default, the "a" item will be the first to be exported by the block, even if all other letters have been successfully processed earlier. – Theodor Zoulias Jun 04 '20 at 10:51
  • I would like to test this behavior but setting `new RetryExecutionDataflowBlockOptions(){MaxAttemptsPerItem = 3, RetryDelay=TimeSpan.FromSeconds(10), EnsureOrdered = false });` does not let other items get in front of the failed one. At the end it still comes out a,b,c,d. – niks Jun 04 '20 at 11:07
  • @niks I can't reproduce it. I have linked the "Retry" block to an `ActionBlock` that writes to the Console the letter it receives, and it writes the letters in the order b, c, d, a. Are you sure that processing the "a" results to an exception? – Theodor Zoulias Jun 04 '20 at 11:17
  • I am sorry, it was my mistake. Yes, it works now. Thank you for the help, I will now proceed with further testing! – niks Jun 04 '20 at 12:15
  • @niks I noticed a possible drawback of the above implementation. It prioritizes processing new items over retrying failed items, resulting in all failed items being retried at the end. This can be a bit inefficient in case the failed items have a higher probability of failing again, because of the delay imposed between retries. So the block may end up being less productive at its final stage (too much delay, too little work). This could be easily fixed if .NET had some semaphore class supporting priorities, but unfortunately [it hasn't](https://stackoverflow.com/questions/39474370/). – Theodor Zoulias Jun 07 '20 at 09:27
  • Thank you for informing me about this. I have spent 2 days trying to figure out how to elegantly return a value tuple `(TId, Try<TOutput>)` instead of `Try<(TId, TOutput)>`. Let's say I use it like this: `CreateRetryTransformBlock(...)` would return `Try<(string, int)>`, but how to achieve it so that only the `int` value is wrapped inside `Try`? You have already dedicated a tremendous amount of time to this answer, so I wanted to figure it out myself, but once again I have understood that my knowledge on this subject is limited. – niks Jun 08 '20 at 15:21
  • @niks I updated my answer with some minor improvements that don't change drastically the behavior of the method. Small API change: the `RetryDelay` option is renamed to the more appropriate `MinimumRetryDelay`. For a version of `CreateRetryTransformBlock(...)` that also propagates the id, check [this](https://dotnetfiddle.net/EUYLG5) fiddle. – Theodor Zoulias Jun 08 '20 at 17:17
  • @niks btw a year ago I would have had a hard time wrapping my head around all this async-await stuff. It's anything but trivial. Be assured that you'll get better with practice. – Theodor Zoulias Jun 08 '20 at 19:00
  • Yes, it now works! Thank you! I am exploring your answer further, and I think I will "improve" it a bit. For example, there might be `Exception`s that are not temporary. Let's say the user has forgotten to plug in the internet cable; if the pipeline contains 100 items, `MinimumRetryDelay` = 5 sec, and each faulted item has to wait for the `SemaphoreSlim` at the end of the line, then by the time it reaches its `MaxAttemptsPerItem`, a considerable amount of time might have passed just to realize that (for example) the cable is not plugged in. – niks Jun 10 '20 at 10:18
  • I could of course reduce `MaxRetriesTotal`, but that could backfire in case of a real temporary `Exception`. So I think I could implement some custom filter that tracks the count of specific exceptions and faults the entire `TransformBlock` once it reaches a specified threshold, before the block itself lets the `Exception` escape when an item has finally reached its `MaxAttemptsPerItem` and still failed. – niks Jun 10 '20 at 10:19
  • Hi @niks! Yeah, there is certainly ample space for improvements. For example you could add a new option `public Type[] FatalExceptionTypes { get; set; };` in the `RetryExecutionDataflowBlockOptions` class, and check for these types inside the `ObserveNewException` method. Or for more flexibility you could add an event-type property, like `public Action OnException { get; set; };` that would be invoked inside the `ObserveNewException`. – Theodor Zoulias Jun 10 '20 at 10:32
  • Thank you for suggestions, Theodor! I have one last question - before edit you used original `CancellationToken` and then you switched to linked token. Is there any particular reason for that? – niks Jun 11 '20 at 08:22
  • @niks yes, it is a small improvement. In case of a fatal exception (currently only the `RetryLimitException`) the `internalCTS` is canceled, causing all awaiting for the semaphore and the retry delays to be cancelled immediately. This increases the responsiveness of the block in case of a fatal exception. Btw feel free to ask as many questions as needed. Answering them is my pleasure. :-) – Theodor Zoulias Jun 11 '20 at 08:29
  • Ah, yes. I was thinking that you could use the original `CancellationToken` for this, but you can't call `Cancel` on it. You need a `CancellationTokenSource` for that. What a silly oversight from me. I think I am done with this question for now. Thank you for your tremendous effort! Answering in such detail and following through with all the questions in such a fast manner, even after working hours! :) It feels like a premium support package! Efcharistó! :) – niks Jun 11 '20 at 08:56