Here is a simple code sample using TransformBlock and ActionBlock. I am using the Polly library to help with retry logic. The problem is that once I manually open the CircuitBreaker and then close it again, the link between downloadBlock and actionBlock appears to be broken. This does not happen if I omit the predicate in the call to LinkTo:
private readonly TransformBlock<DataClass, DataClass> downloadBlock;
private readonly ActionBlock<DataClass> actionBlock;
private readonly AsyncCircuitBreakerPolicy circuitBreaker;
private readonly AsyncRetryPolicy retryPolicy;
retryPolicy = Policy.Handle<WebException>().RetryAsync(4);
circuitBreaker = Policy.Handle<WebException>().CircuitBreakerAsync(10, TimeSpan.FromSeconds(10));
downloadBlock = new TransformBlock<DataClass, DataClass>(async (data) =>
{
    var finalPolicy = retryPolicy.WrapAsync(circuitBreaker);
    try
    {
        await finalPolicy.ExecuteAsync(async () =>
        {
            //await DoSomething();
            data.Status = Status.Completed;
        });
    }
    catch (WebException we)
    {
        data.Status = Status.Failed;
        //Do logging
    }
    catch (BrokenCircuitException)
    {
        data.Status = Status.Failed;
        //Do logging
    }
    return data;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
actionBlock = new ActionBlock<DataClass>((data) =>
{
    //DoSomething(data);
});
//If I set the predicate data => data.Status != Status.Failed here, and at some point manually call
//circuitBreaker.Isolate() and, after some time, circuitBreaker.Reset() to close the circuit again,
//items newly pushed into the pipeline are processed in downloadBlock, but the results
//are not propagated to actionBlock even if their status is OK. This does not happen if I omit the predicate.
downloadBlock.LinkTo(actionBlock,
    new DataflowLinkOptions { PropagateCompletion = true }, data => data.Status != Status.Failed);
I could, of course, check the status in the actionBlock itself, but I would like to know whether anybody has encountered this behavior and what the reason for it could be.
EDIT: These are the steps necessary to reproduce the problem:
- push data into the pipeline and let it be processed,
- call circuitBreaker.Isolate(),
- push new data into the pipeline; it ends up in the BrokenCircuitException handler and, as expected, does not make it to the next block because of the predicate. That's fine.
- call circuitBreaker.Reset(). Now everything should be working again.
- push new data into the pipeline; it gets processed in the downloadBlock and its status is OK, but it simply does not propagate to the next ActionBlock.
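
The steps above can be sketched as a small console program. This is only a minimal sketch, not the original application: the shape of DataClass and Status (including the Id property and the New value), the ConcurrentQueue used to record what reaches actionBlock, and the fixed Task.Delay pauses are assumptions made for illustration. It also assumes the Polly v7-style API used in the snippet above and the System.Threading.Tasks.Dataflow package.

```csharp
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Polly;
using Polly.CircuitBreaker;

// Hypothetical stand-ins for the types used in the question.
public enum Status { New, Completed, Failed }

public class DataClass
{
    public int Id { get; set; }          // assumed, only used to tell items apart
    public Status Status { get; set; }
}

public static class Repro
{
    public static async Task Main()
    {
        var retryPolicy = Policy.Handle<WebException>().RetryAsync(4);
        var circuitBreaker = Policy.Handle<WebException>()
            .CircuitBreakerAsync(10, TimeSpan.FromSeconds(10));
        var finalPolicy = retryPolicy.WrapAsync(circuitBreaker);

        // Records the Ids of the items that actually reach actionBlock.
        var received = new ConcurrentQueue<int>();

        var downloadBlock = new TransformBlock<DataClass, DataClass>(async data =>
        {
            try
            {
                await finalPolicy.ExecuteAsync(() =>
                {
                    data.Status = Status.Completed;
                    return Task.CompletedTask;
                });
            }
            catch (BrokenCircuitException)
            {
                // Thrown while the circuit is open or manually isolated.
                data.Status = Status.Failed;
            }
            return data;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        var actionBlock = new ActionBlock<DataClass>(data => received.Enqueue(data.Id));

        downloadBlock.LinkTo(actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true },
            data => data.Status != Status.Failed);

        // Step 1: push data while the circuit is closed.
        downloadBlock.Post(new DataClass { Id = 1 });
        await Task.Delay(500);

        // Steps 2 and 3: open the circuit manually, then push data that will fail.
        circuitBreaker.Isolate();
        downloadBlock.Post(new DataClass { Id = 2 });
        await Task.Delay(500);

        // Steps 4 and 5: close the circuit again, then push data that should succeed.
        circuitBreaker.Reset();
        downloadBlock.Post(new DataClass { Id = 3 });
        await Task.Delay(500);

        Console.WriteLine("Reached actionBlock: " + string.Join(", ", received));
        Console.WriteLine("Items still in downloadBlock output: " + downloadBlock.OutputCount);
    }
}
```

Printing downloadBlock.OutputCount at the end is just a diagnostic I added to see whether anything is left sitting in the TransformBlock's output buffer after the last step.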