6

I have a function that returns a promise, and it must be executed n times with different params each time. I want to chain the promises in such a way that the script is always working on 3-4 promises at a time.

I made it with Promise.all: this executes 3 promises concurrently, and only when all of them have resolved does it go on with the next 3.

How can I make it so that when one of the 3 resolves it immediately starts another one, while never working on more than 3 at a time?

// Batch-of-three approach from the question: walks every index of `tasks`, but only
// does work on indices that are positive multiples of 3.
for( var i = 0; i < tasks.length; i++){

    // First fires at i = 3, so tasks[0] is never passed to doTaskFunction, and any
    // indices after the last multiple of 3 are skipped as well.
    if( i > 0 && i%3 == 0 ){

      // Starts three calls together and waits for all of them before the loop
      // continues, so the next batch cannot begin until the slowest call finishes.
      await Promise.all([
       doTaskFunction(tasks[i]),
        doTaskFunction(tasks[i-1]),
        doTaskFunction(tasks[i-2]),
      ]);
    }

  }
Daniel Conde Marin
  • 7,588
  • 4
  • 35
  • 44
Pjotr Raskolnikov
  • 1,558
  • 2
  • 15
  • 27
  • Possible duplicate of [Throttle amount of promises open at a given time](https://stackoverflow.com/questions/38385419/throttle-amount-of-promises-open-at-a-given-time) – Jeff Bowman Feb 09 '18 at 18:18
  • There are a lot of existing questions/answers related to concurrent promises, though the term of choice seems to be "throttling" or "rate-limiting": https://stackoverflow.com/search?q=%5Bpromise%5D+throttle+OR+throttling – Jeff Bowman Feb 09 '18 at 18:19
  • https://stackoverflow.com/a/38778887/1048572, https://stackoverflow.com/a/39197252/1048572 – Bergi Apr 10 '18 at 17:33

4 Answers4

4

You can achieve this fairly easily using es6-promise-pool:

// Demo work queue: each entry is a function that starts its promise only when called.
const tasks = [
  // Three slow tasks: each fulfils with 'foo' after two seconds.
  (param) =>
    new Promise((resolve) => {
      setTimeout(() => resolve('foo'), 2000);
    }),
  () =>
    new Promise((resolve) => {
      setTimeout(() => resolve('foo'), 2000);
    }),
  () =>
    new Promise((resolve) => {
      setTimeout(() => resolve('foo'), 2000);
    }),
  // Three tasks that fulfil immediately with 1, 2 and 3.
  ...[1, 2, 3].map((value) => () => Promise.resolve(value)),
];
 
 // Running number used only for the progress log below.
 let count = 1;

 /**
  * Producer for the promise pool: removes the next task from the front of
  * `tasks`, starts it, and returns whatever the task returns.
  * Returns null once `tasks` is empty.
  */
 const promiseProducer = () => {
   if (tasks.length === 0) {
     return null;
   }

   console.log('processing ' + count);
   count += 1;

   const nextTask = tasks.shift();
   return nextTask(); // a parameter could optionally be passed to the task here
 };
 
 // Run the producer through a pool; the second argument (3) is the concurrency setting.
 const pool = new PromisePool(promiseProducer, 3);
 const poolPromise = pool.start();

 // Report completion, and also report failure so that a rejected task does not
 // end up as an unhandled promise rejection.
 poolPromise
   .then(() => { console.log('done!'); })
   .catch((error) => { console.error('promise pool failed:', error); });
<script src="https://cdn.jsdelivr.net/npm/es6-promise-pool@2.5.0/es6-promise-pool.min.js"></script>
Daniel Conde Marin
  • 7,588
  • 4
  • 35
  • 44
2

I am just leaving my naive generator-based implementation here! :)

/**
 * Generator that yields the elements of `arr` one at a time, by ascending index.
 * `arr.length` is re-read before every step.
 */
function* myIteratorFactory(arr) {
  let position = 0;
  while (position < arr.length) {
    yield arr[position];
    position += 1;
  }
}


/**
 * Builds a lazy task: nothing runs until the returned function is called.
 *
 * @param {*} text - Value used in the log lines and as the task's result.
 * @param {number} ms - Delay in milliseconds before the task finishes.
 * @returns {function(): Promise<*>} Function that logs a "started" line, waits
 *   `ms` milliseconds, logs an "ended" line and fulfils with `text`.
 */
