2

I have an array of objects. For each object I need to trigger an asynchronous request (an HTTP call), but I only want a certain maximum number of requests running at the same time. Also, it would be nice (but not necessary) to have a single synchronization point after all requests have finished, where I can execute some code.

I've tried suggestions from:

Limit number of requests at a time with RxJS

How to limit the concurrency of flatMap?

Fire async request in parallel but get result in order using rxjs

and many more... I even tried making my own operators.

Either the answers on those pages are too old to work with my code or I can't figure out how to put everything together so all types fit nicely.

This is what I have so far:

for (const obj of objects) {
  this.myService.updateObject(obj).subscribe(value => {
    this.anotherService.set(obj);
  });
}

EDIT 1: Ok, I think we're getting there! With the answers of Julius and pschild (both seem to work equally) I managed to limit the number of requests. But now it will only fire the first batch of 4 and never fire the rest. So now I have:

const concurrentRequests = 4;
from(objects)
  .pipe(
    mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
    tap(result => this.anotherService.set(result))
  ).subscribe();

Am I doing something wrong with the subscribe()?

Btw: mergeMap with the resultSelector parameter is deprecated, so I used mergeMap without it. Also, the obj from the mergeMap is not visible in the tap, so I had to use tap's parameter instead.
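One way to keep the original obj visible without the deprecated resultSelector is to pair it with the result inside the inner pipe. This is a minimal sketch, not the asker's actual code: `Item`, `updateObject`, and the `seen` array are hypothetical stand-ins for `this.myService.updateObject` and `this.anotherService.set`.

```typescript
import { from, of, Observable } from 'rxjs';
import { map, mergeMap, tap } from 'rxjs/operators';

interface Item { id: number; }

// Hypothetical stand-in for this.myService.updateObject(obj).
function updateObject(obj: Item): Observable<string> {
  return of(`updated ${obj.id}`);
}

const objects: Item[] = [{ id: 1 }, { id: 2 }, { id: 3 }];
const concurrentRequests = 2;
const seen: string[] = [];

from(objects)
  .pipe(
    mergeMap(
      // Map inside the inner pipe so obj is still in scope.
      obj => updateObject(obj).pipe(map(result => ({ obj, result }))),
      concurrentRequests
    ),
    // Both the source object and the response are available here.
    tap(({ obj, result }) => seen.push(`${obj.id}: ${result}`))
  )
  .subscribe();
```

The inner `map` runs per request, so each emitted pair carries the object that produced it even when responses arrive out of order.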

EDIT 2:

Make sure your observables complete, i.e. call observer.complete() in any observable you create by hand! (It cost me a whole day.)
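The stall described above can be reproduced in a few lines. This is a sketch under assumptions: `fakeRequest` and `run` are hypothetical helpers, and the requests emit synchronously so the effect is visible without timers. With a concurrency limit of 4, mergeMap only subscribes to the fifth inner observable once one of the first four has completed.

```typescript
import { from, Observable } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical hand-made request; `complete` toggles the observer.complete() call.
function fakeRequest(id: number, complete: boolean): Observable<number> {
  return new Observable<number>(observer => {
    observer.next(id);
    if (complete) {
      observer.complete(); // frees one of mergeMap's concurrency slots
    }
  });
}

// Collects every value that makes it through the pipe.
function run(complete: boolean): number[] {
  const results: number[] = [];
  from([1, 2, 3, 4, 5, 6])
    .pipe(mergeMap(id => fakeRequest(id, complete), 4))
    .subscribe(id => results.push(id));
  return results;
}

const stalled = run(false);  // [1, 2, 3, 4]: requests 5 and 6 never start
const finished = run(true);  // [1, 2, 3, 4, 5, 6]
```

No error is raised in the stalled case, which matches the symptom of a silent hang after the first batch.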

Genome Prime
  • Yes, you're right with the deprecation :-) I updated my answer accordingly. Could you have a look at my example at Stackblitz? I cannot reproduce the errors you got... maybe you could also create an example showing the errors? – pschild May 18 '19 at 21:35
  • I cannot reproduce the problem in Stackblitz. I don't think it has anything to do with angular/electron/nodejs... at least I hope. Anyway here is a Stackblitz which is more similar to my code: [https://stackblitz.com/edit/rxjs-dawwsl?file=index.ts](https://stackblitz.com/edit/rxjs-dawwsl?file=index.ts) I swear I checked the code character for character, but still.. The only thing I can say is that my code doesn't hit the finalize method. But the first 4 requests all go through the pipe nicely. – Genome Prime May 18 '19 at 22:29
  • Oh and also, there are no errors on the console. How can I find out where and why it's stuck? I've tried with the `catchError` operator - no luck. – Genome Prime May 18 '19 at 22:36
  • I think I'm losing my mind.. I copied the stackblitz code into my app and it works... so I can rule out my stack electron/angular etc. I've created another stackblitz which comes even closer to my code: [https://stackblitz.com/edit/angular-7-master-yf1cik](https://stackblitz.com/edit/angular-7-master-yf1cik) And I've tried the error function inside the `subscribe` method... nothing – Genome Prime May 19 '19 at 10:19
  • So you're saying the last Stackblitz you provided works (it does for me), but in your real code it doesn't? I think there has to be another explanation for this behaviour and so I guess that there simply is no runtime error to catch with `catchError` or the `error` function within the `subscribe`. Do you use the same versions of `rxjs`? What happens if you play around with `concurrentRequests`, set it to 1 or 100? It's hard to tell but there needs to be some difference between your two code bases. – pschild May 19 '19 at 10:51
  • I've figured it out! The issue was that my observer which I created in the service method never completed. Your of(..) observer completed, of course, that's why it was working. So I added `observer.complete()` and voila! Thanks for your help! – Genome Prime May 19 '19 at 11:16
  • Nice! Glad I could help! – pschild May 19 '19 at 11:23

3 Answers

4

You can use the concurrent parameter of mergeMap to limit the number of concurrent inner subscriptions. It is the second argument now that the resultSelector overload is deprecated. Use finalize to execute something after all requests have finished:

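import { from } from 'rxjs';
import { finalize, mergeMap, tap } from 'rxjs/operators';

const concurrentRequests = 5;
from(objects)
    .pipe(
        // At most `concurrentRequests` inner requests are subscribed at once.
        mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
        tap(res => this.anotherService.set(res)),
        // Runs once when the sequence terminates (complete, error, or unsubscribe).
        finalize(() => console.log('Sequence complete'))
    )
    .subscribe();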

See the example on Stackblitz.

pschild
2
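import { forkJoin, from } from 'rxjs';
import { bufferCount, concatMap, finalize, tap } from 'rxjs/operators';

from(objects).pipe(
  // Split the objects into batches of 10.
  bufferCount(10),
  // Run one batch at a time; forkJoin waits for every request in the batch.
  concatMap(objs => forkJoin(objs.map(obj =>
    this.myService.updateObject(obj).pipe(
      tap(value => this.anotherService.set(obj))
    )
  ))),
  finalize(() => console.log('all requests are done'))
).subscribe();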

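The code is not tested, but you get the idea: bufferCount splits the objects into batches of 10, and concatMap with forkJoin runs one batch at a time, waiting for every request in a batch to complete before starting the next. Let me know if you hit any errors or need more explanation.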
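To make the batching behaviour concrete, here is a small sketch with synchronous stand-in requests (`of(n * 10)` is an assumption replacing the real HTTP call, and the batch size of 2 is chosen only to keep the output short). Each emission is the array of results for one batch.

```typescript
import { forkJoin, from, of } from 'rxjs';
import { bufferCount, concatMap } from 'rxjs/operators';

const batches: number[][] = [];

from([1, 2, 3, 4, 5])
  .pipe(
    // Group the source values into arrays of at most 2.
    bufferCount(2),
    // forkJoin emits one array per batch once every request in it has completed.
    concatMap(batch => forkJoin(batch.map(n => of(n * 10))))
  )
  .subscribe(batchResult => batches.push(batchResult));

// batches is now [[10, 20], [30, 40], [50]]
```

Note the trade-off compared with mergeMap and a concurrency limit: here the next batch only starts after the slowest request of the current batch has finished, whereas mergeMap starts a new request as soon as any running one completes.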

Julius Dzidzevičius
0

I had the same issue once when I tried to load multiple images from a server. I had to send the HTTP requests one after another, and I achieved the desired outcome using an awaited promise. Here is the sample code:

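async ngOnInit() {
  for (const number of this.numbers) {
    // Wait for each request to finish before starting the next one.
    await new Promise<void>((resolve, reject) => {
      this.http.get(`https://jsonplaceholder.typicode.com/todos/${number}`).subscribe(
        data => {
          this.responses.push(data);
          console.log(data);
          resolve();
        },
        // Reject on failure so the loop does not wait forever.
        error => reject(error)
      );
    });
  }
}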

The main idea here is to resolve the promise once you get the response. With this technique you can add custom logic that runs a method once all the requests have finished.
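The loop above runs one request at a time. The same promise-based idea can be extended to a maximum of N requests in flight with a single point where all results are available. This is only a sketch: `runWithLimit` and `fakeFetch` are hypothetical names, the timer stands in for a real HTTP call, and `limit` is assumed to be at least 1.

```typescript
// Runs `task` for every item with at most `limit` tasks in flight.
async function runWithLimit<T, R>(
  items: T[],
  limit: number,
  task: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  // Each worker pulls the next unprocessed index until none are left.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await task(items[index]);
    }
  }

  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results; // same order as `items`
}

// Hypothetical request that also tracks how many calls overlap.
let inFlight = 0;
let maxInFlight = 0;
const fakeFetch = async (n: number): Promise<number> => {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise<void>(resolve => setTimeout(resolve, 10));
  inFlight--;
  return n * 2;
};

// Single synchronization point: the promise resolves after every request.
const allDone = runWithLimit([1, 2, 3, 4, 5], 2, fakeFetch);
allDone.then(results => console.log(results, 'max in flight:', maxInFlight));
```

In an Angular service, `task` would wrap the HTTP observable in a promise, as the answer's code does by hand.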

Here is the stackblitz. Open up the console to see it in action. :)

Rukshan Dangalla