1

I want to create a queue where I always execute two httpRequests in parallel. When one request completes, the next should start so that always two run in parallel. I want to merge the results and return it as a whole, when all requests have finished.

My first attempt was to create a Subject (queue) where I pass the urls with queue.next(url). Immediately after theat I complete the subject. I use mergeMap to limit the execution to two requests at a time (queueObservable).

I Subscribe to the queueObservable and add the response to an array (list). Then I await the promise of the queueObservable, which should resolve when the observable completes.

But what is happening is, that the promise gets resolved, even before the first next callback is executed and before the observable is completed.

In my second attempt, I created a new Promise and resolve it in the completed callback of the queueObservable. This approach is working fine, but I don't understand why the first approach is not working.

Can you explain it to me? And is there a smarter solution to solve this problem?

Kind regards, David

import { of, Subject } from 'rxjs';
import { delay, mergeMap } from 'rxjs/operators';

const testFunction1 = async () => {
    const queue = new Subject();
    const list = [];
    const queueObservable = queue.pipe(mergeMap(val => of(val).pipe(delay(500)), 2));

    queueObservable.subscribe((next) => {
        console.log(next);
        list.push(next);
    }, () => { },
        (complete) => {
            console.log('Observable1 completed');
        });

    queue.next('HTTP1: 1');
    queue.next('HTTP1: 2');
    queue.next('HTTP1: 3');
    queue.next('HTTP1: 4');
    queue.complete();
    await queueObservable.toPromise(); // I expect this to resolve when all values passed the mergeMap pipe.
    return list;
}

const testFunction2 = async () => {
    const list = [];
    await new Promise(resolve => {
        const queue = new Subject();
        const queueObservable = queue.pipe(mergeMap(val => of(val).pipe(delay(500)), 2));

        queueObservable.subscribe((next) => {
            console.log(next);
            list.push(next);
        }, () => { },
            () => {
                console.log('Observable2 completed'); // This gets called when all values passed mergeMap pipe.
                resolve();
            });

        queue.next('HTTP2: 1');
        queue.next('HTTP2: 2');
        queue.next('HTTP2: 3');
        queue.next('HTTP2: 4');
        queue.complete();
    });
    return list;

}
testFunction1().then(res => {
    console.log('Over1: ' + JSON.stringify(res));
});

testFunction2().then(res => {
    console.log('Over2: ' + JSON.stringify(res));
});

Edit: This diagram shows basically what I want to archieve. The piped observable (queueObservable) completes, when all other observables are completed. When I call toPromise on the piped observable it should only resolve when the piped Observable is completed (At least thats my understanding of the toPromise functionality).

enter image description here

David
  • 183
  • 11
  • Have you looked at [forkJoin](https://www.learnrxjs.io/operators/combination/forkjoin.html)? Seems like what you are looking for to me – Julien Rousé Aug 22 '19 at 15:38
  • forkJoin works fine when you don't want to limit the amount of requests being executed at the same time. I can't find a way to use it when there is a limit (in this example 2). – David Aug 23 '19 at 13:29
  • Your first example isn't working because you complete the queue with `queue.complete();` before a value is emitted as @seanplwong explains. If you're looking for an operator that behaves like `forkJoin` and also has a `concurrent` parameter check out this answer: https://stackoverflow.com/a/54247150/9423231 – frido Aug 23 '19 at 18:47
  • Thanks, this looks promising! I will try it out. – David Sep 06 '19 at 06:21

2 Answers2

0

Could you do something like this (fiddle)

const urlsSink = new Subject();
const responses = urlsSink.pipe(bufferCount(2), mergeMap((urls) => {
    return combineLatest(urls.map(url => makeHttpRequest(url)));
}));

Then you can subscribe to responses and push your URLs into urlsSink. bufferCount will group URLs into groups of two. combineLatest will combine the two HTTP observables into one combined HTTP observable.

Pace
  • 41,875
  • 13
  • 113
  • 156
  • I updated my question. I want only two requests to be executed at the same time. When one has finished, the next should start. So there is always a maximum of two requests being executed at the same time. – David Aug 23 '19 at 13:27
0

This is because you are delaying the values, the values are not yet emitted but you completed the subject. Once the subject complete, the promise should be resolved.

Try this

setTimeout(() => queue.complete(), 2000);

This will force case 1 to work but this is not suitable for making request.

seems queueObservable's promise follow queue's completion.

you could use toArray operator to merge to result.

const queueObservable = from(listOfURL).pipe(
  mergeMap(val => of(val).pipe(delay(500)), 2),
  mergeMap(val => fetch(url + val)),
  toArray(),
);
seanplwong
  • 1,051
  • 5
  • 14