3

Consider the following snippet

const { NEVER, timer } = rxjs;
const { catchError, switchMap, timeout } = rxjs.operators;

timer(0, 3000).pipe(
  switchMap(() => 
    timer(randomIntFromInterval(1, 4) * 1000).pipe(  // <-- mock HTTP call
      catchError(() => {
        // do something on error
        console.log('Caught error');
        return NEVER;
      })
    )
  ),
).subscribe({
  next: (value) => console.log('Next triggred')
});

// credit: https://stackoverflow.com/a/7228322/6513921
function randomIntFromInterval(min, max) {
  const value = Math.floor(Math.random() * (max - min + 1) + min);
  console.log(`Simulated HTTP call ${value}s`);
  return value;
}
.as-console-wrapper { max-height: 100% !important; top: 0px }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.4.0/rxjs.umd.min.js"></script>

Here the catchError would only be triggered when the HTTP call emits an error. But if the HTTP call doesn't return anything within the 3 seconds poll timer, the previous request would be cancelled before the next call. I'd like to perform error-handling (essentially triggering the catchError operator) over these cancelled requests.

I'm aware we could pipe in a timeout with < 3s threshold to throw an error. But I'd like to handle it without using timeout operator.

Could anyone come up with a better solution? TIA.

ruth
  • 29,535
  • 4
  • 30
  • 57

3 Answers3

1

I think an approach would be to use a combination of race and share:

const src$ = timer(0, 3000).pipe(
  share(),
);

src$.pipe(
  // Making sure the subscriber created from `race` receives
  // the notification first, so that the error can be properly thrown.
  delay(0),

  switchMap(() => race(
      timer(randomIntFromInterval(1, 4) * 1000),
      src$.pipe(switchMapTo(throwError('Request took longer than 3 seconds!'))),
    ),
  ),
  catchError(() => {
    // do something on error
    console.log('Caught error');
    return NEVER;
  })
).subscribe({
  next: (value) => console.log('Next triggred')
});

Here is my train of thoughts:

By using share, I'm essentially telling RxJS to place a Subject instance(we will refer to it as S as of now) between timer and subsequent subscribers. This means that all the subscribers that would initially go to timer, will instead go to Subject. This is to ensure that there can be as many subscribers as possible, without recreating a new timer every time. Now, let's see why we had to consider multiple subscribers.

When the timer emits 0, for the first time, S will have a subscriber. Then, race comes into play:

race(
  timer(randomIntFromInterval(1, 4) * 1000),
  src$.pipe(switchMapTo(throwError('Request took longer than 3 seconds!'))),
),

These are the racers: the actual request and the next notification of the timer. Let's see the possible cases:

  • the request completes before 3 seconds pass; in this case, race will do its job and will unsubscribe from the loser, in this case the observable which would throw the custom error
  • the request does not complete before 3 seconds pass; in this case, the winner is src$.pipe(...), meaning that we will get an error which will be caught by the catchError operator from the main pipe; here is also important to highlight the usage of delay(0) - to ensure that the error is first thrown, and then the switchMap's inner observable is unsubscribed; in this situation, there are 2 subscribers that S will get - one is from the main pipe and the other as a result for using race, so by using delay(0), the first subscriber which receives the timer's notification is the race's subscriber
Andrei Gătej
  • 11,116
  • 1
  • 14
  • 31
1

I can suggest slightly different approach: instead of throwing an error you can just track such cases and apply the logic you need

Here's an operator to do this:

function switchMapWithOvertakeEvent<T, R>(
  project: (value: T, index: number) => ObservableInput<R>,
  onOvertake: (value: T) => void
): OperatorFunction<T, R> {
  let awaitingResponse = false;
  return (src$) =>
    src$.pipe(
      tap((v) => {
        if (awaitingResponse) {
          onOvertake(v);
        }
        awaitingResponse = true;
      }),
      switchMap(project),
      tap(() => (awaitingResponse = false))
    );
}

It can be used with your example as follows

timer(0, 3000)
  .pipe(
    switchMapWithOvertakeEvent(
      () =>
        timer(randomIntFromInterval(1, 10) * 1000).pipe(
          // <-- mock HTTP call
          catchError(() => {
            // do something on error
            console.log('Caught error');
            return NEVER;
          })
        ),
      () => console.log('http call cancelled')
    )
  )
  .subscribe({
    next: (value) => console.log('Next triggred'),
    complete: () => console.log('complete'),
  });

// credit: https://stackoverflow.com/a/7228322/6513921
function randomIntFromInterval(min, max) {
  const value = Math.floor(Math.random() * (max - min + 1) + min);
  console.log(`Simulated HTTP call ${value}s`);
  return value;
}

You can play with the code here https://stackblitz.com/edit/mjemgq?devtoolsheight=50

cuddlemeister
  • 1,586
  • 12
  • 15
  • 1
    I'm accepting this solution since it provides different callbacks for errors and cancels. Thanks. – ruth Dec 07 '21 at 09:10
0

How about this?

https://stackblitz.com/edit/rxjs-87avpe

function randomIntFromInterval(min, max) {
  const value = Math.floor(Math.random() * (max - min + 1) + min);
  console.log(`Simulated HTTP call ${value}s`);
  return value;
}

const api$ = defer(() => timer(randomIntFromInterval(1, 10) * 1000));
const timerTrigger$ = timer(3000);
api$.pipe(
    takeUntil(timerTrigger$),
    throwIfEmpty(),
    catchError(() => {
      // do something on error
      console.log('Caught error');
      return EMPTY;
    }),
    repeat() // just repeat api
).subscribe({
    next: (value) => console.log('Next triggred'),
    complete: () => console.log('complete'),
});
Eddy Lin
  • 513
  • 2
  • 6