The problem:
I have a list of URLs and a method that takes a URL, downloads the file, stores it locally, and returns an Observable. I would like to launch these requests in parallel, but with at most 4 running at once (I generate PDFs server side and want to reduce the load). In addition, this download step should emit only once, after every URL in the list has been downloaded.
Current solution
Right now, I just launch the requests all at once and use forkJoin. After searching for a couple of days, I have come across a few solutions on here that gave me some ideas, but none of them do exactly what I want. My main source is here.
export function limitedParallelObservableExecution<T>(listOfItems: Array<T>, observableMethod: (item: T) => Observable<any>): Observable<any> {
    const MAX_CONCURRENCY = 4;
    if (listOfItems && listOfItems.length > 0) {
        let observableListOfItems: Observable<Observable<any>> = Observable.from(listOfItems).map(
            (item: T) => observableMethod(item)
        );
        return observableListOfItems.merge(MAX_CONCURRENCY);
    } else {
        return Observable.of({});
    }
}
I have another download step that is flatMapped to execute once this step completes. However, instead of executing just once, the next step executes once for every URL in the list (as I understand it, this is because the returned Observable emits once for each URL that completes).
How do I maintain this concurrency while only returning once when all of my downloads have completed?
In addition, this still seems to launch all of my requests at once. Is there a better way to limit the number of simultaneous requests? That is, launch n requests in parallel, but only launch request n + 1 once one of the first n has completed?
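The behaviour asked for here (at most n tasks in flight, task n + 1 starting only when one of the first n finishes, and a single result once everything is done) can be sketched without any library. What follows is a minimal Promise-based sketch under stated assumptions, not RxJS and not code from the original post: `runWithConcurrencyLimit` and `fakeDownload` are hypothetical names, and the 5 ms timer only stands in for a real download. The task is passed as a function and is called only when a slot frees up; work that has already started by the time it is handed over cannot be throttled this way.

```typescript
// Hypothetical helper: runs `task` for every item with at most `limit`
// tasks in flight, and resolves ONCE with all results in input order.
async function runWithConcurrencyLimit<T, R>(
  items: T[],
  limit: number,
  task: (item: T) => Promise<R>
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let nextIndex = 0;

  // Each worker claims the next unprocessed index. It starts a new task
  // only after its previous task has resolved, which is what enforces
  // the limit.
  async function worker(): Promise<void> {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await task(items[index]);
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) {
    workers.push(worker());
  }

  // Resolves after every worker has drained the list; rejects as soon
  // as any task rejects.
  await Promise.all(workers);
  return results;
}

// Hypothetical stand-in for a download, instrumented to record how many
// calls are in flight at the same time.
let inFlight = 0;
let maxInFlight = 0;

function fakeDownload(url: string): Promise<string> {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  return new Promise<string>((resolve) => {
    setTimeout(() => {
      inFlight--;
      resolve("saved:" + url);
    }, 5);
  });
}

// Usage (assumed names): the callback runs once, after all URLs are done.
// runWithConcurrencyLimit(urls, 4, fakeDownload).then((files) => { /* next step */ });
```

In RxJS 5 the analogous shape would likely be a concurrency argument on `mergeMap` or `mergeAll`, followed by `toArray()` or `last()` to collapse the per-URL emissions into a single one. That only limits anything if each inner Observable is cold, meaning it does not start its request until it is subscribed to.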
Extra code examples
Here is a code snippet showing how I launch each download step only once the previous one has completed:
).flatMap(
    (uploadFlightActualsSuccess) => {
        this.changeProgressValue(this.FLIGHT_ACTUALS_UPLOAD_END);
        return this.syncDocuments();
    }
).flatMap(
    (syncDocumentsSuccess) => {
        this.changeProgressValue(this.OPERATOR_DOCUMENT_DOWNLOAD_END);
        return this.syncTripDocuments();
    },
    (error) => error
).flatMap(
    (syncTripDocumentsSuccess) => {
        this.changeProgressValue(this.TRIP_DOCUMENT_DOWNLOAD_END);
        return this.expenseItemSyncProvider.syncPortalData();
    }
).flatMap(
    (expenseItemSyncSuccess) => {
        return this.flightPersonnelSyncProvider.syncFlightPersonnelByTrip();
    }
).flatMap(
'syncTripDocuments' is the request that downloads the list of URLs. I only want to run the next step once all of those downloads have completed.