function delayPromise(text, ms) {
  return async function runDelayedTask() {
    console.log('[%s] Promise with value %s just started', new Date().toISOString(), text);
    await new Promise((resolve) => setTimeout(resolve, ms));
    console.log('[%s] Promise with value %s just ended', new Date().toISOString(), text);
    // Hand the value back to the caller; previously the logging step replaced it
    // with undefined.
    return text;
  };
}

// Demo tasks as [text, delay in ms] pairs, each turned into a task by delayPromise.
var promArr = [
  ['hi', 1500],
  ['alex', 2000],
  ['how', 1700],
  ['are', 1800],
  ['you', 1500],
].map(([text, ms]) => delayPromise(text, ms))

// Number of tasks that have been started and have not finished yet.
var que = 0
// Shared iterator over the tasks, so each task is handed out exactly once.
var myIterator = myIteratorFactory(promArr)


/**
 * Pulls tasks from the shared `myIterator` and keeps at most `limit` of them
 * running at once, using the shared `que` counter to track running tasks.
 *
 * @param {function(): void} r - Called once, when the iterator is exhausted and
 *   no started task is still pending (this includes an empty iterator).
 * @param {number} [limit=3] - Maximum number of tasks running at the same time.
 */
function executor(r, limit = 3) {
  while (que < limit) {
    const next = myIterator.next();
    if (next.done) {
      // Only the call that finds nothing left AND nothing running reports completion.
      if (que === 0) r();
      return;
    }

    // Free the slot and try to refill it, whether the task fulfilled or rejected.
    const release = () => {
      que--;
      executor(r, limit);
    };

    next.value().then(release, (error) => {
      // A failed task must not stall the pool or be swallowed silently.
      console.error(error);
      release();
    });
    que++;
  }
}
// Start the pool; the arrow function is the completion callback passed to executor as `r`.
executor(() => console.log('i am done for today!'))
Alex Michailidis
  • 4,078
  • 1
  • 16
  • 35
0

No external libraries. Just plain JS.

It can be resolved using recursion.

The idea is that initially we immediately run the maximum allowed number of tasks and each of these tasks should recursively initiate a new task on its completion.

In this example I populate successful responses together with errors and I execute all requests but it's possible to slightly modify algorithm if you want to terminate batch execution on the first failure.

/**
 * Runs `doTaskFunction` for every entry of `tasks`, with at most `limit` calls
 * in flight at any moment. All tasks are attempted even if some fail.
 *
 * @param {Array} tasks - Inputs, each passed to `doTaskFunction`.
 * @param {number} limit - Maximum number of concurrent calls; must be > 0 when
 *   `tasks` is not empty.
 * @returns {Promise<Array>} Fulfils with the results in input order when every
 *   task succeeded (an empty array for empty input). Rejects with an array of
 *   the same shape, holding results and errors by index, when any task failed.
 * @throws {RangeError} (as a rejection) when `tasks` is non-empty and `limit`
 *   is not a positive number; previously this left the promise pending forever.
 */
async function batchExecute(tasks, limit) {
  const total = tasks.length;
  const responsesOrErrors = new Array(total);

  // Nothing to do: settle immediately instead of waiting for a completion that never comes.
  if (total === 0) {
    return responsesOrErrors;
  }
  if (!(limit > 0)) {
    throw new RangeError(`limit must be a positive number, got ${limit}`);
  }

  let nextIndex = 0;
  let hasErrors = false;

  // Each worker keeps claiming the next unstarted index until none are left.
  async function runWorker() {
    while (nextIndex < total) {
      const index = nextIndex++;
      try {
        responsesOrErrors[index] = await doTaskFunction(tasks[index]);
      } catch (error) {
        responsesOrErrors[index] = error;
        hasErrors = true;
      }
    }
  }

  const workerCount = Math.min(total, limit);
  const workers = [];
  for (let i = 0; i < workerCount; i++) {
    workers.push(runWorker());
  }
  await Promise.all(workers);

  if (hasErrors) {
    // Existing contract: callers receive the mixed results/errors array as the rejection value.
    throw responsesOrErrors;
  }
  return responsesOrErrors;
}

/**
 * Simulated task: logs a start line, picks a random whole-millisecond delay
 * below 1500, and settles after that delay. Delays of 1000 ms or less fulfil
 * with `${task} success`; longer delays reject with `${task} error`.
 */
