
Here is a simple code sample using TransformBlock and ActionBlock. I am using the Polly library to help with retry logic. The problem is that once I manually open the circuit breaker and then close it again, the link between downloadBlock and actionBlock appears to be broken. This does not happen if I omit the predicate in the call to LinkTo:

private readonly TransformBlock<DataClass, DataClass> downloadBlock;
private readonly ActionBlock<DataClass> actionBlock;
private readonly AsyncCircuitBreakerPolicy circuitBreaker;
private readonly AsyncRetryPolicy retryPolicy;

retryPolicy = Policy.Handle<WebException>().RetryAsync(4);
circuitBreaker = Policy.Handle<WebException>().CircuitBreakerAsync(10, TimeSpan.FromSeconds(10));

downloadBlock = new TransformBlock<DataClass, DataClass>(async (data) =>
{
    var finalPolicy = retryPolicy.WrapAsync(circuitBreaker);
    try
    {
        await finalPolicy.ExecuteAsync(async () =>
        {
            //await DoSomething();
            data.Status = Status.Completed;
        });
    }
    catch (WebException)
    {
        // All retries were used up.
        data.Status = Status.Failed;
        //Do logging
    }
    catch (BrokenCircuitException)
    {
        // The circuit is open (or manually isolated).
        data.Status = Status.Failed;
        //Do logging
    }
    return data;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });


actionBlock = new ActionBlock<DataClass>((data) =>
{
    //DoSomething(data);
});

//Here, if I set the predicate data => data.Status != Status.Failed and then at some point manually call
//circuitBreaker.Isolate() and, after some time, circuitBreaker.Reset() to close the circuit again,
//items newly pushed into the pipeline are processed in the downloadBlock but their results
//are not propagated to actionBlock even if their status is OK. This does not happen if I omit the predicate.

downloadBlock.LinkTo(actionBlock,
                     new DataflowLinkOptions { PropagateCompletion = true }, data => data.Status != Status.Failed);

I could, of course, check the status in the actionBlock itself, but I would like to know whether anybody has encountered this behavior and what could be causing it.

EDIT: These are the steps necessary to reproduce the problem:

  1. Push data into the pipeline and let it be processed.
  2. Call circuitBreaker.Isolate().
  3. Push new data into the pipeline. It fails with BrokenCircuitException and, logically, won't make it to the next block because of the predicate. That's fine.
  4. Call circuitBreaker.Reset(). Now everything should be working.
  5. Push new data into the pipeline. It gets processed in the downloadBlock and its status is OK, but it simply does not propagate to the next ActionBlock.
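
The steps above can be sketched as a self-contained program. This is a minimal sketch, not the original code: to avoid the Polly dependency, a plain boolean (`circuitOpen`, a hypothetical stand-in for `Isolate()`/`Reset()`) decides whether an item is marked as failed, and the names `FilteredLinkRepro` and `RunAsync` are made up for illustration. The short delays only give each item time to flow through the blocks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public enum Status { Pending, Completed, Failed }

public sealed class DataClass
{
    public int Id { get; set; }
    public Status Status { get; set; }
}

public static class FilteredLinkRepro
{
    // Hypothetical stand-in for the Polly circuit breaker: true means "open".
    private static volatile bool circuitOpen;

    public static async Task<(List<int> Delivered, int StuckInOutput)> RunAsync()
    {
        circuitOpen = false;
        var delivered = new List<int>();

        var downloadBlock = new TransformBlock<DataClass, DataClass>(data =>
        {
            // Mirrors the catch blocks in the question: an open circuit marks the item as failed.
            data.Status = circuitOpen ? Status.Failed : Status.Completed;
            return data;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        var actionBlock = new ActionBlock<DataClass>(data =>
        {
            lock (delivered) delivered.Add(data.Id);
        });

        // Same filtered link as in the question.
        downloadBlock.LinkTo(actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true },
            data => data.Status != Status.Failed);

        downloadBlock.Post(new DataClass { Id = 1 });   // step 1: normal item
        await Task.Delay(200);
        circuitOpen = true;                             // step 2: "Isolate"
        downloadBlock.Post(new DataClass { Id = 2 });   // step 3: item fails and is filtered out
        await Task.Delay(200);
        circuitOpen = false;                            // step 4: "Reset"
        downloadBlock.Post(new DataClass { Id = 3 });   // step 5: healthy item
        await Task.Delay(200);

        // OutputCount reports how many processed items still sit in downloadBlock's output buffer.
        return (delivered, downloadBlock.OutputCount);
    }
}
```

Calling `await FilteredLinkRepro.RunAsync()` returns the IDs that reached the action block together with `downloadBlock.OutputCount`. If the sketch matches the real pipeline, only item 1 is delivered and two items remain buffered: the failed item from step 3 is not accepted by any target, so it stays at the head of the output buffer and item 3 queues behind it.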
  • What do you mean by *"link between downloadBlock and actionBlock is broken"*? Your link has a message predicate that says only data meeting the condition should go from the SourceBlock to the TargetBlock. Even though links can be dynamic (you can link and unlink blocks at any time), they won't unlink because of an exception. And you catch the `BrokenCircuitException`, so please elaborate, because it is unclear what you meant here. – Peter Csala Jul 14 '20 at 14:33
  • @Peter I did some deeper testing and found the exact condition under which this behavior appears. I will describe the steps: 1) push data into the pipeline and process it, 2) call `circuitBreaker.Isolate()`, 3) push new data into the pipeline; it gets caught in `BrokenCircuitException` and logically won't make it to the next block because of the predicate. That's fine. 4) call `circuitBreaker.Reset()`. Now everything should be working. 5) push new data into the pipeline; it gets processed in the `downloadBlock`, its status is OK, but it simply does not propagate to the next `ActionBlock`. – niks Jul 14 '20 at 15:41
  • @Peter And if you repeat all the steps except number 3 (pushing new data into the pipeline while the circuit is open), everything works fine. It seems that the second batch of information does something to the pipeline link. – niks Jul 14 '20 at 15:44
  • First of all, please add the described scenario to your question; it will give clarity for future readers. Secondly, the root cause could be that your retry logic does not enforce any penalty (`SleepDuration` in Polly's nomenclature). Please add logging to your Retry and CB policies. [Here](https://stackoverflow.com/a/62832357/13268855) I've briefly described how you can do that. – Peter Csala Jul 15 '20 at 08:52
  • @Peter I made the changes you asked for. I have also tried to enforce a penalty like this: `WaitAndRetryAsync(2, retryAttempt => TimeSpan.FromSeconds(2))`. But the problem still exists. – niks Jul 15 '20 at 10:53
  • Have you provided a callback for `onRetryAsync` to check what exception has been thrown and when? – Peter Csala Jul 15 '20 at 10:59
  • @Peter I am not sure what you mean by this. Could you please elaborate? – niks Jul 15 '20 at 11:07
  • Sorry, I missed that your current retry logic fires only in case of `WebException`. (I thought it fires on any `Exception`.) Can you please check the [`OutputAvailableAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.outputavailableasync?view=netcore-3.1) method before and after you call `Reset` manually? – Peter Csala Jul 15 '20 at 12:31
  • @Peter The task's `Result` is false before and after calling `Reset`. I have no idea why this is happening. I am starting to think that I should just omit the predicate, let everything pass from block to block, and then check the status of an item inside the next block. I think that is not that big of a deal. – niks Jul 16 '20 at 10:10
  • Does it work properly without the message filter? If so, then what you have to consider is the number of messages during a given period. – Peter Csala Jul 16 '20 at 11:08
  • @Peter Yes, it works properly without the message filter. This behavior happens even if I process a single message, so I suppose it is not because the pipeline is overloaded. – niks Jul 16 '20 at 12:09
  • Great, then use it without filtering :D I've just checked the related [source code](https://github.com/dotnet/corefx/blob/master/src/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs#L92). As you can see, providing a `predicate` results in the use of a class called `FilteredLinkPropagator`. Its `OfferMessage` method decides whether to propagate the data or not. I haven't had time to truly deep dive into this, but there is a case where it returns `DataflowMessageStatus.Declined`, which looks suspicious to me. – Peter Csala Jul 16 '20 at 12:35
  • @Peter Yes, I guess I will use it without filtering at the moment. Maybe I will start a bounty if I decide that I really need it to work with filtering. Thank you very much for all your effort! – niks Jul 16 '20 at 13:59
  • I guess it would make sense to answer your own question, listing all of the findings and the current solution. – Peter Csala Jul 16 '20 at 14:01
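
The `DataflowMessageStatus.Declined` result mentioned in the comments points to one possible explanation: a message that no linked target accepts stays in the source block's output buffer, and later messages wait behind it. A minimal sketch, assuming that is what happens here, adds a second, unfiltered link to `DataflowBlock.NullTarget<T>()` so that declined items are discarded instead of blocking the queue. The class name `NullTargetSketch` is hypothetical, and plain integers stand in for `DataClass` (negative values play the role of `Status.Failed`).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class NullTargetSketch
{
    public static async Task<List<int>> RunAsync()
    {
        var delivered = new List<int>();

        // Pass-through block standing in for downloadBlock.
        var source = new TransformBlock<int, int>(x => x,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Stand-in for actionBlock; it runs one item at a time by default.
        var target = new ActionBlock<int>(x => delivered.Add(x));

        // Filtered link: only non-negative ("not failed") items go to the target.
        source.LinkTo(target,
            new DataflowLinkOptions { PropagateCompletion = true },
            x => x >= 0);

        // Links are offered in the order they were added, so this catch-all
        // link only receives items that the filtered link declined.
        source.LinkTo(DataflowBlock.NullTarget<int>());

        source.Post(1);
        source.Post(-2);   // declined by the filter, discarded by the null target
        source.Post(3);

        source.Complete();
        await target.Completion;   // completes only once the source's buffer has drained
        return delivered;
    }
}
```

With the second link in place, `await NullTargetSketch.RunAsync()` should return the items 1 and 3. Without it, the declined item would never leave the buffer and `target.Completion` would not complete.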

0 Answers