If you don't want to use any plugins/dependencies you can use this solution.
Let's say your data is in an array called datas
- Create a function that will process a single item from the
datas
array; let's call it processData()
- Create a function that will execute
processData()
on one item after another in a while loop until there is no data left in the datas
array; let's call that function bufferedExecution()
.
- Create an array of size
buffer_size
- Fill the array with
bufferedExecution()
- And wait for it to resolve in
Promise.all()
or in Promise.allSettled()
Here is a working example in which the data is a list of numbers and the operation waits for a while and then returns the square of the number. It also randomly rejects.
// Sample input queue: the integers 1 through 13, in ascending order.
const datas = Array.from({ length: 13 }, (_, index) => index + 1);
// NOTE: avoid undefined entries in this array if the worker loop uses the
// value returned by shift() as its end-of-queue signal.
// Number of worker loops that are started side by side.
const buffer_size = 3;
// Shared output list; it starts empty and is filled while the items are processed.
const finishedPromises = [];
// change this function to your actual function that processes data
/**
 * Demo operation: waits 1500 ms and then settles at random.
 *
 * @param {number} item - value to square
 * @returns {Promise<number>} fulfills with `item ** 2`, or rejects with the
 *   string "error message"
 */
async function processData(item) {
  const delayMs = 1500;
  return new Promise((resolve, reject) => {
    const settle = () => {
      // The coin flip happens when the timer fires, not when the call is made.
      const shouldResolve = Math.random() > 0.5;
      if (!shouldResolve) {
        reject("error message");
        return;
      }
      resolve(item ** 2);
    };
    // wait for some time before settling
    setTimeout(settle, delayMs);
  });
}
// this function executes one function per loop, but magic happens when you
// execute the function below, multiple times
/**
 * Worker loop over the shared `datas` queue.
 *
 * Repeatedly removes the first item from `datas`, runs `callback` on it and
 * appends a record to `finishedPromises`: `{ input, result }` when the
 * callback succeeds, `{ input, error }` when it throws or rejects.
 *
 * @param {Function} callback - called with one item; its return value is awaited
 * @param {*} i - label for this worker, used only in the log line
 * @returns {Promise<undefined>} fulfills once this worker finds the queue empty;
 *   callback failures are recorded, not propagated
 */
async function bufferedExecution(callback, i) {
  // Loop on the queue length instead of on the shifted value, so that a
  // null or undefined item is processed rather than mistaken for end-of-queue.
  while (datas.length > 0) {
    // take first value to process
    const next = datas.shift();
    // just to show which function is running (index of function in array)
    console.log(`running function id: ${i}`);
    try {
      // process data with your function's callback
      const result = await callback(next);
      // finished without error
      finishedPromises.push({ input: next, result });
    } catch (error) {
      // rejected, so record the error instead of a result
      finishedPromises.push({ input: next, error });
    }
  }
  // Falling off the end fulfills the promise returned by this async function;
  // no explicit Promise constructor is needed.
}
// here is where the magic happens
// we start the bufferedExecution function n times where n is buffer size;
// the calls are not awaited one by one, so all n worker loops are in flight
// together, and Promise.allSettled() is only used to wait for all of them
const buffer = Array.from({ length: buffer_size }, (_, slot) =>
  bufferedExecution(processData, slot + 1)
);
(async () => {
  try {
    await Promise.allSettled(buffer);
    console.log("all done");
    console.log(finishedPromises);
    // you will have your results in finishedPromises array at this point
    // you can use input KEY to get the actual processed value
    // first check for error, if not get the results
  } catch (err) {
    console.log(err);
  }
})();
Output
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
all done
[
{ input: 1, error: 'error message' },
{ input: 2, result: 4 },
{ input: 3, result: 9 },
{ input: 4, result: 16 },
{ input: 5, error: 'error message' },
{ input: 6, result: 36 },
{ input: 7, result: 49 },
{ input: 8, error: 'error message' },
{ input: 9, result: 81 },
{ input: 10, result: 100 },
{ input: 11, result: 121 },
{ input: 12, error: 'error message' },
{ input: 13, result: 169 }
]