I just read the SO question "RxJS Promise Composition (passing data)" and have a follow-up question.

The setting is the same as in that question: I have three functions that each return a promise.

 const p1 = () => Promise.resolve(1);
 const p2 = x => { const val = x + 1; return Promise.resolve(val); };
 const p3 = x => {
   const isEven = x => x % 2 === 0;
   return Promise.resolve(isEven(x));
 };

And I'm chaining them, as per the answer, like this:

 var chainedPromises$ = 
     Rx.Observable.just()
             .flatMap(p1)
             .flatMap(p2)
             .flatMap(p3);

However, chainedPromises$ emits only once, at the end of the whole promise chain, with the resolved value `true`. Namely:

chainedPromises$.subscribe(function(x){console.log(x)}); // Logs `true`

How can I change chainedPromises$ so that it emits a value each time one of the promises resolves? I.e.:

chainedPromises$.subscribe(function(x){console.log(x)});
// Logs `1`
// Logs `2`
// Logs `true`
jeanpaul62
  • Observables and Promises are not the same thing; what you're trying to do here doesn't make sense. If you're going to make an Observable return multiple results, you emit them using `next`. – Keith Apr 11 '18 at 15:28

3 Answers

Looks like you need to emit the generated value and use it in the next step of the chain. I don't know if there's sugar for this, but I got it working by making this distinction explicit.

Observable
  .fromPromise(p1())
  .flatMap(val => Observable.merge(
    Observable.fromPromise(p2(val))
      .flatMap(val => Observable.merge(
        Observable.fromPromise(p3(val)),
        Observable.of(val)
      )),
    Observable.of(val)
  ))
  .subscribe(console.log);

You can see it working.

frontsideair

Don't use mergeMap here: it is meant to "map" one value to another. Use concat instead, which subscribes to an Observable only after the previous Observable has completed. That way you still process the Observables one at a time, but each emission is propagated:

Observable.concat(p1, p2, p3)
  .subscribe(console.log);

Edit: To pass each resolved value on to the next promise, you can use expand:

Observable.of(null)
  .expand((value, index) => {
    switch (index) {
      case 0:
        return p1();
      case 1:
        return p2(value);
      case 2:
        return p3(value);
      default:
        return Observable.empty();
    }
  })
  .filter((value, index) => index > 0) // Ignore the `null` value that was necessary to start the chain
  .subscribe(console.log);

This prints:

1
2
true

See live demo (open console): https://stackblitz.com/edit/rxjs5-fucqhq?file=index.ts
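
For comparison, the same feed-forward idea can be sketched without RxJS as an async generator. This is only an illustration, not part of the answer above: `chainResults` and `collect` are hypothetical helper names, and `p1`, `p2`, `p3` are redefined here so the snippet runs on its own.

```javascript
// Hypothetical helper (not an RxJS API): runs promise-returning functions in
// order, feeds each resolved value into the next function, and yields every
// intermediate result instead of only the last one.
async function* chainResults(steps) {
  let value; // undefined for the first step, which matches calling p1()
  for (const step of steps) {
    value = await step(value);
    yield value;
  }
}

const p1 = () => Promise.resolve(1);
const p2 = x => Promise.resolve(x + 1);
const p3 = x => Promise.resolve(x % 2 === 0);

// Gathers everything the generator yields into an array.
async function collect(steps) {
  const results = [];
  for await (const v of chainResults(steps)) {
    results.push(v);
  }
  return results;
}

collect([p1, p2, p3]).then(results => console.log(results)); // [ 1, 2, true ]
```

Like the `expand` version, each step receives the previous step's result, and every result is surfaced to the consumer.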

martin

To me, you're missing the whole point of an observable here.

If you have a set of promises whose results you want a subscriber to receive as a stream, simply pass each result to `next`; you then have a stream you can subscribe to.

Below is an example:

const p1 = () => Promise.resolve(1);
const p2 = x => { const val = x + 1; return  Promise.resolve(val); };
const p3 = x => {
  const isEven = x => x % 2 === 0;
  return Promise.resolve(isEven(x));
};

const chainedProm = (observer) => {
  let params;
  return async (p) => 
    observer.next(params = await p(params));
};

var observable = Rx.Observable.create(async function (observer) {
  const doProm = chainedProm(observer);
  await doProm(p1);
  await doProm(p2);
  await doProm(p3);
  observer.complete();
});


observable.subscribe(function(x){console.log(x)});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>
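
One gap in the snippet above is that a rejected promise never reaches the subscriber. A minimal sketch of a variant that forwards rejections through `observer.error` might look like the following. `emitChain` is a hypothetical helper name, and the plain object passed at the end is a stand-in for a real RxJS observer so that the snippet runs without the library.

```javascript
// Hypothetical helper: emits the result of each promise-returning step through
// observer.next, passing each result on to the following step.
async function emitChain(steps, observer) {
  try {
    let value; // undefined for the first step
    for (const step of steps) {
      value = await step(value);
      observer.next(value);
    }
    observer.complete();
  } catch (err) {
    // Forward a rejection to the subscriber instead of leaving it unhandled.
    observer.error(err);
  }
}

const p1 = () => Promise.resolve(1);
const p2 = x => Promise.resolve(x + 1);
const p3 = x => Promise.resolve(x % 2 === 0);

// Stand-in observer for illustration only.
emitChain([p1, p2, p3], {
  next: v => console.log(v),
  error: e => console.error(e),
  complete: () => console.log('done'),
});
// Logs 1, 2, true, done
```

With RxJS 5 the helper could presumably be wired in as `Rx.Observable.create(observer => { emitChain([p1, p2, p3], observer); })`, so that subscribers receive the same sequence.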
Keith