67

I have a list of items to parse, but the parsing of one of them can fail.

What is the "Rx-Way" to catch error but continue executing the sequence

Code Sample:

var observable = Rx.Observable.from([0,1,2,3,4,5])
.map(
  function(value){
      if(value == 3){
        throw new Error("Value cannot be 3");
      }
    return value;
  });

observable.subscribe(
  function(value){
  console.log("onNext " + value);
  },
  function(error){
    console.log("Error: " + error.message);
  },
  function(){
    console.log("Completed!");
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>

What I want to do in a non-Rx-Way:

var items = [0,1,2,3,4,5];

for (var item in items){
  try{
    if(item == 3){
      throw new Error("Value cannot be 3");
    }
    console.log(item);
  }catch(error){
     console.log("Error: " + error.message);
  }
}
starball
  • 20,030
  • 7
  • 43
  • 238
Cheborra
  • 2,627
  • 4
  • 23
  • 39

5 Answers5

58

I would suggest that you use flatMap (now mergeMap in rxjs version 5) instead, which will let you collapse errors if you don't care about them. Effectively, you will create an inner Observable that can be swallowed if an error occurs. The advantage of this approach is that you can chain together operators and if an error occurs anywhere in the pipeline it will automatically get forwarded to the catch block.

const {from, iif, throwError, of, EMPTY} = rxjs;
const {map, flatMap, catchError} = rxjs.operators;

// A helper method to let us create arbitrary operators
const {pipe} = rxjs;

// Create an operator that will catch and squash errors
// This returns a function of the shape of Observable<T> => Observable<R>
const mapAndContinueOnError = pipe(
  //This will get skipped if upstream throws an error
  map(v => v * 2),
  catchError(err => {
    console.log("Caught Error, continuing")
    //Return an empty Observable which gets collapsed in the output
    return EMPTY;
  })
)

const observable = from([0, 1, 2, 3, 4, 5]).pipe(
  flatMap((value) => 
    iif(() => value != 3, 
      of(value), 
      throwError(new Error("Value cannot be 3"))
    ).pipe(mapAndContinueOnError)
  )
);

observable.subscribe(
  (value) => console.log("onNext " + value), (error) => console.log("Error: " + error.message), () => console.log("Completed!")
);
<script src="https://unpkg.com/rxjs@7.0.0/dist/bundles/rxjs.umd.min.js"></script>
paulpdaniels
  • 18,395
  • 2
  • 51
  • 55
  • what if an error occurs somewhere on a chain after the `flatMap` ? – nonybrighto Nov 17 '17 at 07:31
  • @nonybrighto Then it would kill the stream and emit an error to the subscriber, unless there was another catch somewhere down stream. – paulpdaniels Nov 17 '17 at 07:35
  • 3
    This does not seem to work in RxJS v6. I tried migrating the code and the observable completes after the error is caught. Would you mind updating your solution. I have created a stackblitz here for reference. [Stackblitz reproducable code.](https://stackblitz.com/edit/completes-on-error) – Manish Shrestha Nov 06 '19 at 09:24
  • @ManishShrestha There you go! – paulpdaniels Nov 07 '19 at 02:42
  • Love this answer for - the naked `pipe` function, the working rxJs example and the `const` destructuring of the imports. Every day's a school day! - oh and for the `iif`! – El Ronnoco Nov 27 '20 at 09:29
  • If you have additional operations in the pipe after `catchError`, it's important to return a non-empty observable for them to be executed. – Cedric Reichenbach Feb 16 '23 at 20:46
33

You need to switch to a new disposable stream, and if an error occurs within it will be disposed safely, and keep the original stream alive:

Rx.Observable.from([0,1,2,3,4,5])
    .switchMap(value => {

        // This is the disposable stream!
        // Errors can safely occur in here without killing the original stream

        return Rx.Observable.of(value)
            .map(value => {
                if (value === 3) {
                    throw new Error('Value cannot be 3');
                }
                return value;
            })
            .catch(error => {
                // You can do some fancy stuff here with errors if you like
                // Below we are just returning the error object to the outer stream
                return Rx.Observable.of(error);
            });

    })
    .map(value => {
        if (value instanceof Error) {
            // Maybe do some error handling here
            return `Error: ${value.message}`;
        }
        return value;
    })
    .subscribe(
      (x => console.log('Success', x)),
      (x => console.log('Error', x)),
      (() => console.log('Complete'))
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>

More info on this technique in this blog post: The Quest for Meatballs: Continue RxJS Streams When Errors Occur

iamturns
  • 430
  • 5
  • 9
  • 1
    How would that look with the lettable and pipe operators ? – Royi Namir Feb 20 '18 at 10:19
  • 2
    BE CAREFUL, if the inner observable takes some time to emit it may never emit when using `switchMap()` as it cancels the previous observable. To test it yourself replace `Rx.Observable.of(value)` of this answer with `Rx.Observable.from(new Promise((resolve, reject) => setTimeout(() => resolve(i))))`. The result will only be: "Success 5, Complete". Replacing the `switchMap()` with `mergeMap()` fixes the problem and gives the expected results normally. – Mouneer Dec 02 '18 at 15:28
23

To keep your endlessObservable$ from dying you can put your failingObservable$ in a higher-order mapping operator (e.g. switchMap, concatMap, exhaustMap...) and swallow the error there by terminating the inner stream with an EMPTY observable returning no values.

Using RxJS 6:

endlessObservable$.pipe(
    switchMap(() => failingObservable$.pipe(catchError(error => EMPTY)))
);
Vedran
  • 10,369
  • 5
  • 50
  • 57
  • 1
    Trying to keep it as simple as possible for people looking for a working solution. – Vedran Nov 01 '19 at 14:56
  • 1
    a more illustrative example of this "double wrapping" at https://www.learnrxjs.io/operators/error_handling/catch.html (example no. 3) or https://stackblitz.com/edit/rxjs-catcherror-withmapoperators – Balage Jan 08 '20 at 15:07
  • 1
    This will not cancel unfinished failingObservable$ when endlessObservable$ emits? – isevcik Feb 25 '20 at 12:51
  • @isevcik yes it will - it's just an example; you can use any of the other higher-order mapping operators depending on behavior you need. – Vedran Feb 26 '20 at 10:15
  • 1
    `emtpy()` is now deprecated, `EMPTY` should be used instead. – JSON Derulo Jun 14 '21 at 10:19
  • Awsome!!!!!!!!!! You saved my day! – Nano Feb 17 '23 at 15:46
0

In a case where you don't want or can't access the inner observable that causes the error, you can do something like this :

Using RxJS 7 :

const numbers$ = new Subject();

numbers$
  .pipe(
    tap(value => {
      if (value === 3) {
        throw new Error('Value cannot be 3');
      }
    }),
    tap(value => {
      console.log('Value:', value);
    }),
    catchError((err, caught) => {
      console.log('Error:', err.message);
      return caught;
    })
  )
  .subscribe();

for (let n = 1; n <= 10; n++) {
  numbers$.next(n);
}

What is interesting here is the "caught" argument in the catchError operator that can be returned. https://rxjs.dev/api/operators/catchError

It only works when the source observable is Hot.

In my case, I use redux-observable and I wanted a way to handle my errors in a single place.

I came up with this :

const allEpics = combineEpics(
    // all my epics
);

export const rootEpic = (action$, state$, dependencies) => {
  return allEpics(action$, state$, dependencies).pipe(
    catchError((err, caught) => {
        if (err instanceof MyError) {
        return concat(of(displayAlert(err.message)), caught);
      }
      throw err;
    })
  );
};

If any of my epic throw a "MyError" exception, It will be caught and another action will be dispatched.

knona
  • 155
  • 2
  • 8
-5

You can actually use try/catch inside your map function to handle the error. Here is the code snippet

var source = Rx.Observable.from([0, 1, 2, 3, 4, 5])
    .map(
        function(value) {
            try {
                if (value === 3) {
                    throw new Error("Value cannot be 3");
                }
                return value;

            } catch (error) {
                console.log('I caught an error');
                return undefined;
            }
        })
    .filter(function(x) {
        return x !== undefined; });


source.subscribe(
    function(value) {
        console.log("onNext " + value);
    },
    function(error) {
        console.log("Error: " + error.message);
    },
    function() {
        console.log("Completed!");
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
Toan Nguyen
  • 11,263
  • 5
  • 43
  • 59