The problem:

I have a list of URLs. I have an Observable method that takes a URL, downloads the file, and stores it locally. I would like to launch these requests in parallel, but allow only 4 at once (I generate PDFs server side and want to reduce the load). In addition, I need to return from this download step only once all of the URLs have been downloaded.

Current solution

Right now, I just launch the requests all at once and use forkJoin. After searching for a couple of days, I have come across a few solutions on here that have given me some ideas, but they don't do exactly what I want. My main source is here.

export function limitedParallelObservableExecution<T>(listOfItems: Array<T>, observableMethod: (item: T) => Observable<any>): Observable<any> {
  const MAX_CONCURRENCY = 4;
  if (listOfItems && listOfItems.length > 0) {
    let observableListOfItems: Observable<Observable<any>> = Observable.from(listOfItems).map(
      (item: T) => observableMethod(item)
    );
    return observableListOfItems.merge(MAX_CONCURRENCY);
  } else {
    return Observable.of({});
  }
}

I have another download step that is flatMapped to execute once this step completes. However, instead of just executing once, the next step executes once for every url in the list (as I understand it, this is because it emits once for each url that completes).

How do I maintain this concurrency while only returning once when all of my downloads have completed?

In addition, this still seems to launch all of my requests at once. Is there a better way to limit the number of simultaneous requests? That is, launch n requests in parallel, but only launch request n + 1 once one of the first n has completed?

Extra code examples

Here is a code snippet showing how I launch each download step only once the previous one has completed:

).flatMap(
  (uploadFlightActualsSuccess) => {
    this.changeProgressValue(this.FLIGHT_ACTUALS_UPLOAD_END); 
    return this.syncDocuments();
  }
).flatMap(
  (syncDocumentsSuccess) => {
    this.changeProgressValue(this.OPERATOR_DOCUMENT_DOWNLOAD_END);
    return this.syncTripDocuments()
  },
  (error) => error
).flatMap(
  (syncTripDocumentsSuccess) => {
    this.changeProgressValue(this.TRIP_DOCUMENT_DOWNLOAD_END);      
    return this.expenseItemSyncProvider.syncPortalData();
  }
).flatMap(
  (expenseItemSyncSuccess) => {
    return this.flightPersonnelSyncProvider.syncFlightPersonnelByTrip();
  }
).flatMap(

'syncTripDocuments' is the request that downloads the list of URLs. I only want to run the next step once all of those downloads complete.

Connor O'Doherty
  • Rather than subscribing to when the merged observable emits, subscribe to when it completes. – Explosion Pills Jan 09 '18 at 21:00
  • I added a code example of the subscription step. How exactly do I flatMap to complete instead of upon emission? And is there a better way to sequentially do each of these steps? – Connor O'Doherty Jan 09 '18 at 21:13
  • If you don't care about the actual values emitted and you just want to wait for everything to complete while limiting concurrency, you can do `.merge(MAX_CONCURRENCY).last()` – Explosion Pills Jan 09 '18 at 21:43
  • I tried that, but it doesn't look like my downloads are completing. When I forkJoin, it appears like everything is printing out correctly, but with merge they don't seem to finish. I'll give it another go, thanks! – Connor O'Doherty Jan 09 '18 at 22:09

2 Answers

Here's one way to do it: use the zip operator to throttle the requests.

Start with two streams: the first is the sequence of URLs to download, and the second is a sequence of 4 objects, something like this:

s1$ = Observable.from(list_of_urls);
s2$ = new ReplaySubject();
for(let i = 0; i < 4 ; i++) s2$.next(i);

Then zip these two together, and mergeMap to download the files. Once each download completes, emit a new event on s2$ so that the next one can start, something like this:

s3$ = s1$.pipe(
    zip(s2$),
    mergeMap(([a, b]) => download_url(a).pipe(tap(c => s2$.next(c)))))

So now, every time a file finishes downloading, a new element is emitted on s2$, allowing the next zipped pair to be processed.

EDIT

Alternatively, we can use a simple Subject instead of a ReplaySubject and move the for loop that emits the first four values until after the subscription to s3$:

s1$ = Observable.from(list_of_urls);
s2$ = new Subject();
s3$ = s1$.pipe(
    zip(s2$),
    mergeMap(([a, b]) => download_url(a).pipe(tap(c => s2$.next(c)))))
s3$.subscribe(...);
for(let i = 0; i < 4 ; i++) s2$.next(i);

EDIT 2

Instead of using a for loop to create the first 4 elements, we can zip with from([1,2,3,4]).pipe(concat(s2$)) rather than simply with s2$.

I haven't run any of this, but you get the general idea.

Aviad P.
  • Thank you for this suggestion! One question though, What exactly does your third line do, where you are applying replay subject? – Connor O'Doherty Jan 09 '18 at 22:22
  • Well to be honest, I can use a simple `Subject` and only emit the first 4 values after the subscription to `s3$`. So basically replace `ReplaySubject` with `Subject` and move the `for` loop to the end (after adding a `subscribe` call to `s3$`) – Aviad P. Jan 09 '18 at 22:25
  • Did you mean to use .from instead of .of? – Connor O'Doherty Jan 09 '18 at 22:41
  • Also, I added the calling method code block in the post. 'syncTripDocuments' is the method calling this parallel execution, flatMapping it to the next step. Would this mean that I would be calling the for statement from that same block of code, and use flatMap instead of subscribe? – Connor O'Doherty Jan 09 '18 at 22:44
  • Yes `from` instead of `of` and yes you can postpone your subscribe until all the observable chain is ready. – Aviad P. Jan 09 '18 at 22:54
  • So would that mean I need to access s2$ from the top level function then? And is there another step I need to ensure that the next method in the chain is only called once, instead of once for every item in the list? Thanks again for your help! – Connor O'Doherty Jan 09 '18 at 22:58
  • Well, not necessarily - you could use the `ReplaySubject` method or use a similar method. – Aviad P. Jan 09 '18 at 22:58
  • For example, you can zip with `from([1,2,3,4]).pipe(concat(s2$))` instead of simply with `s2$` – Aviad P. Jan 09 '18 at 22:59

The posted solutions gave me the concurrency control I needed, but they didn't satisfy the requirement that the whole operation emit only once, after every item has completed.

The working solution is as follows:

import { toArray, mergeMap } from "rxjs/operators";
import { of, from, Observable } from "rxjs";

export function limitedParallelObservableExecution<T>(
    listOfItems: Array<T>,
    observableMethod: (item: T) => Observable<any>,
    maxConcurrency: number = 4
): Observable<any> {
    if (listOfItems && listOfItems.length > 0) {
        let observableListOfItems: Observable<T> = from(listOfItems);
        return observableListOfItems.pipe(
            mergeMap(observableMethod, maxConcurrency),
            toArray()
        );
    } else {
        return of({});
    }
}

The strategy here is to:

1) Create an observable stream from the list of items

2) Pass the observable method into mergeMap, along with maxConcurrency

3) Use toArray() to collect every result and emit a single array only once all of the inner observables have completed
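To see these three steps in action, here is a minimal sketch, assuming RxJS 6 style imports. The names `fakeDownload`, `finish`, `started` and `pending` are hypothetical helpers that are not part of the code above: each "download" is a Subject that is completed by hand, so it is easy to observe that only four are started up front and that the pipeline emits exactly once.

```typescript
import { from, Observable, Subject } from "rxjs";
import { mergeMap, toArray } from "rxjs/operators";

// Hypothetical stand-in for a download: each call returns a Subject that is
// completed by hand, so the order of events is deterministic.
const started: number[] = [];
const pending: { [id: number]: Subject<string> } = {};

function fakeDownload(id: number): Observable<string> {
    const subject = new Subject<string>();
    started.push(id);
    pending[id] = subject;
    return subject;
}

// Simulates one download finishing: emit a value, then complete.
function finish(id: number): void {
    const subject = pending[id];
    if (subject) {
        subject.next("file-" + id);
        subject.complete();
    }
}

// Same pipeline shape as above: from -> mergeMap with a limit -> toArray.
const emissions: string[][] = [];
from([1, 2, 3, 4, 5, 6])
    .pipe(mergeMap(fakeDownload, 4), toArray())
    .subscribe((results) => emissions.push(results));

const startedAtFirst = started.slice();  // only the first four were started
finish(2);
const startedAfterOne = started.slice(); // a slot freed up, so item 5 started
[1, 3, 4, 5, 6].forEach(finish);         // finish the rest; toArray emits once
```

In this sketch the single emitted array is in completion order (item 2 finished first), not in source order.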

Musa Haidari
Connor O'Doherty
  • It might not matter to you, but it should be noted that if `maxConcurrency` is greater than one, there is no guarantee that the indices of the values in the emitted array will correspond to the indices of the items in the source array. That is, the value that corresponds to the item at index 0 in the source array might not be at index 0 in the emitted array. – cartant Jan 17 '18 at 05:12
  • Ah, thank you! That is a very important distinction. Is this what Aviad is trying to accomplish? And do you have a suggested implementation? Still getting a handle on the rxjs library, and I'm curious to know – Connor O'Doherty Jan 17 '18 at 18:52
  • To ensure the array indices correspond, you could use [`mergeMap`](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap)'s result selector function. It's passed the outer and inner values and indices, so it'd be easy to project the result to include the outer index with the values (e.g. `{ value: innerValue, index: outerIndex }`), accumulate the array, sort it based on the outer index and then re-project the array elements to remove the indices. – cartant Jan 17 '18 at 19:40
  • The solution is amazing; the code that works for the latest Angular is just slightly different:
    ```
    limitedParallelObservableExecution<T>(
        listOfItems: Array<T>,
        observableMethod: (item: T) => Observable<any>,
        maxConcurrency: number = 4
    ): Observable<any> {
        if (listOfItems && listOfItems.length > 0) {
            let observableListOfItems: Observable<T> = from(listOfItems);
            return observableListOfItems.pipe(
                mergeMap(observableMethod, maxConcurrency),
                toArray()
            );
        } else {
            return of({});
        }
    }
    ```
    – Musa Haidari Jan 23 '20 at 05:39
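
The accumulate, sort and re-project step that cartant describes in the comments above can be sketched in plain TypeScript. This is only an illustration: `Indexed` and `restoreSourceOrder` are hypothetical names, and the sample data is made up. With pipeable operators, the tagging itself could presumably be done with the `index` argument that `mergeMap` passes to its project function, plus a `map` on the inner observable, with `map(restoreSourceOrder)` placed after `toArray()`.

```typescript
// Hypothetical shape for a result tagged with the index of its source item.
interface Indexed<R> {
    index: number;
    value: R;
}

// Sorts tagged results by source index, then strips the tags again.
function restoreSourceOrder<R>(tagged: Indexed<R>[]): R[] {
    return tagged
        .slice() // copy first, so sort() does not mutate the caller's array
        .sort((a, b) => a.index - b.index)
        .map((entry) => entry.value);
}

// Made-up results, listed in the order the downloads might have finished.
const completionOrder: Indexed<string>[] = [
    { index: 1, value: "b.pdf" },
    { index: 0, value: "a.pdf" },
    { index: 2, value: "c.pdf" },
];

const ordered = restoreSourceOrder(completionOrder);
```

Sorting a copy keeps the function free of side effects, which makes it safe to use inside a `map` operator.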