The inner merged observable isn't terminating with this code (rxjs 5.5.6):
// assumed import: the RxJS 5.x entry point that patches all operators onto Observable
import { Observable, Subject } from 'rxjs/Rx';

let source = new Subject<string[]>();
// when the source emits a vector of strings, output
// each string with a 1s delay
source.switchMap(v => Observable.from(v)
    .map(s => Observable.of(s).delay(1000).do(s => console.log('do: ' + s)))
    // only one active inner observable at a time
    .mergeAll(1)
).subscribe(val => console.log('result: ' + val));
// emit two vectors, 1.5s apart
Observable.interval(1500).take(2).map(i => ['a' + i, 'b' + i, 'c' + i])
    .subscribe(v => source.next(v));
The actual output is:
do: a0
result: a0
do: b0
do: a1
result: a1
do: c0
do: b1
result: b1
do: c1
result: c1
The expected output is:
do: a0
result: a0
do: a1
result: a1
do: b1
result: b1
do: c1
result: c1
That is, when the second vector is emitted, switchMap should unsubscribe from the inner observable built from the first vector, cancelling it. The unsubscribe clearly works as far as the subscriber is concerned (no "result: b0" or "result: c0" appears), but the delayed inner observables keep running, as evidenced by the "do: b0" and "do: c0" lines in the output from the first example.
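To make the timing behind that expectation explicit, here is a minimal plain-TypeScript sketch (no RxJS involved) that computes which values should survive if each new vector cancels whatever is still pending from the previous one. The names `Emission`, `Batch` and `expectedTimeline` are hypothetical helpers of mine, and I am assuming that an item due at exactly the moment of the switch counts as cancelled.

```typescript
// Hypothetical types for this sketch; not part of RxJS.
interface Emission { time: number; value: string; }
interface Batch { time: number; items: string[]; }

// Items of a batch are emitted one at a time, delayMs apart (like mergeAll(1)
// over delayed observables). A batch is cut off when the next batch arrives
// (like switchMap). Batches are assumed to be sorted by time.
function expectedTimeline(batches: Batch[], delayMs: number): Emission[] {
  const out: Emission[] = [];
  batches.forEach((batch, idx) => {
    const next = batches[idx + 1];
    const cutoff = next ? next.time : Infinity;
    batch.items.forEach((item, k) => {
      const t = batch.time + (k + 1) * delayMs;
      // Assumption: an item due exactly at the cutoff is treated as cancelled.
      if (t < cutoff) {
        out.push({ time: t, value: item });
      }
    });
  });
  return out;
}

// interval(1500).take(2) emits at 1500ms and 3000ms.
const timeline = expectedTimeline(
  [
    { time: 1500, items: ['a0', 'b0', 'c0'] },
    { time: 3000, items: ['a1', 'b1', 'c1'] },
  ],
  1000
);
console.log(timeline.map(e => e.value).join(' ')); // a0 a1 b1 c1
```

With these numbers, b0 would be due at 3500ms and c0 at 4500ms, both after the second vector arrives at 3000ms, so neither should ever reach the `do`.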
And in fact the expected output is exactly what you get from this code:
let source =
    Observable.interval(1500).take(2).map(i => ['a' + i, 'b' + i, 'c' + i]);
source.switchMap(v => Observable.from(v)
    .map(s => Observable.of(s).delay(1000).do(s => console.log('do: ' + s)))
    .mergeAll(1)
).subscribe(val => console.log('result: ' + val));
But why doesn't the first example behave the same way?