So a series of articles popped on my radar this morning. It started with this question, which lead to the original example and source code on GitHub.
I rewrote it slightly, so I can start using it in Console and Service applications:
public static class Extensions
{
static readonly TaskPoolScheduler Scheduler = new TaskPoolScheduler(new TaskFactory());
// Licensed under the MIT license with <3 by GitHub
/// <summary>
/// An exponential back off strategy which starts with 1 second and then 4, 8, 16...
/// </summary>
[SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));
/// <summary>
/// A linear strategy which starts with 1 second and then 2, 3, 4...
/// </summary>
[SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
public static readonly Func<int, TimeSpan> LinearStrategy = n => TimeSpan.FromSeconds(1*n);
/// <summary>
/// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
/// </summary>
/// <param name="source">The source observable.</param>
/// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
/// <param name="strategy">The strategy to use in backing off, exponential by default.</param>
/// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
/// <param name="scheduler">The scheduler.</param>
/// <returns>
/// A cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates.
/// </returns>
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public static IObservable<T> RetryWithBackoffStrategy<T>(
this IObservable<T> source,
int retryCount = 3,
Func<int, TimeSpan> strategy = null,
Func<Exception, bool> retryOnError = null,
IScheduler scheduler = null)
{
strategy = strategy ?? ExponentialBackoff;
scheduler = scheduler ?? Scheduler;
if (retryOnError == null)
retryOnError = e => true;
int attempt = 0;
return Observable.Defer(() =>
{
return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
.Select(item => new Tuple<bool, T, Exception>(true, item, null))
.Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
? Observable.Throw<Tuple<bool, T, Exception>>(e)
: Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
? Observable.Return(t.Item2)
: Observable.Throw<T>(t.Item3));
}
}
Now to test how it works, I've written this small program:
class Program
{
static void Main(string[] args)
{
int tryCount = 0;
var cts = new CancellationTokenSource();
var sched = new TaskPoolScheduler(new TaskFactory());
var source = Observable.Defer(
() =>
{
Console.WriteLine("Action {0}", tryCount);
var a = 5/tryCount++;
return Observable.Return("yolo");
});
source.RetryWithBackoffStrategy(scheduler: sched, strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);
while (!cts.IsCancellationRequested)
source.Subscribe(
res => { Console.WriteLine("Result: {0}", res); },
ex =>
{
Console.WriteLine("Error: {0}", ex.Message);
},
() =>
{
cts.Cancel();
Console.WriteLine("End Processing after {0} attempts", tryCount);
});
}
}
Initially I have thought, that the event of subscription, will automatically trigger all the subsequent retires. That was not the case, so I had to implement a Cancellation Token and loop until it signals that all reties have been exhausted.
The other option is to use AutoResetEvent:
class Program
{
static void Main(string[] args)
{
int tryCount = 0;
var auto = new AutoResetEvent(false);
var source = Observable.Defer(
() =>
{
Console.WriteLine("Action {0}", tryCount);
var a = 5/tryCount++;
return Observable.Return("yolo");
});
source.RetryWithBackoffStrategy(strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);
while (!auto.WaitOne(1))
{
source.Subscribe(
res => { Console.WriteLine("Result: {0}", res); },
ex =>
{
Console.WriteLine("Error: {0}", ex.Message);
},
() =>
{
Console.WriteLine("End Processing after {0} attempts", tryCount);
auto.Set();
});
}
}
}
In both scenarios it will display these lines:
Action 0
Error: Attempted to divide by zero.
Action 1
Result: yolo
End Processing after 2 attempts
The question I have to this crowd is: Is this the best way to use this extension? Or is there a way to subscribe to the Observable so it will re-fire itself, up to the number of retries?
FINAL UPDATE
Based on Brandon's suggestion, this is the proper way of subscribing:
internal class Program
{
#region Methods
private static void Main(string[] args)
{
int tryCount = 0;
IObservable<string> source = Observable.Defer(
() =>
{
Console.WriteLine("Action {0}", tryCount);
int a = 5 / tryCount++;
return Observable.Return("yolo");
});
source.RetryWithBackoffStrategy(strategy: Extensions.ExponentialBackoff, retryOnError: exception => exception is DivideByZeroException, scheduler: Scheduler.Immediate)
.Subscribe(
res => { Console.WriteLine("Result: {0}", res); },
ex => { Console.WriteLine("Error: {0}", ex.Message); },
() =>
{
Console.WriteLine("End Processing after {0} attempts", tryCount);
});
}
#endregion
}
The output will be slightly different:
Action 0
Action 1
Result: yolo
End Processing after 2 attempts
This turned out to be quite useful extension. Here is another example how it can be used, where strategy and error processing is given using delegates.
internal class Program
{
#region Methods
private static void Main(string[] args)
{
int tryCount = 0;
IObservable<string> source = Observable.Defer(
() =>
{
Console.WriteLine("Action {0}", tryCount);
int a = 5 / tryCount++;
return Observable.Return("yolo");
});
source.RetryWithBackoffStrategy(
strategy: i => TimeSpan.FromMilliseconds(1),
retryOnError: exception =>
{
if (exception is DivideByZeroException)
{
Console.WriteLine("Tried to divide by zero");
return true;
}
return false;
},
scheduler: Scheduler.Immediate).Subscribe(
res => { Console.WriteLine("Result: {0}", res); },
ex => { Console.WriteLine("Error: {0}", ex.Message); },
() =>
{
Console.WriteLine("Succeeded after {0} attempts", tryCount);
});
}
#endregion
}
Output:
Action 0
Tried to divide by zero
Action 1
Result: yolo
Succeeded after 2 attempts