9

In async, if I need to apply an asynchronous function to 1000 items, I can do that with:

// Hand each item and its completion callback to foo, passing 10 as the limit argument.
async.mapLimit(items, 10, function (item, callback) {
    foo(item, callback);
});

so that only 10 items are processed at the same time, limiting overhead and allowing control.

With ES6 promises, while I can easily do:

// Calls bar on every item up front and waits for all of the results together.
Promise.all(items.map((item) => bar(item)));

that would process all 1000 items at the same time, which may cause a lot of problems.

I know Bluebird has ways to handle that, but I am searching for an ES6 solution.

DrakaSAN
  • 7,673
  • 7
  • 52
  • 94
  • Possible duplicate of [What is the best way to limit concurrency when using ES6's Promise.all()?](http://stackoverflow.com/questions/40639432/what-is-the-best-way-to-limit-concurrency-when-using-es6s-promise-all) – Yosef Weiner May 10 '17 at 12:29

4 Answers

11

If you don't care about the results, then it's quick to whip one up:

/**
 * Calls every function in `funcs`, keeping at most `limit` of them in flight.
 *
 * The first `limit` functions are started immediately. Each of those acts as a
 * worker: as soon as its current function finishes, it takes the next
 * unstarted function from a shared queue. Return values are discarded.
 *
 * @param {Array<function(): *>} funcs - Zero-argument functions; each may return a promise.
 * @param {number} limit - Maximum number of functions in flight; must be >= 1.
 * @returns {Promise<void>} Fulfils once every function has finished; rejects
 *   with the first error raised by any function.
 * @throws {RangeError} Delivered as a rejection when `limit` is below 1 or not a number.
 */
Promise.eachLimit = async (funcs, limit) => {
  // slice() would read 0 as "start nothing" and a negative limit as an offset
  // from the end of the array, so refuse anything below 1 (NaN/undefined too).
  if (!(limit >= 1)) {
    throw new RangeError("concurrency limit cannot be less than 1");
  }

  // Functions that are not part of the initial batch, in call order.
  const pending = funcs.slice(limit);

  const runWorker = async (firstFunc) => {
    await firstFunc();
    // The queue is shared between workers; shift() hands each entry to exactly one.
    while (pending.length > 0) {
      await pending.shift()();
    }
  };

  await Promise.all(funcs.slice(0, limit).map((func) => runWorker(func)));
};

// Demo:

// Returns a promise that is resolved by a setTimeout callback after `ms` milliseconds.
var wait = function (ms) {
  return new Promise(function (resolve) {
    setTimeout(resolve, ms);
  });
};

/**
 * Demo task: waits a random time below 2000 ms, then logs `s`.
 *
 * @param {*} s - Value to print once the wait is over.
 * @returns {Promise<void>}
 */
async function foo(s) {
  const delayMs = Math.random() * 2000;
  await wait(delayMs);
  console.log(s);
}

// Demo driver: one task per letter A-Z, passed to Promise.eachLimit with a limit of 5.
(async () => {
  const letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("");
  const funcs = letters.map((letter) => () => foo(letter));
  await Promise.eachLimit(funcs, 5);
})();

A key performance property is running the next available function as soon as any function finishes.

Preserving results

Preserving the results in order makes it a little less elegant perhaps, but not too bad:

/**
 * Calls every function in `funcs`, keeping at most `limit` of them in flight,
 * and collects their results in the same order as `funcs`.
 *
 * Up to `limit` workers run concurrently. Each worker repeatedly claims the
 * next unstarted index, awaits that function, and stores the value at that
 * index, so completion order does not affect result order.
 *
 * @param {Array<function(): *>} funcs - Zero-argument functions; each may return a promise.
 * @param {number} limit - Maximum number of functions in flight; must be >= 1.
 *   Fractional values are rounded down.
 * @returns {Promise<Array<*>>} Results positioned to match `funcs`; rejects
 *   with the first error raised by any function.
 * @throws {RangeError} Delivered as a rejection when `limit` is below 1 or not a number.
 */
Promise.mapLimit = async (funcs, limit) => {
  // A limit below 1 would start no workers and quietly return an empty array.
  if (!(limit >= 1)) {
    throw new RangeError("concurrency limit cannot be less than 1");
  }

  const results = [];
  // Index of the next function nobody has claimed yet. Kept separate from
  // `limit` so that a fractional limit can never produce a fractional index.
  let nextIndex = 0;

  const runWorker = async () => {
    while (nextIndex < funcs.length) {
      // Claim the index synchronously, before the await, so that no two
      // workers can ever pick the same function.
      const index = nextIndex++;
      results[index] = await funcs[index]();
    }
  };

  const workerCount = Math.min(Math.floor(limit), funcs.length);
  await Promise.all(Array.from({ length: workerCount }, () => runWorker()));
  return results;
};

// Demo:

// Promise-based delay: the returned promise is resolved by a timer after `ms` milliseconds.
var wait = function (ms) {
  return new Promise(function (resolve) {
    setTimeout(resolve, ms);
  });
};

/**
 * Demo task: waits a random time below 2000 ms, logs `s`, and resolves with
 * its lower-cased form.
 *
 * @param {string} s - Text to print and lower-case.
 * @returns {Promise<string>} `s.toLowerCase()`.
 */
async function foo(s) {
  const delayMs = Math.random() * 2000;
  await wait(delayMs);
  console.log(s);
  const lowered = s.toLowerCase();
  return lowered;
}

// Demo driver: map A-Z through Promise.mapLimit with a limit of 5 and print the joined results.
(async () => {
  const letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("");
  const funcs = letters.map((letter) => () => foo(letter));
  const results = await Promise.mapLimit(funcs, 5);
  console.log(results.join(""));
})();
jib
  • 40,579
  • 17
  • 100
  • 158
  • 2
    For anyone unfamiliar with the await/async notations, you can copy/paste the definitions (top above the Demo) and then use it normally with .then() like you're used to. – radicand Jun 06 '17 at 21:04
  • 1
    Would you not need to rewrite the mapped function in line `let funcs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("").map(s => () => foo(s));` as: `... .map(s => async () => await(foo));`? – Zach Feb 06 '18 at 17:10
  • I think there's a bug with the version that preserves results. I'm getting the same item in the array processed multiple times. – chovy Jun 02 '18 at 02:24
  • 1
    @chovy Now fixed. Was being sloppy about when `limit` was incremented in a for-loop containing `await`. Thanks for spotting it! – jib Jun 03 '18 at 01:54
  • What is promise.eachlimit? – Jimmy Kane Oct 10 '20 at 17:46
  • @JimmyKane That's explained in the OP. – jib Oct 11 '20 at 13:25
3

There's nothing built in, but you can of course group them yourself into promise chains, and use a Promise.all on the resulting array of chains:

const items = /* ...1000 items... */;
const concurrencyLimit = 10;
const promise = Promise.all(items.reduce((promises, item, index) => {
    // What chain do we add it to?
    const chainNum = index % concurrencyLimit;
    let chain = promises[chainNum];
    if (!chain) {
        // New chain
        chain = promises[chainNum] = Promise.resolve();
    }
    // Add it
    promises[chainNum] = chain.then(_ => foo(item));
    return promises;
}, []));

Here's an example, showing how many concurrent promises there are at any given time (and also showing when each "chain" is complete, and only doing 200 instead of 1,000):

const items = buildItems();
const concurrencyLimit = 10;
// Build one promise chain per slot (items are assigned round-robin), log when
// each chain finishes, then log once more when all of them have finished.
const promise = Promise.all(
    items
        .reduce((chains, item, index) => {
            const slot = index % concurrencyLimit;
            const tail = chains[slot] || Promise.resolve();
            chains[slot] = tail.then(() => foo(item));
            return chains;
        }, [])
        .map((chain) => chain.then(() => console.log("Chain done")))
);
promise.then(() => console.log("All done"));

/**
 * Builds the demo input.
 *
 * @returns {number[]} The integers 0 through 199, in order.
 */
function buildItems() {
  return Array.from({ length: 200 }, (_, n) => n);
}

// Number of foo() calls that have started but not yet resolved.
var outstanding = 0;

/**
 * Demo task: bumps the outstanding counter and logs the start, then resolves
 * with `item` after a random delay below 500 ms, decrementing the counter and
 * logging again at that point.
 *
 * @param {*} item - Value to log and resolve with.
 * @returns {Promise<*>} Resolves with `item`.
 */
function foo(item) {
  outstanding += 1;
  console.log("Starting " + item + " (" + outstanding + ")");
  return new Promise(function (resolve) {
    const finish = function () {
      outstanding -= 1;
      console.log("Resolving " + item + " (" + outstanding + ")");
      resolve(item);
    };
    setTimeout(finish, Math.random() * 500);
  });
}
/* Let .as-console-wrapper grow to the full height of its containing block;
   !important makes this win over non-!important max-height declarations.
   NOTE(review): presumably targets the snippet console output pane - confirm. */
.as-console-wrapper {
  max-height: 100% !important;
}

I should note that if you want to track the result of each of those, you'd have to modify the above; it doesn't try to track the results (!). :-)

