3

I have this code:

let $obs = Rx.Observable.merge(
    this.obsEnqueue,
    Rx.Observable.timer(2000)
);

but this isn't doing want I want - want I would like to do is wait until both events fire (an event from this.obsEnqueue and the event from Rx.Observable.timer()).

merge will most likely create a new Observable that will fire when the first event happens from all the observables are passed in. I would like to create a new observable that will fire after all observable passed in have fired.

So more generally:

let $obs = Rx.Observable.X(
    Rx.Observable,    // wait
    Rx.Observable,    // until
    Rx.Observable,    // all of these
    ...
    Rx.Observable     // fire the next event
);

How can I do this? What is X?

g t
  • 7,287
  • 7
  • 50
  • 85
Alexander Mills
  • 90,741
  • 139
  • 482
  • 817
  • [This question](http://stackoverflow.com/q/35357919/501250) is the same except for the Java version of RX. Perhaps you can adapt the answer for RXJS. – cdhowie Jan 03 '17 at 21:34
  • @cdhowie that question is slightly different - it says wait until all have *completed*...that is different then waiting until all have fired their next event? Right? – Alexander Mills Jan 03 '17 at 21:57

2 Answers2

4

You can use zip, if you want to combine all the results when they arrive:

const { Observable } = Rx;

const result = Observable.zip(
  Observable.of('data1'),
  Observable.of(true).delay(500),
  Observable.timer(1000)
);

result.forEach(console.log); // after 1000ms: ['data1', true, 0]
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
nem035
  • 34,790
  • 6
  • 87
  • 99
  • `zip` is useful when you want to combine sequences by waiting for every sequence to provide a value each time. `merge` on the other hand doesn't wait and just sends whichever value comes first from whichever sequence. – nem035 Jan 03 '17 at 22:01
  • thanks, in the comments to the OP there is linked question to RxJava that describes how to wait until all observables have completed, that seems different because zip just waits until all observables have fired their next event (?) *However*, the accepted answer in that question is zip also, which is confusing. Can you explain? – Alexander Mills Jan 03 '17 at 22:03
  • here is the Java question- http://stackoverflow.com/questions/35357919/combine-a-list-of-observables-and-wait-until-all-completed – Alexander Mills Jan 03 '17 at 22:04
  • 1
    `zip` can be used for either. The way `zip` works is it waits for all of its observables to provide a value and then it fires its `onNext`. If any of the observables passed into `zip` fire `onCompleted`, then `zip` does the same as well. Same thing with `onError`. You can think of `zip` as merging multiple arrays by taking each element from the top. If we reached the end of any of the arrays, we are done. – nem035 Jan 03 '17 at 22:10
  • hmmm, thanks, but WRT zip - what if you specifically wanted to wait for onCompleted() and you didn't want to fire for onNext()? You'd have to tell zip() that you wanted to do that right? – Alexander Mills Jan 03 '17 at 22:13
  • In my case, I want to fire onNext() for the outer observable when all the inner observables have fired onNext(), but I am just curious how it could be done with only onCompleted() and ignore onNext() – Alexander Mills Jan 03 '17 at 22:14
  • 1
    Well, you cannot really do that with zip, that's not its intended use-case, unless you give it observables that only call `onCompleted` and do not call `onNext`. If you lookup ways to do this I'm sure there are many examples online ([here's one](http://stackoverflow.com/questions/30519645/how-can-i-make-one-rxjs-observable-sequence-wait-for-another-to-complete-before)). Also, you can always [implement your own operator](https://github.com/ReactiveX/rxjs/blob/master/doc/operator-creation.md) for some custom behavior you might need. – nem035 Jan 03 '17 at 22:28
2

You could maybe use forkJoin:

Rx.Observable.forkJoin(
  Rx.Observable.of('hello').delay(2000),
  Rx.Observable.of('world').delay(1000)
)
  .subscribe(console.log)

After two seconds you should have an array of ['hello', 'world']. The order of the output will be the same order as the input of arguments.

Input can be multiple arguments, an array, and other things:

Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.

Here is a Fiddle to test it (just bring up the console).

Filuren
  • 661
  • 2
  • 7
  • 19