91

I'm pushing observables into an array like so:

var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));

I want an Observable that emits when all tasks$ have completed. Keep in mind, in practice, tasks$ doesn't have a known number of Observables.

I've tried Observable.zip(tasks$).subscribe(), but this seems to fail when there is only one task, which leads me to believe that zip requires an even number of elements in order to work the way I would expect.

I've tried Observable.concat(tasks$).subscribe(), but the result of the concat operator just seems to be an array of observables, i.e. basically the same as the input. You can't even call subscribe on it.

In C# this would be akin to Task.WhenAll(). With ES6 promises it would be akin to Promise.all().
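For reference, here is a minimal sketch of the Promise.all() behavior mentioned above, which is the behavior wanted from the observable version. The `delay` helper and the short durations are assumptions made for illustration; they are not part of the original code.

```typescript
// Hypothetical helper (not from the original post):
// resolves with `label` after `ms` milliseconds.
function delay(ms: number, label: string): Promise<string> {
  return new Promise<string>(resolve => setTimeout(() => resolve(label), ms));
}

// The list is built dynamically, so its length is not known up front.
const tasks: Promise<string>[] = [];
tasks.push(delay(10, 'a'));
tasks.push(delay(30, 'b'));
tasks.push(delay(20, 'c'));

// Promise.all resolves once every task has resolved. Results keep the input
// order, not the completion order, and a single task works the same way.
const allDone: Promise<string[]> = Promise.all(tasks);
allDone.then(results => console.log(results));
```

The point of the sketch is that the combined promise does not care how many tasks were pushed, which is the property the question is asking for.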

I've come across a number of SO questions but they all seem to deal with waiting on a known number of streams (e.g. mapping them together).

Saleh Mahmood
josh-sachs
  • This depends on what you want to do when any of the Observables sends an error notification. Do you want to just ignore the error, or does it mean the entire result will be discarded and you'll receive just the error? – martin Jan 19 '17 at 07:30

4 Answers

133

If you want to compose an observable that emits when all of the source observables complete, you can use forkJoin:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/first';

var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
Sнаđошƒаӽ
cartant
  • Be careful if `tasks` is built dynamically and is empty: forkJoin will then complete without emitting, which stops the observable sequence. See my answer here for more info http://stackoverflow.com/a/42622968/1224564 – bgondy Mar 06 '17 at 10:29
  • The `O` of the third `Observable` was not capitalized as declared, so the resulting observable is undefined. Also, `tasks` is missing the trailing dollar sign before the subscribe. I cannot edit the answer because edits must be at least 6 characters; is there something else to improve in this post? – Tom Sawyer Mar 17 '17 at 13:17
  • I do not want to nag you forever, but I think the result will not be displayed because the observables do not complete. Maybe just add .first() after the timer call, or .take(n) for a more interesting result. forkJoin was the operator that I was seeking, thank you @cartant! – Tom Sawyer Mar 17 '17 at 13:25
  • Fixed the typos and added the `first` operators. BTW, if you need to propose an edit with only a single character changed, you can add a comment to make up the required number of characters: `` – cartant Mar 17 '17 at 21:16
  • I want a vote on whether the naming of `tasks$` is correct by convention. It's not really an observable itself so I'd vote no. – Simon_Weaver Sep 09 '18 at 02:28
  • @Simon That's one of the reasons I'm not a huge fan of Finnish notation - although, I do use it in some places. `task$s` is arguably more appropriate, but just looks super weird. – cartant Sep 09 '18 at 02:47
  • Haha it sure does. – Simon_Weaver Sep 09 '18 at 04:13
  • For rxjs 6: `import {forkJoin} from 'rxjs';` and `forkJoin(...tasks$).subscribe(results => { console.log(results); });`. – Sander Vanden Hautte Jan 11 '19 at 10:07
  • This needs a link to the rxjs doc of forkJoin – FirstVertex Nov 06 '19 at 19:35
  • Correct me if I'm wrong, but I think forkJoin will not wait for other observables to complete if there is one error. It will execute the error handler after the first error. – Koja Apr 07 '20 at 11:50
  • In the latest version of rxjs, you need to use `forkJoin(tasks$)`, not `forkJoin(...tasks$)` – Darrow Hartman Jun 11 '21 at 00:29
22

You can make use of zip.

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

const obsvA = this._service.getObjA();
const obsvB = this._service.getObjB();
// or with array
// const obsvArray = [obsvA, obsvB];

const zip = Observable.zip(obsvA, obsvB);
// or with array
// const zip = Observable.zip(...obsvArray);
zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
);

Things to consider:

  • Doesn't need to be an even number of observables.
  • zip visually: [image]
  • As said here:

    The zip operator will subscribe to all inner observables, waiting for each to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.

  • When one of the observables throws an error (or even both of them), the subscription is closed with an error notification, and the error callback receives only the first error. The complete callback is not called in that case.
  • zip.subscribe(
      result => console.log(result), // result is an array with the responses [respA, respB]
      error => console.log(error), // receives the error of the first observable that fails; the subscription then ends
      () => console.log('completed: runs only when zip completes without an error')
    );
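The pairing rule quoted above can be sketched with plain arrays. This is only a model of how zip groups values by index, not RxJS itself: it ignores timing and errors, and `zipArrays` is a hypothetical name introduced for illustration.

```typescript
// Array-based model of zip's pairing rule (an illustration, not RxJS itself):
// values are grouped by index, and output stops at the shortest input.
function zipArrays<T>(...inputs: T[][]): T[][] {
  if (inputs.length === 0) {
    return [];
  }
  const shortest = Math.min(...inputs.map(input => input.length));
  const rows: T[][] = [];
  for (let i = 0; i < shortest; i++) {
    rows.push(inputs.map(input => input[i]));
  }
  return rows;
}

// Three inputs (an odd number) are fine. The third value of the first input
// is dropped because the other inputs have no third value to pair it with.
const rows = zipArrays(['a1', 'a2', 'a3'], ['b1', 'b2'], ['c1', 'c2']);
console.log(rows);

// A single input also works: each value ends up in its own one-element group.
console.log(zipArrays(['only']));
```

In this model the number of inputs does not matter; what limits the output is the input with the fewest values, which matches the "until at least one inner observable completes" part of the quote.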
    
    João Ghignatti
    • Note: with rxjs6 you can use `zip` in a pipe but you must use `zipAll`. If you do use zip you'll get deprecation warnings and the correct typing won't get passed through. You can, however, use zip statically with just `zip(...)` (no longer `Observable.zip(...)`) – Simon_Weaver Sep 09 '18 at 02:52
    4
    import { Observable, Subject } from 'rxjs';

    // Waits for all Observables, regardless of whether each one succeeds or fails.
    // Emits a single array of items.
    // Each item is either the first value of the corresponding Observable or its error.
    export function waitAll(args: Observable<any>[]): Observable<any[]> {
      const final = new Subject<any[]>();
      const flags = new Array(args.length);
      const result = new Array(args.length);
      let total = args.length;
      for (let i = 0; i < args.length; i++) {
        flags[i] = false;
        args[i].subscribe(
          res => {
            console.info('waitAll ' + i + ' ok ', res);
            if (flags[i] === false) {
              flags[i] = true;
              result[i] = res;
              total--;
              if (total < 1) {
                final.next(result);
              }
            }
          },
          error => {
            console.error('waitAll ' + i + ' failed ', error);
            if (flags[i] === false) {
              flags[i] = true;
              result[i] = error;
              total--;
              if (total < 1) {
                final.next(result);
              }
            }
          }
        );
      }
      return final.asObservable();
    }
    

    unit test:

    describe('waitAll', () => {
      it('should wait for all observables', async () => {
        const o1 = new Subject();
        const o2 = new Subject();
        const o3 = new Subject();
    
        const o = waitAll([o1, o2, o3]);
        // Explicit type so that the result array can be assigned under strict typing.
        const res: { arr: any[] } = { arr: [] };
        o.subscribe(result => res.arr = result, err => res.arr = []);
    
        expect(res.arr).toEqual([]);
        o1.next('success1');
        expect(res.arr).toEqual([]);
        o2.error('failed2');
        expect(res.arr).toEqual([]);
        o3.next('success3');
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
    
        o1.next('success1*');
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
        o2.error('failed2*');
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
        o3.next('success3*');
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
      });
    });
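For comparison with the Promise.all() analogy in the question, the same "wait for everything, keep errors as results" idea can be sketched with promises. This is an assumption-laden illustration rather than part of the answer: `settleAll` and `Outcome` are hypothetical names, and the sketch uses promises instead of Observables.

```typescript
// Outcome of one task: either its value or the error it failed with.
type Outcome<T> =
  | { ok: true; value: T }
  | { ok: false; error: unknown };

// Hypothetical promise-based counterpart of waitAll: it never rejects, and it
// resolves once every task has either resolved or rejected, keeping input order.
function settleAll<T>(tasks: Promise<T>[]): Promise<Outcome<T>[]> {
  return Promise.all(
    tasks.map(task =>
      task.then(
        (value): Outcome<T> => ({ ok: true, value }),
        (error: unknown): Outcome<T> => ({ ok: false, error })
      )
    )
  );
}

const settled = settleAll<string>([
  Promise.resolve('success1'),
  Promise.reject('failed2'),
  Promise.resolve('success3'),
]);
settled.then(outcomes => console.log(outcomes));
```

Tagging each entry with `ok` keeps values and errors distinguishable, whereas the array returned by waitAll mixes them in the same slots.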
    
    Sergey Gurin
    • I feel like this is the only answer and it works great. It's the only version that doesn't kill itself after one call has failed. – austinthedeveloper Oct 01 '19 at 21:47
    • This is definitely the closest, but if inner observables are cancelled (unsubscribed using `takeUntil` for example), then the outer/final observable never completes. – Huon Aug 12 '20 at 00:35
    0

    For me, sample was the best solution.

    const source = Observable.interval(500);
    const example = source.sample(Observable.interval(2000));
    const subscribe = example.subscribe(val => console.log('sample', val));
    

    So only when the second observable (the notifier passed to sample) emits will you see the last emitted value of the first (source).

    In my case, I wait for form validation and other DOM events.

    • These will execute in series, one after the other. You're not going to get the simultaneous execution that is the primary reason to use `zip` or `forkJoin` – Tony Brasunas Jul 13 '20 at 19:29