T.J. Crowder
  • 1,031,962
  • 187
  • 1,923
  • 1,875
  • Sad that nothing exists for this, async doesn't support promises and yet is so helpful :/ – DrakaSAN May 10 '17 at 13:11
  • @DrakaSAN: Nothing *built-in*. :-) I'm sure there are libraries that do it, like Bluebird or Q or similar (I haven't checked, though). – T.J. Crowder May 10 '17 at 13:11
0

This is the closest one to async.eachLimit

/**
 * Applies `asyncFunc` to `coll` in consecutive batches of at most `limit`
 * items. All items of a batch are started together, and the next batch only
 * starts after every promise of the current batch has fulfilled.
 *
 * @param {Array<*>} coll - Items to process.
 * @param {number} limit - Maximum batch size; must be >= 1. Fractional values
 *   are rounded down.
 * @param {function(*): *} asyncFunc - Called with each item; may return a promise.
 * @returns {Promise<Array<Array<*>>>} One inner array of results per batch, in
 *   input order; rejects with the first error raised in any batch.
 * @throws {RangeError} Delivered as a rejection when `limit` is below 1 or not a number.
 */
Promise.eachLimit = async (coll, limit, asyncFunc) => {
    // A limit below 1 can only yield empty batches, and a fractional limit
    // would skip items when computing batch starts, so validate and floor it.
    if (!(limit >= 1)) {
        throw new RangeError("concurrency limit cannot be less than 1");
    }
    const batchSize = Math.floor(limit);

    const ret = [];
    for (let start = 0; start < coll.length; start += batchSize) {
        const batch = coll.slice(start, start + batchSize);
        ret.push(await Promise.all(batch.map((ele) => asyncFunc(ele))));
    }
    return ret;
};

// Promise-based delay: the returned promise is resolved by a timer after `ms` milliseconds.
const wait = (ms) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });

/**
 * Demo task: waits a random time below 2000 ms, logs `s`, and resolves with
 * its lower-cased form.
 *
 * @param {string} s - Text to print and lower-case.
 * @returns {Promise<string>} `s.toLowerCase()`.
 */
async function foo(s) {
  const delayMs = Math.random() * 2000;
  await wait(delayMs);
  console.log(s);
  const lowered = s.toLowerCase();
  return lowered;
}

// Demo driver: run foo over A-Z through Promise.eachLimit with a limit of 5 and print what it returns.
(async () => {
  const arr = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("");
  const batches = await Promise.eachLimit(arr, 5, foo);
  console.log(batches);
})();
saketh
  • 803
  • 1
  • 10
  • 24
-1

Using Array.prototype.splice

// Drain `funcs` in batches of up to 100: splice() removes each batch from the
// array, and the next batch starts only after the current one has fulfilled.
while (funcs.length > 0) {
  const batch = funcs.splice(0, 100);
  await Promise.all(batch.map((f) => f()));
}
hiddensunset4
  • 5,825
  • 3
  • 39
  • 61