5

I am doing some parallel HTTP get with RxJs pipe and the mergeMap operator.

When the first request fails (let's imagine /urlnotexists throws a 404 error), it stops all the other requests.

I want it to continue querying all the remaining URLs, without running the remaining mergeMap operators for the failed request.

I tried to play with throwError, and catchError from RxJs but without success.

index.js

const { from } = require('rxjs');
const { mergeMap, scan } = require('rxjs/operators');

/**
 * Stand-in HTTP client used to reproduce the problem.
 *
 * `get(url)` returns a Promise that settles after a 1000 ms timer:
 * it rejects with `new Error(url)` when the url is '/urlnotexists',
 * and resolves with the url itself for any other value.
 */
const request = {
  get(url) {
    // Decide the outcome once the timer fires.
    const settle = (resolve, reject) => {
      const isMissing = url === '/urlnotexists';
      if (isMissing) {
        reject(new Error(url));
      } else {
        resolve(url);
      }
    };

    return new Promise((resolve, reject) => {
      setTimeout(() => settle(resolve, reject), 1000);
    });
  },
};

// Reproduction of the problem: four urls are pushed through two mergeMap
// stages and a scan accumulator, and the final accumulated array is logged.
(async function() {
  await from([
    '/urlexists',
    '/urlnotexists',
    '/urlexists2',
    '/urlexists3',
  ])
    .pipe(
      // Stage 1: perform the request for each url.
      mergeMap(async url => {
        try {
          console.log('mergeMap 1:', url);
          const val = await request.get(url);
          return val;
        } catch(err) {
          console.log('err:', err.message);
          // Throwing here would prevent all the remaining request.get() calls
          // from being tried. Nothing is returned instead, so this callback
          // resolves to undefined and that undefined is still passed on to the
          // next operators (see "mergeMap 2: undefined" in the output below).
        }
      }),
      // Stage 2: extra per-request processing.
      mergeMap(async val => {
        // Should not be reached if the previous request.get() failed.
        console.log('mergeMap 2:', val);
        return val;
      }),
      // Collect every value into one array (the same array is mutated and re-emitted).
      scan((acc, val) => {
        // Should not be reached if the previous request.get() failed.
        acc.push(val);
        return acc;
      }, []),
    )
    .toPromise()
    .then(merged => {
      // Expected: merged contains /urlexists, /urlexists2 and /urlexists3
      // even though /urlnotexists failed.
      console.log('merged:', merged);
    })
    .catch(err => {
      console.log('catched err:', err);
    });
})();
$ node index.js
mergeMap 1: /urlexists
mergeMap 1: /urlnotexists
mergeMap 1: /urlexists2
mergeMap 1: /urlexists3
err: /urlnotexists
mergeMap 2: /urlexists
mergeMap 2: undefined <- I didn't want this mergeMap to be called
mergeMap 2: /urlexists2
mergeMap 2: /urlexists3
merged: [ '/urlexists', undefined, '/urlexists2', '/urlexists3' ]

I expect to make concurrent GET requests and reduce their respective values into one object at the end.

But if some errors occur, I want them to be logged rather than interrupt my pipe.

Any advice ?

sklts
  • 77
  • 1
  • 9
  • Any particular reason you are wanting to introduce RxJS into what looks like a pure Promise problem? – Brandon Oct 07 '19 at 19:12

2 Answers2

0

If you are willing to forego RXJS and just solve with async/await it is very straightforward:

// Start every request up front so they all run concurrently.
// `request` is the promise factory from the question: an object whose
// `get(url)` method returns a Promise, so it must be called as `request.get`.
const urls = ['/urlexists', '/urlnotexists', '/urlexists2', '/urlexists3'];
const promises = urls.map(url => request.get(url));

// allSettled waits for every promise and reports one outcome object per
// promise, in input order: { status: 'fulfilled', value } or
// { status: 'rejected', reason }.
const resolved = await Promise.allSettled(promises);

// print out errors
resolved.forEach((r, i) => {
  if (r.status === 'rejected') {
    console.log(`${urls[i]} failed: ${r.reason}`);
  }
});

// get the success results (the success status is 'fulfilled', not 'resolved')
const merged = resolved.filter(r => r.status === 'fulfilled').map(r => r.value);
console.log('merged', merged);

This makes use of the proposed Promise.allSettled helper method. If your environment does not have this method, you can implement it as shown in this answer.

hlovdal
  • 26,565
  • 10
  • 94
  • 165
Brandon
  • 38,310
  • 8
  • 82
  • 87
0

If you want to use RxJS you should add error handling with catchError and any additional tasks to a single request before you execute all your requests concurrently with forkJoin.

const { of, from, forkJoin } = rxjs;
const { catchError, tap } = rxjs.operators;

// your promise factory, unchanged (just shorter)
// Fake client: get(url) settles after 1000 ms, rejecting with new Error(url)
// for '/urlnotexists' and resolving with the url otherwise.
const request = {
  get(url) {
    const shouldFail = url === '/urlnotexists';
    return new Promise((resolve, reject) => {
      setTimeout(() => (shouldFail ? reject(new Error(url)) : resolve(url)), 1000);
    });
  },
};

// a single rxjs request with error handling
/**
 * Builds the observable for one url.
 *
 * Logs 'before:' as soon as it is called, wraps the request.get(url) promise
 * in an observable, logs 'after:' with the value on success, and on failure
 * logs the error message and emits a single `undefined` instead of erroring.
 */
const fetch$ = (url) => {
  console.log('before:', url);

  const response$ = from(request.get(url));

  // add any additional operator that should be executed for each request here
  const logSuccess = tap((val) => console.log('after:', val));

  const recoverFromError = catchError((error) => {
    console.log('err:', error.message);
    return of(undefined);
  });

  return response$.pipe(logSuccess, recoverFromError);
};

// concurrently executed rxjs requests
// Build one observable per url, then log the combined results array.
const endpoints = ["/urlexists", "/urlnotexists", "/urlexists2", "/urlexists3"];
const requests$ = endpoints.map(fetch$);
forkJoin(requests$).subscribe((merged) => {
  console.log("merged:", merged);
});
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>
frido
  • 13,065
  • 5
  • 42
  • 56