4

My app needs to make multiple separate calls to an API for data, but I don't want to dump them all at once for performance reasons. Is there an rxjs operator that would allow me to run just a few at a time then merge all their results into one?

So if I need to get results from ABC...XYZ and I want only 2 requests activate a time it would subscribe to A and B. Then if B completes it would subscribe to C. Then A completes and it subscribes to D. And so on until all are complete but only ever having 2 subscriptions active.

JC Ford
  • 6,946
  • 3
  • 25
  • 34
  • What and when should this merged observable emit? – frido Mar 16 '20 at 21:41
  • If you want a `forkJoin` like behaviour with the option to specify the amount of concurrent executions see: https://stackoverflow.com/a/54247150/9423231 – frido Mar 23 '20 at 14:16

2 Answers2

6

Yes, merge() has an option to do just that. The last parameter to merge has to be the number (of concurrent requests).

Excerpt from the docs:

const timer1 = interval(1000).pipe(take(10));
const timer2 = interval(2000).pipe(take(6));
const timer3 = interval(500).pipe(take(10));
const concurrent = 2; // the argument
const merged = merge(timer1, timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));
Mladen
  • 2,070
  • 1
  • 21
  • 37
  • Isn't that limited to 6 input observables? – JC Ford Mar 16 '20 at 18:05
  • 4
    Actually, this is correct solution. [`merge` observable creation method](https://rxjs.dev/api/index/function/merge) accepts concurrency parameter to limit how many active inner observables it subscribes to. The limit of 6 source observables is because there are only 6 TypeScript function overloads defined. `merge` itself has no limitation. So if you want to use it with more parameters I think you can use it like `merge.apply([...])`. Or another solution would be using `mergeAll(concurrency)` operator. – martin Mar 16 '20 at 20:12
  • Well, `merge` doesn't subscribe at all times _if_ the last provided parameter is number. [Here's the example](https://stackblitz.com/edit/2-concurrent-requests?file=index.ts) of how to inspect the example I gave. Also, [`concat`](https://github.com/ReactiveX/rxjs/blob/28e317e4b1c050a76abb57e798ab419fcbe8ed47/src/internal/observable/concat.ts#L130) is just [`merge`](https://github.com/ReactiveX/rxjs/blob/28e317e4b1c050a76abb57e798ab419fcbe8ed47/src/internal/operators/concatAll.ts#L66) with concurrency of 1 ;) – Mladen Mar 16 '20 at 21:34
  • 1
    About `merge`'s last parameter, please take a look at [this line](https://github.com/ReactiveX/rxjs/blob/28e317e4b1c050a76abb57e798ab419fcbe8ed47/src/internal/observable/merge.ts#L131). – Mladen Mar 16 '20 at 21:37
  • I also think this is the correct solution. `merge` is internally using `mergeMap`, which buffers the incoming values if the number of **active subscriptions** exceeds the `concurrent` limit. When an inner obs. **completes**, it will take the oldest value from the buffer and will create an inner observable out of it. – Andrei Gătej Mar 16 '20 at 21:51
  • [Exactly](https://github.com/ReactiveX/rxjs/blob/28e317e4b1c050a76abb57e798ab419fcbe8ed47/src/internal/operators/mergeMap.ts#L175) @AndreiGătej – Mladen Mar 16 '20 at 21:54
  • Your answer is awesome, strange that they don't mention this cool feature [in the documentation here](https://www.learnrxjs.io/learn-rxjs/operators/combination/merge). – Wilt Mar 17 '20 at 15:57
  • Thanks :) Well, learnrxjs.io isn't an official documentation. The link I provided is from the official docs. Also, funny thing is that I learned about this feature by reading the code, not by reading the docs. – Mladen Mar 17 '20 at 21:38
0

The closest I came to is a combination of concat for the two streams and merge to make them run parallel. Probably there are nicer way of doing this, but maybe it could be a starting point for you to fool around with:

import { delay, concatMap } from  'rxjs/operators';
import { of, concat, merge } from 'rxjs';

const reqA = of('a').pipe(delay(1000));
const reqB = of('b').pipe(delay(1000));
const reqC = of('c').pipe(delay(1000));
const reqX = of('x').pipe(delay(1000));
const reqY = of('y').pipe(delay(1000));
const reqZ = of('z').pipe(delay(1000));

const abc = [reqA, reqB, reqC];
const xyz = [reqX, reqY, reqZ];

merge(concat(...abc), concat(...xyz)).subscribe(
  console.log
);

Here the StackBlitz i made with the code above.

So using concat on an observable array (your stream) makes sure there is only one observable running and on complete it will subscribe to the next observable in the array. With merge you can merge two streams abc and xyz meaning there will be two streams active at the same time.


UPDATE

I didn't know about the concurrent parameter for the merge operator, so @lagoman his answer is the way to go. It was not clear for me from the documentation that it was possible. Not sure why they don't mention it explicitly, seems like a very useful feature.

Wilt
  • 41,477
  • 12
  • 152
  • 203