
I'm facing a problem, and I've been trying to find a solution using RxJs, but can't seem to find one that fits it...

  • I have 3 different REST requests, that will be called sequentially, and each of them needs the response of the previous one as an argument
  • I want to implement a progress bar, which increments as the requests succeed

Here is what I thought:

  • I am going to use pipes and concatMap() to avoid nested subscriptions and subscribe to each request when the previous one is done.

Consider this very simplified version. Assume that each of() represents a complete, successful REST request (I will handle errors later), and that I will do work not shown here with the n parameter...

import { of } from 'rxjs';
import { concatMap, delay, tap } from 'rxjs/operators';

const request1 = of('success 1').pipe(
  delay(500),
  tap(n => console.log('received ' + n)),
);

const request2 = (n) => of('success 2').pipe(
  delay(1000),
  tap(n => console.log('received ' + n))
);

const request3 = (n) => of('success 3').pipe(
  delay(400),
  tap(n => console.log('received ' + n))
);

request1.pipe(
  concatMap(n => request2(n).pipe(
    concatMap(n => request3(n))
  ))
)

However, when I subscribe to the last piece of code, I will only get the response of the last request, which is expected as the pipe resolves to that.

So with concatMap(), I can chain my dependent REST calls correctly, but can't follow the progress.

I could follow the progress quite easily with nested subscriptions, but I am trying hard to avoid them and follow best practice.

How can I chain my dependent REST calls, but still be able to do stuff each time a call succeeds?

Bryan Lee
  • Does this answer your question? [RXJS - multiple consecutive http requests](https://stackoverflow.com/questions/59570873/rxjs-multiple-consecutive-http-requests) – Tal Ohana Jan 03 '20 at 16:12

2 Answers


This is a more general solution, though not as simple. It makes progress observable while still avoiding the share operator, which can introduce unexpected statefulness if used incorrectly.

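// Assumed namespace imports; the code below refers to them as Rx and op.
import * as Rx from 'rxjs';
import * as op from 'rxjs/operators';

const chainRequests = (firstRequestFn, ...otherRequestFns) => (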
  initialParams
) => {
  return otherRequestFns.reduce(
    (chain, nextRequestFn) =>
      chain.pipe(op.concatMap((response) => nextRequestFn(response))),
    firstRequestFn(initialParams)
  );
};

chainRequests takes a variable number of functions and returns a function that accepts initial parameters and returns an observable that concatMaps the functions together as shown manually in the question. It does this by reducing each function into an accumulation value that happens to be an observable.

Remember, RxJS leads us out of callback hell if we know the path.

const chainRequestsWithProgress = (...requestFns) => (initialParams) =>  {
  const progress$ = new Rx.BehaviorSubject(0);
  const wrappedFns = requestFns.map((fn, i) => (...args) =>
    fn(...args).pipe(op.tap(() => progress$.next((i + 1) / requestFns.length)))
  );
  const chain$ = Rx.defer(() => {
    progress$.next(0);
    return chainRequests(...wrappedFns)(initialParams);
  });
  return [chain$, progress$];
};

chainRequestsWithProgress returns two observables - the one that eventually emits the last response, and one that emits progress values when the first observable is subscribed to. We do this by creating a BehaviorSubject to serve as our stream of progress values, and wrapping each of our request functions to return the same observable they normally would, but we also pipe it to tap so it can push a new progress value to the BehaviorSubject.

The progress is zeroed out upon each subscription to the first observable.

If you wanted to return a single observable that produced the progress state as well as the eventual result value, you could have chainRequestsWithProgress instead return:

chain$.pipe(
  op.startWith(null),
  op.combineLatest(progress$, (result, progress) => ({ result, progress }))
)

and you'll have an observable that emits an object representing the progress toward the eventual result, then that result itself. Food for thought - does progress$ have to emit just numbers?

Caveat

This assumes request observables emit exactly one value.

backtick
  • 1
    Thank you for your answer. I ended up implementing the selected answer for the sake of simplicity (my coworkers are also RxJs beginners, so the simpler the better), but I looked up the different functions you used in the docs and still learned a lot. In particular, I used a BehaviorSubject to let my component know about the progress in the service. – Bryan Lee Jan 06 '20 at 15:54
  • 1
    Great, I'm glad both answers were helpful to you! – backtick Jan 07 '20 at 04:28

The simplest solution would be to have a progress counter variable that is updated from a tap when each response comes back.

    let progressCounter = 0;

    request1.pipe(
      tap(_ => progressCounter = 0.33),
      concatMap(n => request2(n).pipe(
        tap(_ => progressCounter = 0.66),
        concatMap(n => request3(n)
          .pipe(tap(_ => progressCounter = 1)))
      ))
    );

If you want the progress itself to be observable, then you want to share the request observables (so as not to make duplicate requests) and then combine them to get the progress.

An example of how you may want to approach that can be found at: https://www.learnrxjs.io/recipes/progressbar.html

The Observer