41

I am moving from the Promise world to the Observable world. One thing I usually do with Promises is chain a series of tasks so that they run in sequence. For example, I have three tasks: printLog1() prints 1 to the console, printLog23() prints 2 and 3, and printLog4() prints 4.

When I want to print 1-2-3-4, I would write a promise chain like

printLog1()
  .then(() => {
    // Return the promise so the next step waits for it
    return printLog23();
  })
  .then(() => {
    return printLog4();
  });

Now I want the same functionality with Observables. I can rewrite the printLog() functions as Observables like this:

const printLog1 = Rx.Observable.of(1).map((i) => console.log(i));
const printLog23 = Rx.Observable.of(2, 3).map((i) => console.log(i));
const printLog4 = Rx.Observable.of(4).map((i) => console.log(i));

Then I have three observables that emit different values to the console. How do I chain them so that they run in order and print 1-2-3-4?

lusc
Haoliang Yu

2 Answers

61

If you want the order of emissions to match the order in which you specified the source Observables, you can use the concat or concatMap operators.

The concat* operators subscribe to an Observable only after the previous Observable completes (it works with Promises as well, see http://reactivex.io/rxjs/class/es6/MiscJSDoc.js~ObservableInputDoc.html).

In your case it'd look like the following:

import { concat } from 'rxjs'; // Note: concat from 'rxjs' is not the same as concat from 'rxjs/operators'

concat(printLog1, printLog23, printLog4);
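
As a minimal, self-contained sketch of the above, assuming RxJS 6 or later: the `tap` operator for logging and the `printed` array are illustrative choices of mine, not part of the original question. Nothing is printed until `subscribe()` is called.

```typescript
import { concat, of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Collects what was logged so the order can be inspected (illustrative only).
const printed: number[] = [];
const log = (i: number): void => {
  console.log(i);
  printed.push(i);
};

// tap runs a side effect for each value without changing the value.
const printLog1 = of(1).pipe(tap(log));
const printLog23 = of(2, 3).pipe(tap(log));
const printLog4 = of(4).pipe(tap(log));

// Each source is subscribed only after the previous one completes.
concat(printLog1, printLog23, printLog4).subscribe();
```

With these synchronous sources the values are logged in the order 1, 2, 3, 4.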

... or with concatMap if the request for one Promise depends on the response from the previous Promise:

printLog1.pipe(
  concatMap(response => ...),
  concatMap(response => ...),
);
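
To make the dependency between steps concrete, here is a rough sketch assuming RxJS 6 or later. The functions `stepOne`, `stepTwo` and `stepThree` are hypothetical stand-ins for real requests; each one receives the value produced by the step before it.

```typescript
import { Observable, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

// Hypothetical steps: each one needs the result of the previous one.
const stepOne = (): Observable<number> => of(1);
const stepTwo = (prev: number): Observable<number> => of(prev + 1);
const stepThree = (prev: number): Observable<number> => of(prev * 10);

const results: number[] = [];

stepOne().pipe(
  // Runs only once stepOne has emitted, and receives its value.
  concatMap(first => stepTwo(first)),
  // Runs only once stepTwo has emitted, and receives its value.
  concatMap(second => stepThree(second)),
).subscribe(value => {
  console.log(value);
  results.push(value);
});
```

Only the value from the last step reaches the subscriber; the intermediate values are consumed by the next `concatMap`.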

... or, when the order doesn't matter, you can use merge, which subscribes to all Observables/Promises immediately and re-emits their results as they arrive:

merge(printLog1, printLog23, printLog4);
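
A small sketch of how arrival order differs from argument order, assuming RxJS 6 or later. The `slow` and `fast` sources and their delays are made up for illustration, and mapping each emission to a label is one possible way to tell afterwards which source a value came from.

```typescript
import { merge, timer } from 'rxjs';
import { map, toArray } from 'rxjs/operators';

// Hypothetical sources: "slow" is listed first but emits later than "fast".
const slow = timer(50).pipe(map(() => 'slow'));
const fast = timer(10).pipe(map(() => 'fast'));

// merge subscribes to both right away; toArray gathers values in arrival order.
const arrivalOrder = new Promise<string[]>(resolve => {
  merge(slow, fast)
    .pipe(toArray())
    .subscribe(order => resolve(order));
});

arrivalOrder.then(order => console.log(order));
```

Swapping `merge` for `concat` here would give the values in argument order instead, because `fast` would not be subscribed until `slow` completes.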

Jan 2019: Updated for RxJS 6

lusc
martin
  • I tested this method along the lines of `const $printLog1 = from(someFunc1Async())` and `const $printLog2 = from(someFunc2Async())` and then tried to use concat, unsuccessfully. The assignments above with `from` cause invocation immediately. Maybe obvious, but hopefully this helps someone else. – ttugates Aug 28 '18 at 20:52
  • Hey there! Thanks! I did `merge` on 2 observables and I got both responses, but I don't know which one is for which one. Is there any way? – Ardeshir Izadi Dec 23 '18 at 09:12
  • I don't think the `concatMap` in the pipe in this example is strictly necessary - could have used a `mergeMap` for the same result. Each time `printLog1` is called it's returning a new observable, and each subsequent mapping operator will be a fresh call - so there'll be no inner observables to cancel. It would make sense to use `concatMap` if the maps were in the `pipe` of a subject - and `printLogSubject.next()` was being called to initiate the sequence – Drenai Jun 12 '19 at 20:41
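
On the point about `from(someFunc1Async())` invoking the function immediately: one possible workaround is to wrap the call in `defer`, so it runs only on subscription. This is a sketch assuming RxJS 6 or later; `someFuncAsync` and the `calls` array are hypothetical and only there to show when the function is invoked.

```typescript
import { defer, from } from 'rxjs';

// Records each time the async function is actually invoked (illustrative only).
const calls: string[] = [];

// Hypothetical stand-in for someFunc1Async() from the comment.
const someFuncAsync = (name: string): Promise<string> => {
  calls.push(name);
  return Promise.resolve(name);
};

// The function is called right here, while the Observable is being built.
const eager$ = from(someFuncAsync('eager'));

// The function is not called until something subscribes.
const lazy$ = defer(() => someFuncAsync('lazy'));

const callsBeforeSubscribe = calls.slice();

eager$.subscribe();
lazy$.subscribe();

const callsAfterSubscribe = calls.slice();
console.log(callsBeforeSubscribe, callsAfterSubscribe);
```

Passing deferred sources like `lazy$` to `concat` means each async function should start only when its turn comes.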
1

My solution:

const sequence: Observable<any>[] = [of(1), of(2), of(3)];

return concat(...sequence).pipe(toArray());
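
For reference, a runnable sketch of the same idea with a subscriber attached, assuming RxJS 6 or later. The `collected` variable is only there to make the result easy to inspect.

```typescript
import { Observable, concat, of } from 'rxjs';
import { toArray } from 'rxjs/operators';

const sequence: Observable<number>[] = [of(1), of(2), of(3)];

let collected: number[] = [];

// concat runs the sources one after another; toArray waits for completion
// and then emits all values as a single array.
concat(...sequence)
  .pipe(toArray())
  .subscribe(values => {
    collected = values;
    console.log(values);
  });
```

Note that `toArray` emits only once, after the last source completes, so the subscriber gets one array instead of individual values.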
lusc
Alex