0

Info:

The purpose of the method is to get a secret from a stream, get a uri from a stream. Then hit the uri with the secret and some additional parameters type and name to obtain some data.

Secrets.GetSecret() is an IObservable<string> and Urls.GetHostUrl() is an IObservable<Uri>

Problem

The problem I am having is that if the dataFetcher.GetFromUrl call throws an exception then the observable terminates. The RetryAfterDelay extension method doesn't seem to be working at all.

What I want to be able to do is to be able to catch and log an exception but ideally not have the observable terminate. Or if it has to terminate then to re-subscribe to the original streams, so it behaves as if it has logged/swallowed the exception.

The reason for this is that the network / host isn't the most stable and so sometimes it can error.

Method

private IObservable<string> GetFromHostUrl(string type, string name, int retryTime)
{
    return Observable.Interval(TimeSpan.FromSeconds(retryTime), schedulerProvider.Default).StartWith(-1L)
        .CombineLatest(Observable.Defer(() => Secrets.GetSecret()),
                                              Urls.GetHostUrl(),
                                              (_, secret, hostUrl) => dataFetcher.GetFromUrl(hostUrl, type, name, secret).Result)
        .Do(s =>
                {
                    // log success
                },
                e =>
                {
                    // log failure
                })
        .DistinctUntilChanged()
        .RetryAfterDelay(TimeSpan.FromSeconds(retryTime), schedulerProvider.Default)
        .Replay(1)
        .RefCount();
}

Extension Methods

// c/o: http://stackoverflow.com/questions/18978523/write-an-rx-retryafter-extension-method
public static class ExtensionMethods
{

    private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
    {
        // Don't delay the first time        
        yield return source;

        while (true)
        {
            yield return source.DelaySubscription(dueTime, scheduler);
        }
    }

    public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
    {
        return RepeateInfinite(source, dueTime, scheduler).Catch();
    }
}
ash_s
  • 85
  • 2
  • 8

1 Answers1

0

Sounds like a text-book example for Retry.

public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
    return RepeateInfinite(source, dueTime, scheduler).Retry();
}
Shlomo
  • 14,102
  • 3
  • 28
  • 43