I want to create a queue where I always execute two httpRequests in parallel. When one request completes, the next should start so that always two run in parallel. I want to merge the results and return it as a whole, when all requests have finished.
My first attempt was to create a Subject (queue) where I pass the urls with queue.next(url). Immediately after theat I complete the subject. I use mergeMap to limit the execution to two requests at a time (queueObservable).
I Subscribe to the queueObservable and add the response to an array (list). Then I await the promise of the queueObservable, which should resolve when the observable completes.
But what is happening is, that the promise gets resolved, even before the first next callback is executed and before the observable is completed.
In my second attempt, I created a new Promise and resolve it in the completed callback of the queueObservable. This approach is working fine, but I don't understand why the first approach is not working.
Can you explain it to me? And is there a smarter solution to solve this problem?
Kind regards, David
import { of, Subject } from 'rxjs';
import { delay, mergeMap } from 'rxjs/operators';
const testFunction1 = async () => {
const queue = new Subject();
const list = [];
const queueObservable = queue.pipe(mergeMap(val => of(val).pipe(delay(500)), 2));
queueObservable.subscribe((next) => {
console.log(next);
list.push(next);
}, () => { },
(complete) => {
console.log('Observable1 completed');
});
queue.next('HTTP1: 1');
queue.next('HTTP1: 2');
queue.next('HTTP1: 3');
queue.next('HTTP1: 4');
queue.complete();
await queueObservable.toPromise(); // I expect this to resolve when all values passed the mergeMap pipe.
return list;
}
const testFunction2 = async () => {
const list = [];
await new Promise(resolve => {
const queue = new Subject();
const queueObservable = queue.pipe(mergeMap(val => of(val).pipe(delay(500)), 2));
queueObservable.subscribe((next) => {
console.log(next);
list.push(next);
}, () => { },
() => {
console.log('Observable2 completed'); // This gets called when all values passed mergeMap pipe.
resolve();
});
queue.next('HTTP2: 1');
queue.next('HTTP2: 2');
queue.next('HTTP2: 3');
queue.next('HTTP2: 4');
queue.complete();
});
return list;
}
testFunction1().then(res => {
console.log('Over1: ' + JSON.stringify(res));
});
testFunction2().then(res => {
console.log('Over2: ' + JSON.stringify(res));
});
Edit: This diagram shows basically what I want to archieve. The piped observable (queueObservable) completes, when all other observables are completed. When I call toPromise on the piped observable it should only resolve when the piped Observable is completed (At least thats my understanding of the toPromise functionality).