2

I have an array of Promises on my Typescript project.

Most of the time I'm using Promise.all() in order to execute all of them in parallel:

let promises:Array<Promise<string>> = ....
Promise.all(promises)
  .then(results:Array<string> => ....)
  .catch(error => ....)

I have to reduce back-pressure by executing an array of promises by running them with a concurrency of 3.

I would like to stay without dependencies but if you have a solution through a dep, feel free.


I tried to use es6-promise-pool but I seems that it's not ready for Typescript

import PromisePool = require('es6-promise-pool')
const promiseProducer = () => {
    if (commands.length > 0) {
        return commands.shift()
    } else {
        return null
    }
}
let concurrency = 3;
let pool = new PromisePool(promiseProducer, concurrency)

This expression is not constructable. Type 'typeof import("..../node_modules/es6-promise-pool/es6-promise-pool")' has no construct signatures.


I've implemented the mapConcurrent() function according to @jfriend00's comment.

function mapConcurrent(items: Array<any>, maxConcurrent: number, fn: any): Promise<Array<any>> {
    let index = 0;
    let inFlightCntr = 0;
    let doneCntr = 0;
    let results = new Array(items.length);
    let stop = false;
    return new Promise((resolve, reject) => {
        const runNext = () => {
            let i = index;
            ++inFlightCntr;
            fn(items[index], index++).then(function (val: any) {
                ++doneCntr;
                --inFlightCntr;
                results[i] = val;
                run();
            }, function (err: any) {
                stop = true;
                reject(err);
            });
        }
        const run = () => {
            while (!stop && inFlightCntr < maxConcurrent && index < items.length) {
                runNext();
            }
            if (doneCntr === items.length) {
                resolve(results);
            }
        }
        run();
    });
}

Then I was able to run promises as expected

let array = [1,2,3,4,5,6,7,8,9,10]
const maxConcurrency = 3
mapConcurrent(array, maxConcurrency , (index: number) => myPromise(index))
    .then((results: Array<any>) => {
        //... do stuff with results ...
     }).catch(error => { reject(error) })

Only one stuff is missing according to Typescript, does there is a way to get an array of Promise type for the promise results?

If my promise have a type of <{id:string}>, can we get the result with type Array<{id:string}>?

Actually mapConcurrent() only resolve type Array

Benjamin Bohec
  • 118
  • 2
  • 11
  • Multiple options for limiting the number of requests in flight at the same time here: [Controlling concurrency in an array of operations](https://stackoverflow.com/questions/60443116/waiting-for-web-worker-making-http-call-to-finish/60443348#60443348). Note, you can't built the array of promises first because that launches all the operations. Instead, you have to process the array slowly so that only N operations are in-flight at the same time. See the above answer for several different approaches. The one I use regularly is `mapConcurrent()` referenced in that answer. – jfriend00 Mar 19 '20 at 09:22
  • 1
    Try switch to es module syntax `import PromisePool from 'es6-promise-pool'` – hackape Mar 19 '20 at 09:36
  • @hackape thanks for the hint, the PromisePool is right imported, I still have an issue on my producer in order to be supported by the pool. – Benjamin Bohec Mar 19 '20 at 10:06

1 Answers1

0

With typing

function mapConcurrent<T>(items: Array<string>, maxConcurrent: number, fn: (item: string) => Promise<T>): Promise<Array<T>> {
        let index = 0;
        let inFlightCntr = 0;
        let doneCntr = 0;
        let results = new Array(items.length);
        let stop = false;
        return new Promise((resolve, reject) => {
            const runNext = () => {
                let i = index;
                ++inFlightCntr;
                fn(items[index])
                    .then(result => {
                        ++doneCntr;
                        --inFlightCntr;
                        results[i] = result;
                        run();
                    })
                    .catch(error => {
                        stop = true;
                        reject(error);
                    });
                index++
            }
            const run = () => {
                while (!stop && inFlightCntr < maxConcurrent && index < items.length) {
                    runNext();
                }
                if (doneCntr === items.length) {
                    resolve(results);
                }
            }
            run();
        });
    }
Benjamin Bohec
  • 118
  • 2
  • 11