I've got an Angular 2 app that fetches a varying number of ids from a server and then makes another REST call for each id inside a forkJoin.

However, the number of ids can reach a few hundred, and suddenly making several hundred REST calls in parallel might be a problem.

Is there a way of limiting the number of parallel calls when using RxJs and the forkJoin operator?

skappler
    Possible duplicate of [Limit number of requests at a time with RxJS](https://stackoverflow.com/questions/34535685/limit-number-of-requests-at-a-time-with-rxjs) – developer033 Jun 11 '17 at 17:32

2 Answers

One way would be to use bufferCount:

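// Split the ids into batches of 3 and process one batch at a time.
Rx.Observable.from([1,2,3,4,5,6,7,8])
  .bufferCount(3)
  .concatMap(items => {
    console.log('ConcatMap', items);
    // Timers stand in for the REST calls; within a batch they run in parallel.
    const tasks = items.map(item => Rx.Observable.timer(Math.random() * 1000).do(() => console.log(item, 'ready')));
    // concatMap subscribes to the next batch only after this forkJoin completes.
    return Rx.Observable.forkJoin(...tasks);
  })
  .subscribe();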
<script src="https://npmcdn.com/@reactivex/rxjs@5.3.1/dist/global/Rx.js"></script>
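For what it's worth, the batching idea itself doesn't depend on RxJS. Below is a minimal sketch of the same pattern with plain Promises, just to make the concurrency bound visible. `runInBatches` and `fakeRequest` are hypothetical names made up for this illustration, and the timers are stand-ins for the real REST calls.

```javascript
// Hypothetical helper: runs task factories in batches of `size`.
// A batch starts only after the previous one has fully finished,
// which is roughly what bufferCount(size) + concatMap + forkJoin does.
async function runInBatches(taskFactories, size) {
  const results = [];
  for (let i = 0; i < taskFactories.length; i += size) {
    const batch = taskFactories.slice(i, i + size);
    // Start every task of this batch at once and wait for all of them.
    const batchResults = await Promise.all(batch.map(start => start()));
    results.push(...batchResults);
  }
  return results;
}

// Counters used only to observe how many fake requests overlap.
let inFlight = 0;
let maxInFlight = 0;

// Hypothetical stand-in for a REST call: resolves with id * 10 after a short delay.
function fakeRequest(id) {
  return () =>
    new Promise(resolve => {
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      setTimeout(() => {
        inFlight--;
        resolve(id * 10);
      }, Math.random() * 50);
    });
}

const demo = runInBatches([1, 2, 3, 4, 5, 6, 7, 8].map(fakeRequest), 3);
demo.then(results => console.log(results, 'max in flight:', maxInFlight));
```

Results come back in input order, and no more than 3 requests overlap. The trade-off is that a batch only starts once the previous one has completely finished, so a single slow request holds up the next batch.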
Sergey Sokolov

I wrote a function for this, hope it helps someone! (It uses RxJS 6 and lodash.)

const { timer, forkJoin, of } = rxjs;
const { tap, map, mapTo, first, switchMapTo, concatMap } = rxjs.operators;

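// Like forkJoin, but subscribes to at most `limit` sources at a time.
const forkJoinLimit = (limit, sources) => {
  if (sources.length < limit) {
    return forkJoin(sources);
  }
  // Chain one forkJoin per chunk; each chunk starts after the previous one completes.
  return _.reduce(
    _.chunk(sources, limit),
    (acc, subSource) =>
      acc.pipe(
        concatMap(val =>
          forkJoin(subSource).pipe(
            // Append this chunk's results to those collected so far.
            map(val2 => (val ? _.concat(val, val2) : val2))
          )
        )
      ),
    of(null)
  );
};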

// Simulates a request: logs on subscribe, then emits `val` after a random delay.
const generateObservable = val =>
  of(null).pipe(
    tap(() => console.log("Starting " + val)),
    switchMapTo(
      timer(Math.floor(Math.random() * 1000) + 1).pipe(
        first(),
        mapTo(val),
        tap(() => console.log("Ending " + val))
      )
    )
  );

console.clear();
const serie = [];
const size = 10;
const limit = 3;
for (let i = 0; i < size; i++) {
  serie.push(generateObservable(i));
}

forkJoinLimit(limit, serie).subscribe(x => console.log("-- End --", x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.0/rxjs.umd.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/lodash.js/4.17.10/lodash.min.js"></script>
RainHeart257