5

I'd like to alternately combine elements of multiple streams:

// Logger used as the subscriber callback; bound so it can be passed around detached from console.
var print = console.log.bind(console);

// Three source streams of different lengths.
var s1 = Rx.Observable.fromArray([1, 1, 5]);
var s2 = Rx.Observable.fromArray([2, 9]);
var s3 = Rx.Observable.fromArray([3, 4, 6, 7, 8]);

// Desired behavior: emit one element from each stream in turn, and keep
// going with the remaining streams once a shorter one has run out.
alternate(s1, s2, s3).subscribe(print); // 1, 2, 3, 1, 9, 4, 5, 6, 7, 8

What would the function definition of alternate look like?

maiermic
  • 4,764
  • 6
  • 38
  • 77

2 Answers

5

Use zip and concatMap when working on observables that were created from arrays (as in your example), or zip and flatMap when working on observables that are inherently asynchronous.

// s1, s2, s3 and print are the ones defined in the question's snippet.
// zip groups the n-th elements of the three streams into one array;
// concatMap then re-emits each array's elements one by one, in order.
Rx.Observable
  .zip(s1, s2, s3, function(x,y,z) { return [x,y,z]; })
  .concatMap(function (list) { return Rx.Observable.from(list); })
  .subscribe(print); // 1, 2, 3, 1, 9, 4

Notice that this doesn't proceed anymore once one of the source observables completes. That's because zip is strictly "balanced" and it waits until all the sources have a matching event. What you want is a somewhat loose version of zip when dealing with completed sources.

Henrik
  • 9,714
  • 5
  • 53
  • 87
André Staltz
  • 13,304
  • 9
  • 48
  • 58
3

If there is a value that the source observables never emit (for example undefined), it can be used as a padding sentinel, and this solution works:

// Local aliases for the Rx.Observable static operators that alternate() uses.
var concat = Rx.Observable.concat;
var repeat = Rx.Observable.repeat;
var zipArray = Rx.Observable.zipArray;
var fromArray = Rx.Observable.fromArray;

// Logger used as the subscriber callback.
var print = console.log.bind(console);

// Same three sample streams as in the question.
var s1 = fromArray([1, 1, 5]);
var s2 = fromArray([2, 9]);
var s3 = fromArray([3, 4, 6, 7, 8]);

// alternate is a function declaration further down, so it is hoisted and
// callable here. Output the question asks for: 1, 2, 3, 1, 9, 4, 5, 6, 7, 8
alternate(s1, s2, s3).subscribe(print);

/**
 * Interleaves the given observables: emits the first element of every
 * source, then the second element of every source, and so on. A source
 * that has run out is skipped while the others continue.
 *
 * Each source is extended with `repeat(undefined)` so that zipping does not
 * stop when the shortest source completes; the `undefined` padding is then
 * filtered out of every zipped round. Because `undefined` is the padding
 * marker, the sources themselves must never emit `undefined`.
 *
 * Relies on `concat`, `repeat`, `zipArray` and `fromArray` being in scope.
 *
 * @param {...Object} sources - Observables passed as individual arguments.
 * @returns {Object} Observable of the interleaved elements.
 */
function alternate() {
    // Array.slice(arguments) was a non-standard, Firefox-only generic and
    // throws a TypeError elsewhere; use the standard prototype call instead.
    var sources = Array.prototype.slice.call(arguments).map(function (s) {
        return concat(s, repeat(undefined));
    });
    return zipArray(sources)
            .map(function (values) {
                // Drop the padding contributed by already-finished sources.
                return values.filter(function (x) {
                    return x !== undefined;
                });
            }).takeWhile(function (values) {
                // An empty round means every source has finished.
                return values.length > 0;
            }).concatMap(function (list) { return fromArray(list); });
}

Same example in ES6:

// Pull the static operators that alternate() uses off Rx.Observable in one destructuring.
const {concat, repeat, zipArray, fromArray} = Rx.Observable;

// Logger used as the subscriber callback.
var print = console.log.bind(console);

// Same three sample streams as in the question.
var s1 = fromArray([1, 1, 5]);
var s2 = fromArray([2, 9]);
var s3 = fromArray([3, 4, 6, 7, 8]);

// alternate is a function declaration further down, so it is hoisted and
// callable here. Output the question asks for: 1, 2, 3, 1, 9, 4, 5, 6, 7, 8
alternate(s1, s2, s3).subscribe(print);

/**
 * Interleaves the given observables round by round: one element from each
 * source per round, skipping sources that have already finished.
 *
 * Every source is followed by `repeat(undefined)` so the zip keeps going
 * after the shortest source completes; the `undefined` padding is removed
 * from each round, and the stream ends at the first round that is empty.
 * Sources therefore must not emit `undefined` themselves.
 *
 * Relies on `concat`, `repeat`, `zipArray` and `fromArray` being in scope.
 *
 * @param {...Object} sources - Observables to interleave.
 * @returns {Object} Observable of the interleaved elements.
 */
function alternate(...sources) {
    const padded = sources.map((source) => concat(source, repeat(undefined)));
    const isRealValue = (value) => value !== undefined;
    const hasValues = (round) => round.length > 0;

    return zipArray(padded)
        .map((round) => round.filter(isRealValue))
        .takeWhile(hasValues)
        .concatMap((round) => fromArray(round));
}
maiermic
  • 4,764
  • 6
  • 38
  • 77