I am building an application using RxJS and one of the interesting problems I have come across is how I can implement an operator that will catch errors only when they are unhandled by the rest of the pipeline ahead of it. I'll call my operator catchUnhandledError for now.

Outside of streams, the catchError operator is loosely analogous to:

try {
    // stream  
} catch (err) {
    // catchError handler
    // new stream
}

What I am trying to implement resembles the following

try {
    // stream
} catch (err) {
    try {
        // original stream
    } catch (err2) {
        // catchUnhandledError handler
        // new stream
    }
}

The key takeaway here is that the new operator only catches errors that other operators do not catch further down the pipeline.

I appreciate that piped operators essentially wrap observables similar to how middleware works in popular application pipelines which means "pushing the pipe to the end" is nonsensical.

My stream (simplified) is created as follows.

combineLatest([ page, filter ]).pipe(
    switchMap(([ page, { search, order }]) =>
        apiQuery(search, order, page).pipe(
            map(/* combine response with filters */),
            catchError(/* ONLY handle http status 422 Unprocessable Entity for validation */),
            catchError(/* ONLY handle http status 409 Conflict for version conflicts */)
        )
    )
);

Where apiQuery and its generic errorHandler are defined as:

function errorHandler(errorMessage?: string) {
    return <T>(source: Observable<T>): Observable<T> => 
        source.pipe(
            /* replace with new catchUnhandledError operator */
            catchError(() => {
                /* More complex code than this to log the error */
                alert('unhandled error');

                return EMPTY;
            })  
        );
}

function apiQuery<T>(search: string, order: string, page: number) {
    /* uses parameters to generate ajax payload */
    return ajax({
        url: 'url', 
        method: 'GET'
    }).pipe(
        map(r => r.response as T),
        errorHandler('An error message')
    );
}

The problem is that the validation and request-specific error handlers never get called, because the generic errorHandler sits earlier in the pipe and catches the error first. If this is not possible, I will have to fall back on one of a few workarounds:

  • Pass everything in the pipe as a parameter / callback (convoluted)
  • Map success and errors to a single object then check .success (convoluted)
  • Copy and paste the generic errorHandler to every single place I call my api (duplicated)
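For illustration, the second workaround might look roughly like the sketch below. The names `ApiResult`, `ok`, `fail` and `describeResult` are hypothetical and not part of my real code. In the actual stream the wrapping would presumably be done with `map(ok)` for responses and a `catchError` that returns `of(fail(...))` for errors, so the stream itself never errors and every consumer has to branch on `.success`.

```typescript
// Hypothetical result wrapper: every name and shape here is illustrative only.
type ApiResult<T> =
  | { success: true; value: T }
  | { success: false; status: number; error: unknown };

// Wrap a successful response.
const ok = <T>(value: T): ApiResult<T> => ({ success: true, value });

// Wrap a failed request together with its HTTP status.
const fail = <T>(status: number, error: unknown): ApiResult<T> => ({
  success: false,
  status,
  error,
});

// Consumers check `.success` and claim the statuses they care about;
// anything unclaimed falls through to the generic branch.
function describeResult<T>(result: ApiResult<T>): string {
  if (result.success) {
    return 'ok';
  }
  switch (result.status) {
    case 422:
      return 'validation';
    case 409:
      return 'conflict';
    default:
      return 'unhandled';
  }
}
```

It works, but the success and error paths are now mixed into one value type, which is why I consider it convoluted.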

Does anyone have any ideas that will prevent me from having to have convoluted or duplicated code?

Eliott Robson
  • This is [hardly possible for promises](https://stackoverflow.com/a/57792542/1048572), I doubt the situation is different for RxJS observables. – Bergi May 19 '20 at 10:50
  • To the contrary, that problem is relatively easy to solve with async await. https://stackblitz.com/edit/typescript-4xkpwh – Eliott Robson May 19 '20 at 11:06
  • No, that example function just *always* handles errors, regardless of whether they would also be handled down the chain. And in your example it is always used at the end of the chain, not in the middle. – Bergi May 19 '20 at 11:14

2 Answers

1

To deal with observable errors that no subscriber handles, you can set the global hook (available in RxJS 7 and later):

import { config } from 'rxjs'

config.onUnhandledError = (err: any) => console.error(err)
0

The first thing that comes to mind is to create a custom operator that will be given a callback function with which you can determine whether the catchError should handle this or should pass it along further in the stream.

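import { Observable, defer, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

// shouldHandle decides whether this operator swallows the error or rethrows it
const catchUnhandledError = (shouldHandle: (err: any) => boolean) =>
  (source: Observable<any>) => defer(
    () => source.pipe(
      // throwError(err) is the RxJS 6 form; RxJS 7+ prefers throwError(() => err)
      catchError(err => shouldHandle(err) ? of('handled') : throwError(err) /* pass along the error */)
    )
  )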
Andrei Gătej
  • I did think about that, and even started diving into the rxjs internals to recreate the `catchError` operator. However, with this implementation the observable will error out (what I am trying to avoid) if further handlers don't exist down the line. https://stackblitz.com/edit/rxjs-12kldr. The problem here is that my inner observable has the error handling at the beginning which is the exact same problem. – Eliott Robson May 19 '20 at 11:11
  • `observable will error out ... if further handlers don't exist down the line ` - so what should be expected behavior when this happens? – Andrei Gătej May 19 '20 at 11:15
  • The `catchUnhandledError` handler kicks in, basically if this is piped then the observable never errors at the very end BUT error handlers can detect the error. Think this is a limitation of the fact that streams exit immediately on error and `catchError` simply starts a new stream. – Eliott Robson May 19 '20 at 11:18
  • I'm not sure I fully understood what you're trying to achieve. These phrases confuse me: `unhandled by the rest of the pipeline ahead of it` & `operators do not catch further down the pipeline`. From the former I understood: `pipe(errX(), errY(), err())` - `err` will handle if not handled in the previous ones, and from the latter: `pipe(err(), errX(), errY())` - `err` will handle if not handled in the next ones. Which case is it? – Andrei Gătej May 19 '20 at 11:35
  • `pipe(err(), errX(), errY())` `err` will handle if not handled in the next ones. But it feels like it's not possible :( – Eliott Robson May 19 '20 at 11:36
  • So you want an operator that can be placed anywhere in the pipeline and will handle the unhandled errors? – Andrei Gătej May 19 '20 at 11:38
  • Yeah exactly, although I think this is limited by the fact that caught streams are replaced in catchError so we’d be listening to a completely different observable – Eliott Robson May 19 '20 at 11:39
  • I don't see why it would be a limitation. If it's caught, then the custom err operator will have nothing to handle, right? Another idea that comes to mind is to wrap the entire stream so that at the end of the pipeline you attach a normal `catchError` and if that is reached, it means the error has not been handled yet. Or am I missing something? If so, could you elaborate on _why_ do you need this kind of operator? – Andrei Gătej May 19 '20 at 11:48
  • Yeah I will go this route where I add a catchError to the end. I was just hoping to centralise this logic where the observable was created and prevent duplicating. But I can extract the logic to a function and reuse the function to reduce the technical debt. – Eliott Robson May 19 '20 at 11:52
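A minimal sketch of the route settled on in the last comment, assuming the generic handler is extracted into a reusable operator and appended as the final operator of each pipeline. The names `catchStatus`, `catchUnhandled`, `failingRequest` and `HttpLikeError` are hypothetical, and `failingRequest` only stands in for the real `apiQuery` call. The specific handlers rethrow anything they do not recognise, so only errors nobody claimed reach the fallback.

```typescript
import { EMPTY, Observable } from 'rxjs';
import { catchError } from 'rxjs/operators';

// Hypothetical error shape: only `status` is assumed, mirroring an HTTP error.
interface HttpLikeError {
  status: number;
}

// Handle one specific status and rethrow everything else.
function catchStatus(status: number, onMatch: (err: HttpLikeError) => void) {
  return <T>(source: Observable<T>): Observable<T> =>
    source.pipe(
      catchError((err: HttpLikeError) => {
        if (err && err.status === status) {
          onMatch(err);
          return EMPTY; // handled: complete without emitting
        }
        throw err; // not ours: pass it to the next handler in the pipe
      })
    );
}

// Generic fallback, written once and placed LAST in each pipeline.
function catchUnhandled(log: (err: unknown) => void) {
  return <T>(source: Observable<T>): Observable<T> =>
    source.pipe(
      catchError(err => {
        log(err);
        return EMPTY; // complete quietly instead of erroring the subscriber
      })
    );
}

// Stand-in for apiQuery: an observable that fails with the given status.
function failingRequest(status: number): Observable<string> {
  return new Observable<string>(subscriber => subscriber.error({ status }));
}

// Records which handler dealt with each request.
const handled: string[] = [];

function run(status: number): void {
  failingRequest(status)
    .pipe(
      catchStatus(422, () => handled.push('validation')),
      catchStatus(409, () => handled.push('conflict')),
      catchUnhandled(() => handled.push('unhandled')) // always last
    )
    .subscribe();
}

run(422);
run(409);
run(500);
```

The trade-off is the one discussed above: the fallback has to be added wherever the stream is consumed rather than where it is created, but the logging logic itself lives in one function.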