I've got a WPF app using ReactiveUI, and it works by periodically fetching state from an external process.

Given a fetch observable as follows:

var latestState =
    Observable.Interval(TimeSpan.FromSeconds(.5))
    .SelectMany(async _ =>
    {
        try
        {
            var state = await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
            return state;
        }
        catch (Exception)
        {
            return null;
        }
    })
    .Publish();

I need to be able to interrupt the fetching of data if it fails.

What I want to be able to do is something like:

var latestState =
    Observable.Interval(TimeSpan.FromSeconds(.5))
    .SelectMany(async _ =>
    {
        try
        {
            var state = await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
            return state;
        }
        catch (Exception)
        {
            // Show and await the dialog dismissal
            // instructions for starting the external process provided etc etc
            await dialogs.ShowErrorMessageAsync("Failed to fetch info", "Failed to get the latest state");

            /* MISSING: 
             * Some magical gubbins that will produce the state on a steady interval, but also still support
             * displaying the dialog and halting
             */
            return null;
        }
    })
    .Publish();

Obviously that's not feasible, because you end up with a chicken-and-egg problem.

Every way I've tried to slice this (e.g. using a Subject<bool> to track success / failure) has run into the same wall: the failure case still needs to emit an observable that fetches on the interval and respects the failure handling, but that's not possible from inside the handler.

I'm almost certain this is an issue with conceptualising the way to signal the error / retrieve the data / resume the interval.


Partial solution / implementation based on comment feedback:

var stateTimer = Observable.Interval(TimeSpan.FromSeconds(10));

var stateFetcher =
    Observable.FromAsync(async () => 
        await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));

IObservable<GetRobotsStateReply> DisplayStateError(Exception causingException)
    => Observable.FromAsync(async () =>
    {
        await dialogs.ShowErrorMessageAsync(
            "Failed to get robot info",
            "Something went wrong");
        return new GetRobotsStateReply { };
    });

var stateStream =
    stateTimer
    .SelectMany(stateFetcher)
    .Catch((Exception ex) => DisplayStateError(ex))
    .Publish();

stateStream.Connect();

This implementation gets me the behaviour I need, and has the benefit of not triggering the timer while the error dialog is displayed; however, it doesn't trigger again after the dialog is dismissed (I believe because the stream has been terminated). I'm going to use the suggestion in the comments to fix this and then add an answer.
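
The termination suspected above matches how Catch behaves in general: once the source errors, Catch unsubscribes from it, continues with the handler sequence, and completes when the handler completes. Nothing resubscribes to the timer afterwards. A minimal sketch, using stand-in integer sequences rather than the robots client (all names here are illustrative):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Stand-in source: emits one value, then fails (plays the role of timer + fetch).
IObservable<int> source =
    Observable.Return(1)
        .Concat(Observable.Throw<int>(new InvalidOperationException("simulated failure")));

// Stand-in handler: plays the role of the dialog observable and emits a placeholder.
IObservable<int> handled =
    source.Catch((Exception ex) => Observable.Return(-1));

// ToList collects everything emitted up to completion; awaiting an observable
// needs the System.Reactive.Linq namespace.
IList<int> items = await handled.ToList();
Console.WriteLine(string.Join(", ", items));
```

The list ends with the handler's placeholder because the sequence completes right after it, which is why an operator that resubscribes (such as Retry or Repeat) is needed to keep polling.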


Working solution (can be added as an answer if reopened).

var fetchTimer = Observable.Timer(TimeSpan.FromSeconds(5));
var stateFetcher = Observable.FromAsync(async () =>
    await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));

var timerFetch = Observable.SelectMany(fetchTimer, stateFetcher);

IObservable<GetRobotsStateReply> GetErrorHandler(Exception ex) =>
    Observable.FromAsync(async () =>
    {
        await dialogs.ShowErrorMessageAsync(
            "TEST",
            "TEST");
        return (GetRobotsStateReply)null;
    });

IObservable<GetRobotsStateReply> GetStateFetchCycleObservable(
    IObservable<GetRobotsStateReply> source) =>
        source
        .Catch((Exception ex) => GetErrorHandler(ex))
        .SelectMany(state =>
            state != null
            ? GetStateFetchCycleObservable(timerFetch)
            : GetStateFetchCycleObservable(stateFetcher));

var latestState =
    GetStateFetchCycleObservable(timerFetch)
    .Publish();

Thanks to Theodor's suggestions, I've been able to hit on a solution.

I'd made the mistake of not thinking in terms of hot/cold observables and not making proper use of the built-in error handling mechanisms.

I was initially using Observable.Interval but this had the undesired consequence of firing and initiating a new remote request while the previous one was still in-flight (I suppose I could have throttled).

This solution works by using Observable.Timer to set up an initial delay and then making the remote request. The resulting stream is observed; on error it displays the dialog and then binds back to the delay + fetch stream.

As the delay + fetch stream is cold, the delay works again as intended, and everything flows back around in a nice loop.

This has been further worked on, as there were issues with double firings of the timer (when using Retry), or the second time around not doing anything after the dialog dismissal.

I realised that was down to the inner observable not having the outer observable's projection back to a value-producing observable.

The new solution manages this, and even solves the problem of immediately re-fetching state if the user dismisses the dialog, or padding with a time interval in the case of a successful result.
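
One thing to watch in a recursive builder of this shape: if SelectMany projects each state only into the next cycle, the state itself is never forwarded downstream. The following is a sketch of one way to surface each result as well, assuming a null state marks a handled error. FetchStateAsync and ShowErrorDialogAsync are hypothetical stand-ins for the robots client and the dialog service, and the delays are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins: the second fetch fails, all others succeed.
int calls = 0;
async Task<string?> FetchStateAsync()
{
    await Task.Delay(10);
    if (++calls == 2) throw new InvalidOperationException("simulated failure");
    return $"state #{calls}";
}
Task ShowErrorDialogAsync() => Task.Delay(20);

// Cold building blocks: an immediate fetch, and a fetch preceded by a delay.
var stateFetcher = Observable.FromAsync<string?>(() => FetchStateAsync());
var timerFetch = Observable.Timer(TimeSpan.FromMilliseconds(50)).SelectMany(stateFetcher);

// On error: wait for the dialog, then emit null as an "error was handled" marker.
IObservable<string?> ErrorHandler(Exception ex) =>
    Observable.FromAsync<string?>(async () =>
    {
        await ShowErrorDialogAsync();
        return null;
    });

// Emit the current result first, then continue with the next cycle:
// delayed after a success, immediate after a dismissed dialog.
IObservable<string?> FetchCycle(IObservable<string?> source) =>
    source
        .Catch((Exception ex) => ErrorHandler(ex))
        .SelectMany(state =>
            Observable.Return(state).Concat(
                Observable.Defer(() => FetchCycle(state != null ? timerFetch : stateFetcher))));

// Hide the null markers from consumers.
var latestState = FetchCycle(timerFetch).Where(state => state != null);
```

Each cycle nests one more subscription inside the previous one, so for a long-running poll this shape keeps accumulating subscriptions. A flat Repeat/Retry pipeline avoids that growth.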

Clint
  • Please [edit] your question to include the full source code you have as a [mcve], which can be compiled and tested by others. – Progman Nov 13 '20 at 19:18
  • @Progman I mean, this kind of *is* the minimal reproducible example. The question is how to interrupt and resume the timer based on whether the async operation fails or not. That's the code I've got here, so there's really not much else I can add except utter nonsense of me stumbling through incorrect attempts. – Clint Nov 13 '20 at 19:40
  • What is the return type of `GetRobotsStateAsync()`? Why are you using `SelectMany()`? Is it possible to do the error handling inside the `GetRobotsStateAsync()` method? Or can you handle the error handling inside the `SelectMany()` block with a `do-while()` loop? Can you write what observables you have and how you want to read/use them? Explain how/where/when exceptions are thrown and how you want to deal with them. There are several methods like `OnErrorResumeNext()` or `Retry()` you can use, but it is difficult without knowing what observables you have and what observable(s) you want. – Progman Nov 13 '20 at 20:00
  • So you want to suspend the `Observable.Interval` timer while the `GetRobotsStateAsync`+`ShowErrorMessageAsync` tasks are awaiting their resolution? – Theodor Zoulias Nov 13 '20 at 20:04
  • @Progman `GetRobotsStateAsync` is just a `Task` returning method. `SelectMany` is being used because it's doing the monadic bind wrapping around a `Task` to an Observable (Saves doing `Select` + `FromTask` + `Switch`). I could use `OnErrorResumeNext` or `Retry` to handle some of this, but fundamentally I want a timer to tick periodically, have it call the `GetRobotsStateAsync` method, and on failure, suspend the timer, wait for a dialog to be dismissed (confirming user intervention), and for the timer to then resume. – Clint Nov 16 '20 at 14:04
  • @TheodorZoulias yeah. See my response to @Progman; essentially I just want it suspending on failure, and then picking up again once the call to show the dialog returns. – Clint Nov 16 '20 at 14:05
  • I could post an answer demonstrating how to use the `OnErrorResumeNext` operator instead of catching and swallowing the exception of `GetRobotsStateAsync`, but it seems that you already know about this option, and you don't like it for some reason. Could you edit your question and explain what the reason is? – Theodor Zoulias Nov 16 '20 at 14:12
  • @TheodorZoulias Oh, I'm aware it exists, but I'm not sure if it'll help me. As far as I'm aware it just pulls a value from the secondary observable stream in the event of an upstream error in the first one; although, now that I think about it - though I'm not sure how to do so - could I make the second stream a cold observable that when subscribed shows the dialog, waits for that to be dismissed? I'd still have the problem of what value to return. If I try to call the `GetRobotsAsync()` in the second observable, then my error handling logic isn't there... – Clint Nov 16 '20 at 14:28
  • Yeap, now that I think of it, the `OnErrorResumeNext` doesn't make much sense. How about rethrowing the exception (with `throw`) after awaiting the `dialogs.ShowErrorMessageAsync`, and then adding the `Retry` operator after the `SelectMany`? – Theodor Zoulias Nov 16 '20 at 14:53
  • @TheodorZoulias I will try that! I'm going to edit the question to show something I've just put together that does indeed work, but suffers from stopping after the first error (presumably because the stream gets terminated). The suggestion of rethrowing and using retry afterwards sounds like the way to go! – Clint Nov 16 '20 at 15:05
  • I am now starting to understand the problem you are facing. I think that it could be solved by combining the operators `Interval`, `Select`, `Concat`, `Catch` and `Retry`, in this order. I will post an answer when the question is reopened (I have already voted to reopen). – Theodor Zoulias Nov 16 '20 at 15:11
  • @TheodorZoulias thanks for your help! I wasn't able to piece together a solution with all of those operators, but I think I have settled on a way of doing it that feels fairly idiomatic. – Clint Nov 16 '20 at 16:02
  • Clint TBH the problem you are facing is not presented very well inside the question, and this can make it hard to persuade other members to vote for reopening it. IMHO there is too much "look at my solution", and not enough "let me explain in baby steps what I am trying to do" inside the question. If I was in your place I would consider a total rewrite of the question, reducing the code to the absolute minimal. Unless of course you are already satisfied with the current solution, and you don't want to spend more time on it. – Theodor Zoulias Nov 16 '20 at 17:12
  • @TheodorZoulias yeah, that's fair. Part of the problem is it's such an abstract thing, and Rx can get... messy. I seem to have settled on a solution now, and it's actually enabled me to be a bit smarter about it and combine dialogs and retry strategies in together as I like, so it seems to work, and I **think** I understand why, which is always the important part. – Clint Nov 16 '20 at 17:24
  • Yeap, RX can make you feel smarter, or frustrated sometimes. – Theodor Zoulias Nov 16 '20 at 19:00
  • @TheodorZoulias here's the new self-answered question if you're curious: https://stackoverflow.com/questions/64866413/rx-net-reactive-ui-mahapps-metro-repeating-retrying-asynchronous-request/64866414#64866414 – Clint Nov 16 '20 at 22:06
  • Honestly I am afraid that the [new question](https://stackoverflow.com/questions/64866413/rx-net-reactive-ui-mahapps-metro-repeating-retrying-asynchronous-request) is not very good either. I hope that I am wrong, but I find it unlikely that you'll get any useful answers out of it. – Theodor Zoulias Nov 16 '20 at 23:03
  • @TheodorZoulias haha no worries. It's there mostly for my own reference and will likely just accept my own answer. – Clint Nov 17 '20 at 00:23

1 Answer

Here is my suggestion:

var observable = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500))
    .Select(x => Observable.FromAsync(async () =>
    {
        return await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
    }))
    .Concat()
    .Catch((Exception ex) => Observable.FromAsync<GetRobotsStateReply>(async () =>
    {
        await dialogs.ShowErrorMessageAsync("Failed to fetch info",
            "Failed to get the latest state");
        throw ex;
    }))
    .Retry();

The Timer+Select+Concat operators ensure that the GetRobotsStateAsync will be executed without overlapping. In case of an exception the timer will be discarded, the Catch operator will kick in, and the original error will be rethrown after closing the dialog, in order to trigger the Retry operator. Then everything will be repeated again, with a brand new timer. The loop will keep spinning until the subscription to the observable is disposed.

This solution assumes that the execution of GetRobotsStateAsync will not regularly exceed the timer's 500 ms interval. Otherwise the ticks produced by the timer will start stacking up (inside the Concat's internal queue), putting the system under memory pressure. For a more sophisticated (but also more complex) periodic mechanism that avoids this problem look at this answer.
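
A simpler way to sidestep the stacking (not the mechanism from the linked answer, just a sketch) is to drop the periodic timer and make each cycle "wait, then fetch once", repeating the cycle only after the previous fetch has finished. FetchStateAsync and ShowErrorDialogAsync below are hypothetical stand-ins for `_robotsClient.GetRobotsStateAsync` and `dialogs.ShowErrorMessageAsync`, and the delays are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins: the third fetch fails, all others succeed.
int calls = 0;
async Task<string> FetchStateAsync()
{
    await Task.Delay(10);
    if (++calls == 3) throw new InvalidOperationException("simulated failure");
    return $"state #{calls}";
}
Task ShowErrorDialogAsync() => Task.Delay(20);

IObservable<string> PollSequentially(TimeSpan pause) =>
    Observable
        // One cold cycle: wait, then fetch once.
        .Timer(pause)
        .SelectMany(_ => Observable.FromAsync<string>(() => FetchStateAsync()))
        // Resubscribe only after the previous cycle has completed, so no ticks queue up.
        .Repeat()
        // On failure: wait for the dialog, then rethrow so that Retry restarts the loop.
        .Catch((Exception ex) => Observable.FromAsync<string>(async () =>
        {
            await ShowErrorDialogAsync();
            throw ex;
        }))
        .Retry();

var polled = PollSequentially(TimeSpan.FromMilliseconds(50));
```

The trade-off is that the period becomes "pause plus fetch duration" rather than a fixed 500 ms, and the pause also applies after the dialog is dismissed.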

Theodor Zoulias
  • Thanks for this! Yeah, the stacking of the timer was causing me a headache, and in reality the tick size is considerably smaller (<.5s) as it's used for near-realtime state replication over to a different process (it hosts a 32 bit DLL for p/invoke and debugging didn't like some of the calls). The solution I settled on in the end abandons an interval in favour of re-constructing the observables by self-referencing a construction function; it allowed me to break out sub-regions of the flow for retry, and for gluing the flow between them. – Clint Nov 17 '20 at 12:31
  • @Clint OK, I see. You may want to be aware of an alternative idea, although you may not need it at the time being. There is a custom operator named [`ExhaustMap`](https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net) that allows dropping messages in case the processing of a previous message has not been completed. You could use it with an always active outer `Timer`, and simply drop its ticks while either a `GetRobotsStateAsync` is in-flight, or an error dialog is shown. – Theodor Zoulias Nov 17 '20 at 16:31
  • Caution, the `Concat` operator [behaves weirdly](https://github.com/dotnet/reactive/issues/1634) in the current version of the Rx library (5.0.0). My advice is to use the equivalent `Merge(1)` operator instead, until the issue with the `Concat` is fixed. The `1` is the value of the `maxConcurrent` parameter. Setting this parameter to `1` means no concurrency. – Theodor Zoulias Nov 05 '21 at 19:48
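
The `Merge(1)` substitution from the last comment can be sketched as follows. WorkAsync is a hypothetical workload standing in for `GetRobotsStateAsync`; it records how many invocations are in flight at once so that the lack of overlap can be checked. The intervals are arbitrary and deliberately shorter than the work, so ticks queue up.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical workload that tracks its own concurrency.
int inFlight = 0, maxInFlight = 0;
async Task<long> WorkAsync(long tick)
{
    int now = Interlocked.Increment(ref inFlight);
    if (now > maxInFlight) maxInFlight = now;
    await Task.Delay(30);
    Interlocked.Decrement(ref inFlight);
    return tick;
}

var serialized = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
    .Select(tick => Observable.FromAsync<long>(() => WorkAsync(tick)))
    // maxConcurrent = 1: subscribe to one inner observable at a time, in arrival order.
    .Merge(1);

IList<long> firstFive = await serialized.Take(5).ToList();
Console.WriteLine($"{string.Join(", ", firstFive)} (max in flight: {maxInFlight})");
```

As with `Concat`, pending inner observables wait in a queue, so the stacking caveat from the answer applies here too.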