19

I'm writing an app in Angular 2 and I want to execute several http requests and run a function on the responses.

In Angular 1, I would write something like $q.all([$http.get(...), $http.get(...), ...]).then(doSomethingWithResponses);

But Angular 2 returns RxJS Observables and after a bunch of reading I still can't figure out how to get the responses of several http requests. How can this can be done?

BuZZ-dEE
  • 6,075
  • 12
  • 66
  • 96
benshope
  • 2,936
  • 4
  • 27
  • 39

4 Answers4

36

As @Eric Martinez pointed out, there is forkJoin. forkJoin runs all observable sequences in parallel and collect their last elements.

Rx.Observable.forkJoin([a,b]).subscribe(t=> {
        var firstResult = t[0];
        var secondResult = t[1];
});
Michael Kang
  • 52,003
  • 16
  • 103
  • 135
  • 3
    @EricMartinez is there a way for an index in the resulting array to "dissolve" if Observable a or Observable b fail (or are canceled or have an error or don't meet a condition)? So for example, if the Observable b fails for data1, then the length of the resulting array would be 1 instead accounting for the 2 calls that were made inside the Observable.forkJoin( ... ) argument block? – Benjamin McFerren Sep 23 '16 at 05:24
  • Written a RxJs v6 solution here using zip - https://stackoverflow.com/a/55993246/1882064 – arcseldon May 05 '19 at 14:57
7

I'm not sure you'd want to use forkJoin/zip, especially considering combineLatest is easier to understand and will emit on every sub-stream event, whereas forkJoin basically samples on every sub-stream having emitted.

This might come to bite you later when you want to combine multi-item Observables down the road.

kakigoori
  • 1,218
  • 10
  • 20
  • 2
    The difference is that you don't want to be notified of every emit on every stream; you only care about when all streams have emitted data. Think of it as the observable equivalent to `Promise.all(promises)` or `$q.all(promises)`. – GFoley83 Feb 12 '17 at 23:04
2

Wouldn't a merge work? You can subscribe and attach a handler to the onComplete callback.

I first build an array of my observables and then use static merge:

let obs_ary: any = [obs1, obs2, obs3];
Observable.merge(...obs_ary);
flyer88
  • 1,073
  • 3
  • 15
  • 33
1

I'm learning RxJS and I was trying to do the same thing with RxJS v5

It seems like we don't have forkJoin on v5 anymore, so here's how I got it working (works with flatMap or mergeMap, which are aliases):

const callOne = value =>
    new window.Promise(resolve =>
        setTimeout(() => resolve(value + 10), 3000)
    );

const callTwo = value =>
    new window.Promise(resolve =>
        setTimeout(() => resolve(value + 20), 1000)
    );

Rx.Observable
    .of(2)
    .do(() => console.log('querying...'))
    .mergeMap(number =>
        Rx.Observable.zip(
            Rx.Observable.fromPromise(callOne(number)),
            Rx.Observable.fromPromise(callTwo(number))
        )
    ).concatAll()
    .subscribe(createSubscriber('promises in parallel'));
rafaelbiten
  • 6,074
  • 2
  • 31
  • 36
  • I'm looking through the changelog of rxjs and I don't see anything about forkJoin being deprecated https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md – benshope Aug 09 '16 at 08:44
  • @benshope did you get it to run on v5? I get an error when trying to use forkJoin and I don't see it under src/operator. – rafaelbiten Aug 09 '16 at 12:24
  • I didn't try running it, but it looks like v5 is still in beta. Perhaps that could have caused the issue. Happy to set this answer as correct if forkJoin really is gone – benshope Aug 09 '16 at 16:38
  • Yes, it's in Beta but at least for now the forkJoin operator is gone. I really just posted this answer here cause I thought it would be helpful for those trying this on the latest version of RxJS. – rafaelbiten Aug 09 '16 at 16:46
  • Cool, yea! Maybe it was moved? I'm idly curious – benshope Aug 09 '16 at 17:17
  • Strange cause I can see it in the docs and searching the repo, I can still find references to it, so now I'm confused =/ – rafaelbiten Aug 09 '16 at 18:11
  • 1
    forkJoin is static so it lives under src/observable not src/operator. – Chris Oct 14 '16 at 20:59