Given an observable of the form:

var fetchTimer = Observable.Timer(TimeSpan.FromSeconds(1));
var stateFetcher =
    Observable.FromAsync(async () => await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));

var delayedFetch = fetchTimer.SelectMany(stateFetcher);

This provides the means for fetching the state after a delay.

A modification can do this at regular intervals:

var regularFetch = Observable.Interval(TimeSpan.FromSeconds(5)).Select(_ => stateFetcher).Switch();

This requests a value every 5 seconds.

The request can fail, however (remote service unreachable, etc.). With that in mind, producing a mechanism to retry the operation, along with hooks for alerting the user, can be tricky.

My earlier question, "Suspending a timer-based operation on failure", covers the initial approach, as well as some of the attempts and a partial solution.

Here I want to share the solution I arrived at.

Clint
  • Just a side note, don't put `async`/`await` inside a `FromAsync` call. – Enigmativity Nov 17 '20 at 01:38
  • Is there any reason that `.Retry(...)` doesn't work for you? – Enigmativity Nov 17 '20 at 01:42
  • @Enigmativity oh? How come on the `async/await` thing? `Retry` does work for me in the solution I've got now, the big problem was that I was bundling the timer with it, and of course it was emitting while the dialog was visible, which was causing issues. – Clint Nov 17 '20 at 12:27
  • `Observable.FromAsync` unwraps the `Task` already. By putting in `async`/`await` you are unwrapping, wrapping it again, and unwrapping it. – Enigmativity Nov 17 '20 at 21:52

1 Answer

We can summarise the problem as follows:

  1. We have a source observable that can produce a value or error
  2. We need to do something if an error occurs
  3. From the first error we want to make multiple attempts with notification
  4. If those attempts all fail, we want to do something
  5. At the end of the process we want to restart it all.

So:

Error -> Initial dialog -> Retry with notifications on each attempt -> Do it all again

At any point throughout this process a successful value emission should bypass everything and flow back out.

With that highly opinionated approach in mind, here is the utility I created:

public static IObservable<T> WithGatedRetry<T>(
    this IObservable<T> source,
    int retriesPerCycle,
    Func<Exception, Task> onInitialFailure,
    Func<Action<Func<Task>>, Task<Func<Exception, int, Task>>> retryNotificationBlock,
    Func<Exception, Task> onFailedCycle)
{
    IObservable<T> GetInitialHandler(Exception e) =>
        Observable.FromAsync(() => onInitialFailure(e))
        .Select(_ => (T)default);

    IObservable<T> GetCycleFailureHandler(Exception e) =>
        Observable.FromAsync(() => onFailedCycle(e))
        .Select(_ => (T)default);

    IObservable<T> GetRetryFlow() =>
        Observable.Create<T>(async sub =>
        {
            var attempt = 1;
            Func<Task> disposeCallback = () => Task.CompletedTask;
            var notifier = await retryNotificationBlock(dc =>
            {
                disposeCallback = dc;
            });

            await notifier(null, 1);

            return
                source
                .Do(
                    _ => { },
                    async (Exception e) =>
                    {
                        if (attempt + 1 <= retriesPerCycle)
                        {
                            await notifier(e, ++attempt);
                        }
                    }
                )
                .Retry(retriesPerCycle)
                .Finally(async () =>
                {
                    if (disposeCallback != null)
                    {
                        await disposeCallback();
                    }
                })
                .Subscribe(
                    val => { sub.OnNext(val); sub.OnCompleted(); },
                    (Exception e) => { sub.OnError(e); }
                );
        });

    IObservable<T> GetCycleFlow() =>
        GetRetryFlow()
        .Catch((Exception e) =>
            GetCycleFailureHandler(e)
            .Select(_ => GetCycleFlow())
            .Switch()
        )
        .Retry();

    IObservable<T> GetPrimaryFlow() =>
        source
        .Catch((Exception e) => GetInitialHandler(e))
        .Select(val =>
            EqualityComparer<T>.Default.Equals(val, default)
            ? GetCycleFlow().Select(_ => GetPrimaryFlow()).Switch()
            : GetPrimaryFlow().StartWith(val)
        )
        .Switch();

    return GetPrimaryFlow();
}

I'll fully admit this may not be the best way to do it, and there's a bit of a callback-inside-a-callback kludge in the notification block (invoked for each retry attempt) in order to support cleaning up once a retry "cycle" has completed (successfully or otherwise).

Usage is as follows:

var latestState =
    Observable.SelectMany(fetchTimer, stateFetcher)
    .WithGatedRetry(
        3,
        async ex =>
        {
            // show initial error dialog
        },
        async (disposerHook) =>
        {
            // Show the "attempting retries" dialog

            disposerHook(async () =>
            {
                // Close the "attempting retries" dialog
                // and do any cleanup
            });

            return async (Exception ex, int attempt) =>
            {
                // Update the dialog
                // ex is the exception produced by the just-completed attempt
            };
        },
        async ex =>
        {
            // Show the "We still can't quite get it" dialog
            // after this task completes, the entire process starts again
        }
    )
    .Publish();

This approach allows for tailored hook points, and successful values flow through as expected.

In fact, downstream subscribers should only ever see a value when one is successfully provided. They also shouldn't see an error, because the flow sits in an infinite retry.
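
The "infinite retry" behaviour can be seen in isolation. What follows is only a minimal sketch, assuming the System.Reactive package is referenced; `InfiniteRetryDemo` and the `flaky` source are hypothetical stand-ins for the real fetch and are not part of the utility above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class InfiniteRetryDemo
{
    public static (List<int> Values, int Errors, int Attempts) Run()
    {
        var attempts = 0;

        // Hypothetical flaky source: the first two subscriptions fail,
        // the third one produces a value.
        var flaky = Observable.Defer(() =>
            ++attempts < 3
                ? Observable.Throw<int>(new InvalidOperationException("unreachable"))
                : Observable.Return(42));

        var values = new List<int>();
        var errors = 0;

        // Retry() with no argument resubscribes on every error,
        // so the error handler below is never invoked.
        var subscription = flaky
            .Retry()
            .Subscribe(values.Add, _ => errors++);

        subscription.Dispose();
        return (values, errors, attempts);
    }
}
```

The subscriber only receives the successful value; the two failures are absorbed by `Retry()` and show up solely in the attempt count.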

In comparison with the solution in the original question, this uses Select + Switch as opposed to SelectMany in order to ensure inner observables are correctly unsubscribed and disposed.
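
To illustrate that difference, here is a small sketch, again assuming the System.Reactive package is referenced. `SwitchVersusSelectMany` and the subjects in it are hypothetical names used only for this demonstration; subjects are used instead of timers so the ordering is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchVersusSelectMany
{
    public static (List<string> Merged, List<string> Switched) Run()
    {
        var merged = new List<string>();
        var switched = new List<string>();

        var outer = new Subject<string>();
        var inners = new Dictionary<string, Subject<int>>
        {
            ["a"] = new Subject<int>(),
            ["b"] = new Subject<int>(),
        };

        // SelectMany merges: every inner subscription stays alive.
        var s1 = outer
            .SelectMany(key => inners[key].Select(v => $"{key}{v}"))
            .Subscribe(merged.Add);

        // Select + Switch: a new inner observable replaces the previous one,
        // and the previous one is unsubscribed.
        var s2 = outer
            .Select(key => inners[key].Select(v => $"{key}{v}"))
            .Switch()
            .Subscribe(switched.Add);

        outer.OnNext("a");
        inners["a"].OnNext(1); // both pipelines are listening to "a"
        outer.OnNext("b");
        inners["a"].OnNext(2); // only the SelectMany pipeline still listens to "a"
        inners["b"].OnNext(3); // both pipelines are listening to "b"

        s1.Dispose();
        s2.Dispose();
        return (merged, switched);
    }
}
```

After `Run()`, `Merged` contains `a1`, `a2`, `b3`, whereas `Switched` contains only `a1`, `b3`: the stale inner observable no longer reaches the subscriber once a newer one has arrived.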

Clint