async function doTaskFunction(task) {
  console.log(`${task} started`);
  const delay = Math.floor(Math.random() * 1500);

  // Decides the outcome once the timer fires.
  const settle = (resolve, reject) => {
    if (delay > 1000) {
      console.log(`${task} finished with error`);
      reject(`${task} error`);
      return;
    }
    console.log(`${task} finished successfully`);
    resolve(`${task} success`);
  };

  return new Promise((resolve, reject) => {
    setTimeout(settle, delay, resolve, reject);
  });
}

// Ten demo inputs named task_1 … task_10.
const tasks = Array.from({ length: 10 }, () => 'task').map(
  (label, position) => `${label}_${position + 1}`,
);

// Run them three at a time and log whichever way the batch settles.
batchExecute(tasks, 3)
  .then((responses) => {
    return console.log('All successfull', responses);
  })
  .catch((responsesWithErrors) => {
    return console.log('All with several failed', responsesWithErrors);
  });
Anton Fil
  • 223
  • 2
  • 8
0

If you don't want to use any plugins/dependencies you can use this solution.

Let's say your data is in an array called datas

  1. Create a function that will process your data in the datas array; let's call it processData()
  2. Create a function that will execute processData() one after another in a while loop until there is no data left in the datas array; let's call that function bufferedExecution().
  3. Create an array of size buffer_size
  4. Fill the array with bufferedExecution()
  5. And wait for it to resolve in Promise.all() or in Promise.allSettled()

Here is a working example, where the data are numbers and the operation waits for a while and returns the square of the number. It also randomly rejects.

// Work queue holding the numbers 1 through 13; items are removed from the front
// as they are processed. Keep entries non-nullish (no undefined/null values).
const datas = Array.from({ length: 13 }, (_, position) => position + 1);

// How many workers run side by side.
const buffer_size = 3;
// Collects one record per processed item.
const finishedPromises = [];

// Placeholder worker: swap in the real function that processes one item.
// After 1.5 seconds it either fulfils with the square of `item` or rejects
// with "error message", chosen at random when the timer fires.
async function processData(item) {
  return new Promise((resolve, reject) => {
    const settle = () => {
      const succeeded = Math.random() > 0.5;
      if (!succeeded) {
        reject("error message");
        return;
      }
      resolve(item ** 2);
    };
    setTimeout(settle, 1500);
  });
}

/**
 * One worker of the pool: repeatedly removes the first item of the shared
 * `datas` array and awaits `callback(item)` until the array is empty. Running
 * several of these side by side gives bounded concurrency, since every worker
 * has at most one item in flight.
 *
 * Each item produces one record in the shared `finishedPromises` array:
 * `{ input, result }` on success or `{ input, error }` on failure.
 *
 * @param {function(*): *} callback - Processes one item; may return a promise.
 * @param {*} i - Worker id, used only in the progress log.
 * @returns {Promise<undefined>} Fulfils once this worker finds `datas` empty.
 */
async function bufferedExecution(callback, i) {
  // Checking the length (rather than testing the shifted value) means
  // undefined/null items are processed instead of ending the worker early.
  while (datas.length > 0) {
    const next = datas.shift();
    // just to show which worker is running
    console.log(`running function id: ${i}`);
    try {
      const result = await callback(next);
      finishedPromises.push({
        input: next,
        result: result,
      });
    } catch (error) {
      // rejected or thrown: record the error instead of a result
      finishedPromises.push({
        input: next,
        error: error,
      });
    }
  }
}

// Start buffer_size workers (ids 1..buffer_size). Each call to bufferedExecution
// begins running as soon as it is made, so the workers run concurrently;
// Promise.allSettled only waits for all of them to finish.
const buffer = Array.from({ length: buffer_size }, (_, slot) =>
  bufferedExecution(processData, slot + 1),
);

const reportResults = () => {
  console.log("all done");
  // finishedPromises now holds one record per processed item: use `input` to
  // identify the item, check for `error` first, otherwise read `result`.
  console.log(finishedPromises);
};

const reportFailure = (err) => {
  console.log(err);
};

Promise.allSettled(buffer).then(reportResults).catch(reportFailure);

Output

// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
all done
[
  { input: 1, error: 'error message' },
  { input: 2, result: 4 },
  { input: 3, result: 9 },
  { input: 4, result: 16 },
  { input: 5, error: 'error message' },
  { input: 6, result: 36 },
  { input: 7, result: 49 },
  { input: 8, error: 'error message' },
  { input: 9, result: 81 },
  { input: 10, result: 100 },
  { input: 11, result: 121 },
  { input: 12, error: 'error message' },
  { input: 13, result: 169 }
]
27px
  • 430
  • 5
  • 16