Say I have an Observable, like so:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

Then, I have a second Observable:

var two = someOtherObservable.take(1);

Now, I want to subscribe() to two, but I want to make sure that one has completed before the two subscriber is fired.

What kind of buffering method can I use on two to make it wait until one has completed?

I suppose I am looking to pause two until one is complete.

Alexander Abakumov
Stephen
  • I believe the answer to this is the .exhaustMap() method; however, I wouldn't pretend to know how to implement it. Full description here: https://blog.angular-university.io/rxjs-higher-order-mapping/ – Peter Nixey Jul 15 '20 at 12:22

11 Answers

A couple of ways I can think of:

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
paulpdaniels
  • I ended up using `pause` and `resume` instead of `publish` and `connect`, but example two is essentially the route I took. – Stephen May 29 '15 at 12:37
  • Will this method always make the first observable (`one`) resolve before the second (`two`) inside the subscribe()-function? – John Feb 18 '17 at 10:46
  • Why not use `Observable.forkJoin()`? See this link https://www.learnrxjs.io/operators/combination/forkjoin.html – mspasiuk Jun 06 '17 at 20:56
  • @mspasiuk per the OP's requirement, they only wanted the second to subscribe *after* the first had completed. `forkJoin` subscribes simultaneously. – paulpdaniels Jun 06 '17 at 21:24
  • @paulpdaniels A question for my understanding: In the first method, wouldn't it be enough to call `.pipe(take(1))` on `concat(one, two)`? So the following way: `concat(one, two).pipe(take(1)).subscribe(function() { /* do something */});`? – Spray'n'Pray Jul 16 '21 at 08:00
  • @Spray'n'Pray No, because that would complete the subscription after receiving the first value from `one`, so it wouldn't even end up subscribing to `two`. – paulpdaniels Jul 16 '21 at 11:53

If you want to make sure that the order of execution is retained, you can use flatMap, as in the following example:

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

The outcome would be:

"1"
"11"
"111"
"finished"
Nikos Tsokos

skipUntil() with last()

skipUntil: ignore emitted items until another observable has emitted

last: emit last value from a sequence (i.e. wait until it completes then emit)

Note that anything emitted from the observable passed to skipUntil will cancel the skipping, which is why we need to add last() - to wait for the stream to complete.

main$.pipe(skipUntil(sequence2$.pipe(last())))

Official: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


Possible issue: Note that last() by itself will error if nothing is emitted. The last() operator does have a default parameter but only when used in conjunction with a predicate. I think if this situation is a problem for you (if sequence2$ may complete without emitting) then one of these should work (currently untested):

main$.pipe(skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last())))
main$.pipe(skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined)))))

Note that undefined is a valid item to be emitted, but could actually be any value. Also note that this is the pipe attached to sequence2$ and not the main$ pipe.

Simon_Weaver
  • Very clumsy demo: https://angular-vgznak.stackblitz.io (you need to click to open the console tray) – Simon_Weaver Aug 29 '18 at 21:20
  • Your syntax is wrong. skipUntil can't be directly attached to an observable otherwise you'll get the following error: 'Property 'skipUntil' does not exist on type 'Observable'.' You need to first run it through .pipe() – London804 Aug 05 '20 at 23:25
  • Yes this is an old answer before pipe was required. Thanks for mentioning it. I’d update it now but I’m on my phone. Feel free to edit the answer. – Simon_Weaver Aug 05 '20 at 23:26

Here's a reusable way of doing it (it's TypeScript, but you can adapt it to JS):

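import { Observable } from 'rxjs';
import { first, switchMap } from 'rxjs/operators';

function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) => signal.pipe(
        first(),
        // subscribe to the source only once the signal has emitted
        switchMap(_ => source),
    );
}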

and you can use it like any operator:

var two = someOtherObservable.pipe(waitFor(one), take(1));

It's basically an operator that defers subscribing to the source observable until the signal observable emits its first value.

Andrei Tătar
  • Is there an RxSwift version of this reusable function? – sujith1406 Dec 18 '20 at 17:03
  • Note there is a slight semantic difference here with what the OP requested. In this case, if you request `first` and the signal `Observable` is empty, then this will error. The OP's example uses `take`, which would allow the source to be empty and still run the second one. – paulpdaniels Apr 15 '22 at 17:26
  • Although this might be slightly off topic for the OP: I found my problem simply to be that I wanted an observable that communicates with NGRX to wait for an observable that's doing a separate lookup to Firestore (which is notably slower than NGRX). This waitFor solved it for me without any performance or memory impact. Good work. – KeaganFouche Feb 16 '23 at 10:49

Here is yet another possibility, taking advantage of switchMap's result selector:

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });

Since switchMap's result selector has been deprecated, here is an updated version:

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.pipe(map(_ => value2)))
);
two$.subscribe(value2 => {
  /* do something */ 
});
Joseph King

If the second observable is hot, there is another way to do pause/resume:

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to receive data
});

You can also use the buffered version, pausableBuffered, to retain data while the source is paused.

Anton

Here's a custom operator written with TypeScript that waits for a signal before emitting results:

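import { combineLatest, Observable } from 'rxjs';
import { first } from 'rxjs/operators';

export function waitFor<T>(
    signal$: Observable<any>
) {
    return (source$: Observable<T>) =>
        new Observable<T>(observer => {
            // combineLatest emits its first value only when
            // both source and signal have emitted at least once.
            // Returning the subscription lets unsubscribe() tear it down.
            return combineLatest([
                source$,
                signal$.pipe(
                    first(),
                ),
            ]).subscribe({
                next: ([v]) => observer.next(v),
                // forward error and completion so the result can terminate
                error: err => observer.error(err),
                complete: () => observer.complete(),
            });
        });
}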

You can use it like this:

two.pipe(waitFor(one))
   .subscribe(value => ...);
Sergiu

Here's yet another, but I feel more straightforward and intuitive (or at least natural if you're used to Promises), approach. Basically, you create an Observable using Observable.create() to wrap one and two as a single Observable. This is very similar to how Promise.all() may work.

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

So, what's going on here? First, we create a new Observable. The function passed to Observable.create(), which I'll call onSubscription, is passed the observer (built from the parameters you pass to subscribe()), which is similar to resolve and reject combined into a single object when creating a new Promise. This is how we make the magic work.

In onSubscription, we subscribe to the first Observable (called one in the question). How we handle next and error is up to you, but the default provided in my sample should generally be appropriate. However, when we receive the complete event, which means one is now done, we can subscribe to the next Observable, thereby firing the second Observable after the first one is complete.

The example observer provided for the second Observable is fairly simple. Basically, second now acts like what you would expect two to act like in the OP. More specifically, second will emit the first and only the first value emitted by someOtherObservable (because of take(1)) and then complete, assuming there is no error.

Example

Here is a full, working example you can copy/paste if you want to see my example working in real life:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

If you watch the console, the above example will print:

1

6

Done!

c1moore
  • This was the breakthrough I needed to create my own custom 'cluster(T, X, D)' operator that processes only the first X emits within timespan T from the source and emits results spaced out by D delay. Thank you! – wonkim00 Aug 16 '18 at 19:49
  • I'm glad it helped, it was very enlightening when I realized this, too. – c1moore Aug 16 '18 at 21:51

Well, I know this is pretty old, but I think what you might need is:

var one = someObservable.pipe(take(1));

var two = someOtherObservable.pipe(
  concatMap((twoRes) => one.pipe(mapTo(twoRes))),
  take(1)
).subscribe((twoRes) => {
   // one is completed and we get two's subscription.
})
itay oded

You can use the result emitted by the previous Observable thanks to the mergeMap operator (or its alias, flatMap), like this:

 const one = Observable.of('https://api.github.com/users');
 const two = (c) => ajax(c);//ajax from Rxjs/dom library

 one.mergeMap(two).subscribe(c => console.log(c))
Tktorza
  • From here: https://www.learnrxjs.io/learn-rxjs/operators/transformation/mergemap - "If the order of emission and subscription of inner observables is important, try concatMap!" – gsziszi Sep 18 '20 at 12:30

Perhaps you can use the delayWhen operator.

We have two observables, one$ and two$. The first observable emits 1 after a 1s delay, then completes. The second observable emits 2 only after one$ has emitted:

const one$ = of(1).pipe(
  delay(1000),
  tap(() => console.log('one$ emitted'))
);

const two$ = of(2).pipe(
  delayWhen(() => one$),
  tap(() => console.log('two$ emitted')),
);

two$.subscribe(n => {
  console.log(`n=${n}`);
});
<script src="https://unpkg.com/rxjs@7.5.5/dist/bundles/rxjs.umd.min.js"></script>
<script>
const {of} = rxjs;
const {delay, delayWhen, tap} = rxjs.operators;
</script>
customcommander