The policies of the Polly library, for example Bulkhead
, Retry
etc, contain a method ExecuteAsync
with many overloads (18), but none of them allows to execute the policy for all elements of an IEnumerable
and gather the results. It seems that the whole library is focused on the goal of executing a single action, leaving the responsibility of managing multiple executions to the client code. I would like to fix this omission by implementing an extension method for all Polly policies (all implementations of the IAsyncPolicy
interface), with the signature below:
public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
this IAsyncPolicy policy,
IEnumerable<TSource> source,
Func<TSource, Task<TResult>> action,
bool continueOnCapturedContext = false,
bool onErrorContinue = false)
The continueOnCapturedContext
parameter controls whether to continue on the captured synchronization context, and should just be passed
to the native ExecuteAsync
method:
Task<TResult> IAsyncPolicy.ExecuteAsync<TResult>(
Func<CancellationToken, Task<TResult>> action,
CancellationToken cancellationToken,
bool continueOnCapturedContext);
The onErrorContinue
parameter is the most important aspect of this question, since it controls the behavior in case of a policy failure. My intention is to use this extension method with thousands of elements, and in case of any exceptions that are not expected/handled by my policy¹ I would like to terminate the whole execution promptly and gracefully. In case the argument onErrorContinue
has the default value false
, the first unhandled exception should cause the cancellation of all pending operations, and the whole execution should terminate as soon as all started operations have completed. In the opposite case of onErrorContinue: true
, all elements should be processed by the policy. Finally all exceptions should be propagated, bundled in an AggregateException
, independently of the onErrorContinue
value.
How could I implement this extension method?
Hypothetical usage scenario of this method:
var policy = Policy
.BulkheadAsync(maxParallelization: 10, maxQueuingActions: Int32.MaxValue)
.WrapAsync(Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(retryCount: 3,
sleepDurationProvider: n => TimeSpan.FromMilliseconds(1000 * n))
);
var urls = Enumerable.Range(1, 1000).Select(n => n.ToString());
var random = new Random(0);
string[] results = await policy.ExecuteAsync(urls, async url =>
{
await Task.Delay(500); // Simulate a web request
lock (random) if (random.NextDouble() < 0.66)
throw new HttpRequestException($"Url #{url} failed");
return url;
}, onErrorContinue: false);
¹ This should happen rarely in production, but may happen frequently during the development, and could hurt productivity.