Info:
The purpose of the method is to get a secret from a stream, get a uri from a stream. Then hit the uri with the secret and some additional parameters type
and name
to obtain some data.
Secrets.GetSecret()
is an IObservable<string>
and Urls.GetHostUrl()
is an IObservable<Uri>
Problem
The problem I am having is that if the dataFetcher.GetFromUrl
call throws an exception then the observable terminates. The RetryAfterDelay
extension method doesn't seem to be working at all.
What I want to be able to do is to be able to catch and log an exception but ideally not have the observable terminate. Or if it has to terminate then to re-subscribe to the original streams, so it behaves as if it has logged/swallowed the exception.
The reason for this is that the network / host isn't the most stable and so sometimes it can error.
Method
private IObservable<string> GetFromHostUrl(string type, string name, int retryTime)
{
return Observable.Interval(TimeSpan.FromSeconds(retryTime), schedulerProvider.Default).StartWith(-1L)
.CombineLatest(Observable.Defer(() => Secrets.GetSecret()),
Urls.GetHostUrl(),
(_, secret, hostUrl) => dataFetcher.GetFromUrl(hostUrl, type, name, secret).Result)
.Do(s =>
{
// log success
},
e =>
{
// log failure
})
.DistinctUntilChanged()
.RetryAfterDelay(TimeSpan.FromSeconds(retryTime), schedulerProvider.Default)
.Replay(1)
.RefCount();
}
Extension Methods
// c/o: http://stackoverflow.com/questions/18978523/write-an-rx-retryafter-extension-method
public static class ExtensionMethods
{
private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
// Don't delay the first time
yield return source;
while (true)
{
yield return source.DelaySubscription(dueTime, scheduler);
}
}
public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
return RepeateInfinite(source, dueTime, scheduler).Catch();
}
}