`mergeMap`, like many other so-called higher-order mapping operators, maintains one or more inner observables.
An inner observable is created from the outer value and the provided function. The outer value is simply the value received from the source. For example:
of(1, 2, 3).pipe(
  mergeMap((outerValue, index) => /* ... return an observable ... */)
).subscribe(); // `outerValue`: 1, 2, 3 (separately)
When an outer value comes in, a new inner observable will be created. I think the best way to understand this is to have a look at the source code:
// `value` - the `outerValue`
protected _next(value: T): void {
  if (this.active < this.concurrent) {
    this._tryNext(value);
  } else {
    this.buffer.push(value);
  }
}

protected _tryNext(value: T) {
  let result: ObservableInput<R>;
  const index = this.index++;
  try {
    // Create the inner observable based on the `outerValue` and the provided function (`this.project`)
    // `mergeMap(project)`
    result = this.project(value, index);
  } catch (err) {
    this.destination.error(err);
    return;
  }
  this.active++;
  // Subscribe to the inner observable
  this._innerSub(result, value, index);
}
Please disregard `concurrent` and `buffer` for now; we'll have a look at them a bit later.
Now, what happens when an inner observable emits? Before going any further, it's worth mentioning that, although it's obvious, an inner observable requires an inner subscriber. We can see this in the `_innerSub` method called above:
private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
  const innerSubscriber = new InnerSubscriber(this, value, index);
  const destination = this.destination as Subscription;
  destination.add(innerSubscriber);
  // This is where the subscription takes place
  subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
}
When an inner observable emits, the `notifyNext` method will be called:
notifyNext(outerValue: T, innerValue: R,
           outerIndex: number, innerIndex: number,
           innerSub: InnerSubscriber<T, R>): void {
  this.destination.next(innerValue);
}
Here, `destination` points to the next subscriber in the chain. For example, it can be this:
of(1)
  .pipe(
    mergeMap(/* ... */)
  )
  .subscribe({} /* <- this is the `destination` for `mergeMap` */)
This will be explained in more detail in the section "What about the next subscriber in the chain" below.
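To make the flow from outer value to `destination` concrete, here is a minimal sketch that does not use RxJS at all. `MiniDestination` and `MiniMergeMapSubscriber` are hypothetical names invented for this illustration, and inner "observables" are modelled as plain arrays, so every inner value is delivered synchronously. It only mirrors the shape of `_tryNext` and `notifyNext` shown above; it is not the library's implementation.

```typescript
// Hypothetical stand-in for the next subscriber in the chain.
interface MiniDestination<R> {
  next(value: R): void;
  error(err: unknown): void;
}

// Assumption: an inner "observable" is just an array of values emitted synchronously.
class MiniMergeMapSubscriber<T, R> {
  private index = 0;
  private destination: MiniDestination<R>;
  private project: (value: T, index: number) => R[];

  constructor(destination: MiniDestination<R>, project: (value: T, index: number) => R[]) {
    this.destination = destination;
    this.project = project;
  }

  // Called once per outer value, like `_tryNext` in the source above.
  next(outerValue: T): void {
    let inner: R[];
    const index = this.index++;
    try {
      // Create the inner "observable" from the outer value and the provided function
      inner = this.project(outerValue, index);
    } catch (err) {
      this.destination.error(err);
      return;
    }
    // "Subscribe" to the inner values and forward each one
    for (const innerValue of inner) {
      this.notifyNext(innerValue);
    }
  }

  // Every inner value is passed straight to the destination.
  private notifyNext(innerValue: R): void {
    this.destination.next(innerValue);
  }
}

const seen: string[] = [];
const errors: unknown[] = [];
const miniSubscriber = new MiniMergeMapSubscriber<number, string>(
  { next: v => seen.push(v), error: e => errors.push(e) },
  (value, index) => [`${value}@${index}`, `${value * 10}@${index}`]
);

[1, 2, 3].forEach(v => miniSubscriber.next(v));
console.log(seen); // ["1@0", "10@0", "2@1", "20@1", "3@2", "30@2"]
```

Each outer value gets its own index, and whatever the inner source produces ends up in `destination.next`, which is all `notifyNext` does.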
So, what does it mean to mix 2 observables?
Let's see this example:
of(2, 3, 1)
  .pipe(
    // `* 100` so that the delays are 200, 300 and 100 ms
    mergeMap(outerValue => timer(outerValue * 100).pipe(mapTo(outerValue)))
  )
  .subscribe(console.log)
/* 1 \n 2 \n 3 */
When `2` arrives, `mergeMap` will subscribe to an inner observable that will emit in `200` ms. This is an asynchronous action, but notice that the outer values (2, 3, 1) arrive synchronously. Next, `3` arrives and will create an inner observable that will emit in `300` ms. Since the current script has not finished executing yet, the callback queue is not yet considered. Now `1` arrives and will create an inner observable that will emit in `100` ms.
`mergeMap` now has 3 inner observables and will pass along the inner value of whichever inner observable emits.
As expected, we get `1`, `2`, `3`.
So that's what `mergeMap` does. Mixing observables can be thought of this way: if an outer value comes and an inner observable has already been created, then `mergeMap` simply says: "no problem, I'll just create a new inner observable and subscribe to it".
What about concurrent and buffer
`mergeMap` can be given a second argument, `concurrent`, which indicates how many inner observables it should handle at the same time. The number of active inner observables is tracked with the `active` property.
As seen in the `_next` method, if `active >= concurrent`, the `outerValues` will be added to a `buffer`, which is a queue (`FIFO`).
Then, when one active inner observable completes, `mergeMap` will take the oldest value from the buffer and create an inner observable out of it, using the provided function:
// Called when an inner observable completes
notifyComplete(innerSub: Subscription): void {
  const buffer = this.buffer;
  this.remove(innerSub);
  this.active--;
  if (buffer.length > 0) {
    this._next(buffer.shift()!); // Create a new inner obs. with the oldest buffered value
  } else if (this.active === 0 && this.hasCompleted) {
    this.destination.complete();
  }
}
With this in mind, `concatMap(project)` is just `mergeMap(project, 1)`.
So, if you have:
of(2, 3, 1)
  .pipe(
    mergeMap(outerValue => timer(outerValue * 100).pipe(mapTo(outerValue)), 1)
  )
  .subscribe(console.log)
this will be logged: `2 \n 3 \n 1`.
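The interplay between `active`, `concurrent` and `buffer` can also be sketched without RxJS, using virtual time instead of real timers so that the result is deterministic. `simulateMergeMap` is a hypothetical helper written for this answer, not a library function. It assumes that all outer values arrive synchronously at time 0 and that each inner observable emits its outer value once after `delayOf(value)` virtual milliseconds and then completes.

```typescript
// Hypothetical helper (not part of RxJS): returns the order in which values would reach the destination.
function simulateMergeMap(
  outerValues: number[],
  delayOf: (value: number) => number,
  concurrent: number = Infinity
): number[] {
  const buffer: number[] = []; // FIFO queue of outer values waiting for a free slot
  const active: { value: number; firesAt: number; seq: number }[] = [];
  const output: number[] = [];
  let seq = 0;

  // "Subscribe" to an inner observable that fires `delayOf(value)` ms after `now`
  const start = (value: number, now: number): void => {
    active.push({ value, firesAt: now + delayOf(value), seq: seq++ });
  };

  // Same decision as `_next`: start right away or push to the buffer
  for (const value of outerValues) {
    if (active.length < concurrent) {
      start(value, 0);
    } else {
      buffer.push(value);
    }
  }

  while (active.length > 0) {
    // Pick the inner observable that fires first; ties are broken by subscription order
    active.sort((a, b) => a.firesAt - b.firesAt || a.seq - b.seq);
    const finished = active.shift()!;
    output.push(finished.value);
    // Same decision as `notifyComplete`: a slot is free, so take the oldest buffered value
    if (buffer.length > 0) {
      start(buffer.shift()!, finished.firesAt);
    }
  }

  return output;
}

console.log(simulateMergeMap([2, 3, 1], v => v * 100));    // [1, 2, 3]
console.log(simulateMergeMap([2, 3, 1], v => v * 100, 1)); // [2, 3, 1]
```

With no limit, all three inner timers run side by side and the shortest one wins. With `concurrent` set to 1, `3` and `1` wait in the buffer and are started one at a time, which is the `concatMap` behavior.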
What about the next subscriber in the chain
Operators are functions that return another function, which accepts an observable as its only parameter and returns another observable.
When a stream is being subscribed to, each observable returned by an operator will have its own subscriber.
All these subscribers can be seen as a linked list. For example:
// S{n} -> Subscriber `n`, where `n` depends on the order in which the subscribers are created
of(/* ... */)
  .pipe(
    operatorA(), // S{4}
    operatorB(), // S{3}
    operatorC(), // S{2}
  ).subscribe({ /* ... */ }) // S{1}; the observer is converted into a `Subscriber`
`S{n}` is the parent (destination) of `S{n+1}`, meaning that `S{1}` is the destination of `S{2}`, `S{2}` is the destination of `S{3}`, and so forth.
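The creation order of the subscribers can be reproduced with a tiny model that does not depend on RxJS. Everything here (`MiniSubscriber`, `MiniObservable`, `miniOf`, `miniMap`, `miniPipe`) is a hypothetical simplification for illustration: an observable is just a function that takes a subscriber, and an operator wraps the destination in a new subscriber before subscribing to its source.

```typescript
interface MiniSubscriber {
  id: number;
  destination?: MiniSubscriber;
  next(value: number): void;
}

type MiniObservable = (subscriber: MiniSubscriber) => void;
type MiniOperator = (source: MiniObservable) => MiniObservable;

const creationLog: string[] = [];
let nextId = 1;

// Emits the given values synchronously to whoever subscribes
const miniOf = (...values: number[]): MiniObservable =>
  subscriber => values.forEach(v => subscriber.next(v));

// A map-like operator that records which subscriber it creates on subscription
const miniMap = (label: string, fn: (v: number) => number): MiniOperator =>
  source => destination => {
    const subscriber: MiniSubscriber = {
      id: nextId++,
      destination,
      next: v => destination.next(fn(v)),
    };
    creationLog.push(`${label} -> S${subscriber.id}, destination S${destination.id}`);
    source(subscriber); // subscribe upstream with the newly created subscriber
  };

const miniPipe = (source: MiniObservable, ...operators: MiniOperator[]): MiniObservable =>
  operators.reduce((acc, operator) => operator(acc), source);

const results: number[] = [];
const finalSubscriber: MiniSubscriber = { id: nextId++, next: v => results.push(v) }; // S1

miniPipe(
  miniOf(1),
  miniMap("operatorA", v => v + 1),
  miniMap("operatorB", v => v * 10),
  miniMap("operatorC", v => v - 1)
)(finalSubscriber);

console.log(creationLog);
// [ "operatorC -> S2, destination S1",
//   "operatorB -> S3, destination S2",
//   "operatorA -> S4, destination S3" ]
console.log(results); // [19]
```

Subscription travels from the bottom of the chain upwards, which is why the last operator gets `S2`, while values travel in the opposite direction, from `S4` down to `S1`.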
Unexpected results
Compare these:
of(2, 1, 0)
  .pipe(
    mergeMap(v => timer(v * 100).pipe(mapTo(v)))
  ).subscribe(console.log)
// 0 1 2

of(2, 1, 0)
  .pipe(
    mergeMap(v => timer(v).pipe(mapTo(v)))
  ).subscribe(console.log)
// 1 0 2
As per MDN:
The specified amount of time (or the delay) is not the guaranteed time to execution, but rather the minimum time to execution. The callbacks you pass to these functions cannot run until the stack on the main thread is empty.
As a consequence, code like setTimeout(fn, 0) will execute as soon as the stack is empty, not immediately. If you execute code like setTimeout(fn, 0) but then immediately after run a loop that counts from 1 to 10 billion, your callback will be executed after a few seconds.
This section by MDN should clarify things as well.
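I'd say this is environment-specific, rather than RxJS-specific.
In the second snippet, the delays are only 1 ms apart, which is why you're getting unexpected results. In Node.js, for instance, a timer delay below 1 ms is set to 1 ms, so `timer(0)` and `timer(1)` become due at the same time and fire in the order they were scheduled, which would explain `1 0 2`. If you space the delays out just a bit, like `timer(v * 2)`, you should get the expected behavior.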