0

I'm having trouble with RxJS and the correct way to handle an array of request. Let's say I have an array of about 50 requests as follows:

let requestCounter = 0;
function makeRequest(timeToDelay) {
  return of('Request Complete!').pipe(delay(timeToDelay));
}

const requestArray = []
for(let i=0;i<25;i++){
  requestArray.push(makeRequest(3000)); //3 seconds request
  requestArray.push(makeRequest(1000)); //1 second request
}

My goal is to:

  • Launch the requests in parallel
  • Only 5 can run a the same moment
  • When a request is done, the next in the array starts
  • When a request is done(success or error), I need to increment my variable 'requestCounter' by one (requestCounter++)
  • When my last request in the queue is done, I need to subscribe to this event and handle an array resulting of every requests result

So far the closest I've been to do this is by following the response in this post:

RxJS parallel queue with concurrent workers?

The thing is that I'm discovering RxJS and the exemple is way too complicated for me and I can't find how to handle the counter for each request.

Hope you can help me. (Sorry for the broken english, it is not my native language)

Edit: Final solution looks like this:

forkJoinConcurrent<T>(
    observables: Observable<T>[],
    concurrent: number
  ): Observable<T[]> {
    return from(observables).pipe(
      mergeMap((outerValue, outerIndex) => outerValue.pipe(
        tap(// my code ),
        last(),
        catchError(error => of(error)),
        map((innerValue, innerIndex) => ({index: outerIndex, value: innerValue})),
      ), concurrent),
      toArray(),
      map(a => (a.sort((l, r) => l.index - r.index).map(e => e.value))),
    );
  }
  • The answer from the link you posted does what you're asking for. You can use `tap` in an operator queue for side effects like increasing a counter. In the answer from the link you posted a single request is done after the `last` operator. The `toArray` operator combines all request to one array so you have to add `tap(_ => requestCounter++)` somewhere in the queue after `last()` but before `toArray()`. – frido Jul 01 '19 at 09:44
  • Great, it does the job! My only problem is that if one of the calls fail, all the others are cancelled. Is there a way to make it keep going and process the error in the array returned at the end? – SardineBoy Jul 01 '19 at 13:32
  • You have to catch errors on every single request with the `catchError` operator and return an alternative value stream instead, e.g. `catchError(error => of(error))`. Either add `catchError` at the end of the pipe where you create a request, i.e. in your `makeRequest` function (preferred), or add it within the inner pipe after `last`. – frido Jul 01 '19 at 13:47

1 Answers1

0

First of all you should use subject to store the request queue, and take a look at the mergeMap operator, there's a concurrency parameter you can set for max concurrency as well as an index variable to track the number of calls

https://www.learnrxjs.io/operators/transformation/mergemap.html

const requestArray=new Subject()
for(let i=0;i<25;i++){
  requestArray.next(makeRequest(3000)); //3 seconds request
  requestArray.next(makeRequest(1000)); //1 second request
}

requestArray.pipe(
mergeMap((res,index)=>of([res,index]),
res=>res,5),
map((res,index)=>{if(index===25) .... do your thing ; return res;})
).subscribe(console.log)
Fan Cheung
  • 10,745
  • 3
  • 17
  • 39