0

Have been trying to solve the issue for a while. Currently I have an array of objects (i call them tiles), which is pretty big. I have an API endpoint where I should send this objects one by one, this API returns nothing, just status. I need to send this objects to endpoint in parallel and concurrent manner and when the last of them is successful I should emit some string value which goes to redux store.

const tilesEpic =(action$, _state$) => {
    action$.pipe(
        ofType('TILE_ACTION'),
        map(tilesArray => postTilesConcurrently(tilesArray),
        map(someId => someReduxAction(someId),
    )

const postTilesConcurrently = (tilesArray) => {
     const tilesToObservables = tilesArray.map(tile => defer(() => postTile(tile))
     return from(tileToObservables).pipe(mergeAll(concurrencyLimit))
     }

The problem is that I have no idea how to emit someId from postTilesConcurrently, now it triggers action after each request is complete.

Dominik Domanski
  • 1,023
  • 2
  • 10
  • 18

2 Answers2

2

mergeAll() will subscribe to all sources in parallel but it will also emit each result immediatelly. So instead you could use for example forkJoin() ( you could use toArray() operator as well).

forkJoin(tilesToObservables)
  .pipe(
    map(results => results???), // Get `someId` somehow from results
  );

forkJoin() will emit just once after all source Observables emit at least once and complete. This means for each source Observable you'll get only the last value it emitted.

martin
  • 93,354
  • 25
  • 191
  • 226
  • Thank you for reply! Let's say I have one hundred objects and I don't want to make one hundred requests at once. How do I limit it up to, say, 5 requests at a time? – Dominik Domanski Jun 23 '21 at 10:24
  • 1
    Then you can use basically what you had. Just after `mergeAll()` use `toArray()` operator that will collect all intermediate `next` notifications and reemit them as one large array after all source Observables complete. – martin Jun 23 '21 at 11:08
  • 1
    @DominikDomanski: In addition to @martin's answer, for a buffered request chain (eg. 5 reqs. max in parallel) you could refer my post here: https://stackoverflow.com/a/62872189/6513921. It uses RxJS `from` function with `bufferCount` operator. – ruth Jun 23 '21 at 11:29
  • Thanks a lot! It's a pitty I did not find it earlier. However I came up with some different code since by the requirements of the task I pass down not the results of the request but some other variable which I got earlier but skipped this part in my example – Dominik Domanski Jun 23 '21 at 12:31
1

After Martin's reply I have adjusted my code in order to use forkJoin

const tilesEpic =(action$, _state$) => {
    action$.pipe(
        ofType('TILE_ACTION'),
        concatMap(tilesArray => postTilesConcurrently(tilesArray),
        map(({someId}) => someReduxAction(someId),
    )

const postTilesConcurrently = (tilesArray) => {
     const tilesToObservables = tilesArray.map(tile => defer(() => postTile(tile))
     return forkJoin({
      images: from(tileToObservables).pipe(mergeAll(concurrencyLimit)),
      someId: from([someId]),
     }
Dominik Domanski
  • 1,023
  • 2
  • 10
  • 18