20

RxJS stops listening to click events when an inner observable (an Ajax request) errors. I'm trying to figure out how to keep the event listener attached to the button's click event and handle the inner Ajax error gracefully.

Here is my example code and a link to a Plunker:

var input = $("#testBtn");
var test = Rx.Observable.fromEvent(input,'click');

var testAjax = function() {
  return Rx.Observable.range(0,3).then(function(x){ 
    if(x==2)throw "RAWR"; //Simulating a promise error.
    return x;
  });
}

test.map(function(){
  return Rx.Observable.when(testAjax());
})
.switchLatest()
.subscribe(
  function (x) {
      console.log('Next: ', x);
  },
  function (err) {
      console.log('Error: ' + err);   
  },
  function () {
      console.log('Completed');   
  });

http://plnkr.co/edit/NGMB7RkBbpN1ji4mfzih

Jonathan Sheely
  • For what purpose are you using `when` and `then`? It doesn't look like it's buying you anything... – cwharris Jul 21 '14 at 17:02
  • This was done just as an example of faking an ajax request. A mockjax example is provided here. http://plnkr.co/edit/UtReTejs524rX2DKbIoj?p=preview – Jonathan Sheely Jul 21 '14 at 17:21

3 Answers

22

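You can use the catch operator (or the catchException alias) to catch and handle errors that occur in an observable, so that subscribers are not notified of the error. Apply it to the inner observable (the one returned inside the map), not to the outer click stream; an error that reaches the outer stream still ends the subscription.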

Ignore Errors:

return Rx.Observable
    .when(testAjax())
    .catch(Rx.Observable.empty()); // continues with an empty obs after error.

Handle Errors:

var fallback = Rx.Observable.return('default value');
return Rx.Observable
    .when(testAjax())
    .catch(function (error) {
        console.log('error:', error);
        return fallback; // continues with the default value after error.
    });
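To make the inner-versus-outer distinction concrete, here is a minimal sketch in plain JavaScript. It is a toy model and not RxJS: a "stream" is just a function that pushes values into an observer, and the names `fakeAjax`, `catchWith`, `empty`, `flatMap`, `threeClicks`, and `run` are all hypothetical helpers invented for this illustration. It only assumes that, as in the question, an error reaching the outer stream stops everything after it.

```javascript
// Toy model, NOT RxJS: a stream is a function taking an observer { next, error }.

// Hypothetical stand-in for the Ajax call: it fails on the second click.
function fakeAjax(clickNumber) {
  return function (observer) {
    if (clickNumber === 2) {
      observer.error('RAWR');
    } else {
      observer.next('response ' + clickNumber);
    }
  };
}

// Like catch(): on error, continue with a fallback stream instead of
// forwarding the error to the observer.
function catchWith(stream, fallback) {
  return function (observer) {
    stream({
      next: observer.next,
      error: function () { fallback(observer); }
    });
  };
}

// A stream that emits nothing, in the spirit of Rx.Observable.empty().
function empty() {
  return function () {};
}

// Like a flattening map: one inner stream per outer value. The first error
// from any inner stream is forwarded and stops all further output.
function flatMap(outer, project) {
  return function (observer) {
    var stopped = false;
    function fail(err) {
      if (!stopped) { stopped = true; observer.error(err); }
    }
    outer({
      next: function (value) {
        if (stopped) { return; }
        project(value)({
          next: function (v) { if (!stopped) { observer.next(v); } },
          error: fail
        });
      },
      error: fail
    });
  };
}

// Three simulated button clicks.
function threeClicks(observer) {
  [1, 2, 3].forEach(function (n) { observer.next(n); });
}

// Subscribe and collect what the subscriber would have logged.
function run(stream) {
  var log = [];
  stream({
    next: function (v) { log.push('Next: ' + v); },
    error: function (e) { log.push('Error: ' + e); }
  });
  return log;
}

// No catch on the inner stream: the third click is never handled.
var unprotectedLog = run(flatMap(threeClicks, fakeAjax));
console.log(unprotectedLog); // [ 'Next: response 1', 'Error: RAWR' ]

// catch on the inner stream: the failure is swallowed, clicks keep working.
var protectedLog = run(flatMap(threeClicks, function (n) {
  return catchWith(fakeAjax(n), empty());
}));
console.log(protectedLog); // [ 'Next: response 1', 'Next: response 3' ]
```

In the toy, the only difference between the two runs is where the error is intercepted, which mirrors putting `.catch(...)` on the observable returned inside the map rather than after the flattening operator.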
cwharris
  • 2
    THANK YOU! I was about 90% there. I figured out that I needed to run catch() on the inner observable, but couldn't figure out what the return value should be. Ultimately `Rx.Observable.empty()` was what I was missing. Here is an updated plunker http://plnkr.co/edit/UtReTejs524rX2DKbIoj?p=preview – Jonathan Sheely Jul 21 '14 at 17:09
  • +1 for eventually reaching the answer on your own. :) Also, if in doubt, just create an observable: `Observable.create`. Just make sure you keep it hygienic. – cwharris Jul 21 '14 at 17:12
  • 1
    If I put the catch() on the outer Observable, after the `flatMapLatest` it will complete the subscription. This only appears to work if I handle all exceptions on the inner observables. Any suggestions? – Jonathan Sheely Jul 21 '14 at 17:24
  • 1
    Right on. `flatMapLatest` has no mechanism for handling errors from the observables it subscribes to, so you have a couple of options. 1) don't let those errors "bubble" to the `flatMapLatest`, or 2) create a "type" that can house either a value OR an error, and let flatMapLatest "bubble" both of those through via onNext. On the subscriber side, you can "pattern match" for that type, and decide what to do with the error/value. – cwharris Jul 21 '14 at 18:39
  • At first I did not understand that this `catch` operator should be in the "inner" Observable, returned from the loading function to `switchMap` of main Observable, which we want to not be stopped on errors. I tried this `catch` on main Observable and it obviously did not work. :) Maybe you should include the "full" example in the answer. – Ruslan Stelmachenko Nov 22 '16 at 19:50
  • 2
    What if I want subscribers to be notified of the error and the stream to continue normally? – David Jan 24 '17 at 11:32
  • @David that's application-level functionality. one way of achieving this would be to use an observable that yields `type Response = { value: SomeValueType } | { error: Error }` instead of just `value`. This upgrades the error to something your application can consume normally, just as you might handle an xml request error. – cwharris Nov 01 '18 at 02:08
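The "value OR error" type suggested in the comments above can be sketched in plain JavaScript. This is only an illustration of the shape, not RxJS code: `fakeRequest`, `toResult`, and `onNext` are hypothetical names, and the request is modeled as a synchronous function that throws. In RxJS the equivalent would presumably be to map the inner observable's values to `{ value: ... }` and catch its errors into an observable of `{ error: ... }`.

```javascript
// Toy model, NOT RxJS: failures are turned into ordinary data so they can
// travel through the same "onNext" path as successful values.

// Hypothetical stand-in for a request that fails for one particular input.
function fakeRequest(n) {
  if (n === 2) {
    throw new Error('RAWR');
  }
  return n * 10;
}

// Wrap the outcome in an object that holds either a value or an error.
function toResult(fn, arg) {
  try {
    return { value: fn(arg) };
  } catch (error) {
    return { error: error };
  }
}

var log = [];

// Stand-in for the subscriber's onNext handler: it inspects the shape of
// the result and decides what to do with it.
function onNext(result) {
  if ('error' in result) {
    log.push('failed: ' + result.error.message);
  } else {
    log.push('ok: ' + result.value);
  }
}

// Three simulated requests; the second one fails but the third still runs.
[1, 2, 3]
  .map(function (n) { return toResult(fakeRequest, n); })
  .forEach(onNext);

console.log(log); // [ 'ok: 10', 'failed: RAWR', 'ok: 30' ]
```

The subscriber is notified of the failure, yet nothing ever travels down the error channel, so the stream itself is never terminated.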
6

I ran into my own issues interpreting how this (and onErrorResumeNext) functioned. The struggle I encountered was with what context the catch (or resume next) applied to. The simple colloquial translation that makes sense for me is the following:

Given a stream (or observable) of observables, catch (or onErrorResumeNext) will consume the error and allow you to provide one or more observables to continue the original stream with.

The key takeaway is that your original source is interrupted and replaced with the observable(s) you provide in the catch/onErrorResumeNext function. This means that if you had something like this:

var src = Rx.Observable
    .interval(500)
    .take(10)
    .select(function(x) {
        if (x == 5) {
            return Rx.Observable.throw('Simulated Failure');
        }

        return Rx.Observable.return(x * 2);
    })

Then adding .catch(Rx.Observable.return('N/A')) or .onErrorResumeNext(Rx.Observable.return('N/A')) will not actually just continue your stream (sourced by the interval), but rather end the stream with a final observable (the N/A).

If you instead want to handle the failure gracefully and continue the original stream, you need to do something more like .select(function(x) { return x.catch(Rx.Observable.return('N/A')); }). Now your stream will replace any observable element in the stream that fails with a caught default and then continue on with the existing source stream.

var src = Rx.Observable
    .interval(500)
    .take(10)
    .select(function(x) {
        if (x == 5) {
            return Rx.Observable.throw('Simulated Failure');
        }

        return Rx.Observable.return(x * 2);
    })
    //.catch(Rx.Observable.return('N/A'))
    //.onErrorResumeNext(Rx.Observable.return('N/A'))
    .select(function(x) { return x.catch(Rx.Observable.return('N/A')); })
    .selectMany(function(x) { return x; });


var sub = src.subscribe(
    function (x) { console.log(x); },
    function (x) { console.log(x); },
    function () { console.log('DONE'); }
);

// OUTPUT:
// 0
// 2
// 4
// 6
// 8
// N/A
// 12
// 14
// 16
// 18
// DONE

Here is a JSFiddle that shows this in action.

pjs
  • What does the `selectMany(function(x) { return x; })` do? –  Jul 06 '16 at 18:54
  • this strategy produces an observable event stream of observables. The `selectMany` (or `flatMap`) expands the stream so that it becomes just an observable stream of results (strings in this case) by unwrapping the inner observable. – pjs Jul 06 '16 at 23:38
4

I was still a little confused after trying the accepted answer, so this is what ended up working for me. This is what I had:

  Rx.Observable.fromEvent(emitter, events.SUBMIT_EVENT)
      .flatMapFirst(email=>{
        return $.ajax({
          method: "POST",
          url: '/email',
          data: {
            email: email
          },
        }).promise()
      })
      .subscribe(response=>{
        if (response) {
          //do your success thing here
        }
      },
      error=>{
        //do your error thing
      })

When the server returned an error (like when the user already entered their email) I wasn't able to listen for the user's email form submit again. This is what worked for me:

  Rx.Observable.fromEvent(emitter, events.SUBMIT_EVENT)
      .flatMapFirst(email=>{
        return $.ajax({
          method: "POST",
          url: '/email',
          data: {
            email: email
          },
        }).promise()
      })
      .doOnError(error=> {
        //do your error thing here
      })
      .retry()
      .subscribe(response=>{
        if (response) {
          //do your success thing here
        }
      })
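As a rough intuition for why reporting the error and then retrying keeps the form usable, here is a toy model in plain JavaScript built on Node's `events` module. It is not RxJS, and `subscribeOnce`, `subscribeWithRetry`, and `handle` are hypothetical names for this sketch. The assumption is simply that an error tears down the current subscription (the listener is removed) and that retrying means subscribing to the same event source again.

```javascript
var EventEmitter = require('events').EventEmitter;

// Toy model, NOT RxJS. Attach a listener for 'submit' events. If handling an
// event throws, the listener is removed, mimicking how an errored
// subscription stops listening, and onError is called.
function subscribeOnce(emitter, handle, onError) {
  function listener(email) {
    try {
      handle(email);
    } catch (err) {
      emitter.removeListener('submit', listener);
      onError(err);
    }
  }
  emitter.on('submit', listener);
}

// In the spirit of doOnError followed by retry: report the error, then
// subscribe again so later events are still handled.
function subscribeWithRetry(emitter, handle, reportError) {
  subscribeOnce(emitter, handle, function (err) {
    reportError(err);
    subscribeWithRetry(emitter, handle, reportError);
  });
}

var emitter = new EventEmitter();
var log = [];

// Hypothetical stand-in for the POST to /email: one address is rejected.
function handle(email) {
  if (email === 'taken@example.com') {
    throw new Error('already registered');
  }
  log.push('saved ' + email);
}

subscribeWithRetry(emitter, handle, function (err) {
  log.push('error: ' + err.message);
});

emitter.emit('submit', 'a@example.com');
emitter.emit('submit', 'taken@example.com');
emitter.emit('submit', 'b@example.com');

console.log(log);
// [ 'saved a@example.com', 'error: already registered', 'saved b@example.com' ]
```

Note that in this sketch, as with a retry that has no count limit, a source that fails on every event would be resubscribed indefinitely, so the approach fits best when failures depend on the individual event rather than on the source itself.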
taylorstine