where it goes wrong
This is happening because at least one promise is rejected before getting to Promise.all
If you adjust the timeout of the rejected promises to `5000`, you will see the `catch` block act as intended -
// Demo: every rejection is delayed by five seconds, so all nine promises are
// still pending when Promise.all subscribes to them and the catch block fires.
(async () => {
  console.log("please wait...")
  const asyncActions = []
  try {
    let batch = 0
    while (batch < 3) {
      // one-second pause before each batch is queued
      await new Promise((done) => setTimeout(done, 1000))
      // queue three promises that reject with "FOO" after five seconds
      for (let k = 0; k < 3; k += 1) {
        asyncActions.push(new Promise((_, fail) => setTimeout(fail, 5000, "FOO")))
      }
      batch += 1
    }
    await Promise.all(asyncActions)
    console.log('all resolved')
  } catch (e) {
    console.log('caught error', e)
  }
})()
please wait...
caught error FOO
capturing the error
Above, this works because all `asyncActions` are still pending by the time they get to `Promise.all`. However, this is fragile as we cannot guarantee this would be the case. Benjamin's proposed answer is to swallow the error, but I think we could improve by bubbling the error up -
// Promise that fulfills (with no value) once `ms` milliseconds have elapsed.
const sleep = (ms) => new Promise((wake) => {
  setTimeout(wake, ms)
})
// Demo task that always fails: waits 30 ms, then rejects with `x` itself
// as the rejection reason.
const myTask = async (x) => {
  await sleep(30)
  throw x
}
// Demo: wrap the batching loop in an outer promise so that a failing task can
// reject it right away, even before the loop reaches Promise.all.
//
// Note: rejecting the outer promise does not stop the loop. Each task's
// `.catch(reject)` turns its failure into a fulfilled value, so the remaining
// batches are still queued and 'all done' is still logged afterwards; the
// final `resolve` is then a no-op because the outer promise already settled.
;(async () => {
  console.log("please wait...")
  try {
    // outer promise
    await new Promise(async (resolve, reject) => {
      const actions = []
      for (let i = 0; i < 3; i++) {
        await sleep(1000)
        for (let j = 0; j < 3; j++) {
          // reject outer promise if this task fails
          actions.push(myTask(j).catch(reject))
        }
      }
      const result = await Promise.all(actions)
      console.log('all done')
      resolve(result) // <- resolve outer
    })
  }
  catch (e) {
    console.log('caught error', e)
  }
})()
separate effects
Above, adding an outer promise works, but `await new Promise(...)` is a bit of an anti-pattern. By moving the effect (`console.log`) and the error handler outside of your task, your code cleans up considerably -
// Timer-backed delay: the returned promise fulfills with `undefined`
// after roughly `ms` milliseconds.
const sleep = (ms) => {
  return new Promise((finish) => { setTimeout(finish, ms) })
}
/**
 * Simulated unit of work: waits 300 ms, then either fails (roughly 5% of the
 * time) or returns the sum of its arguments.
 *
 * @param {number} i - batch index
 * @param {number} j - index of the task within its batch
 * @returns {Promise<number>} fulfills with `i + j`
 * @throws {Error} whose message is the JSON form of `[i, j]`, e.g. `[1,2]`,
 *   so the failure prints as `Error: [1,2]` and carries a stack trace
 */
async function myTask(i, j) {
  await sleep(300)
  if (Math.random() > .95)
    throw new Error(JSON.stringify([i, j]))
  return i + j
}
/**
 * Start three batches of three tasks, pausing one second before each batch.
 *
 * The returned promise rejects as soon as any task fails, even if later
 * batches have not been started yet, and otherwise fulfills with the nine
 * results in the order the tasks were queued.
 *
 * The scheduling loop lives in its own async function rather than in an async
 * executor, so a failure in the loop itself (for example a rejected `sleep`)
 * also rejects the returned promise instead of leaving it pending forever.
 *
 * @returns {Promise<Array>} results of all queued tasks
 */
function queuedTasks() {
  return new Promise((resolve, reject) => {
    const schedule = async () => {
      const actions = []
      for (let i = 0; i < 3; i++) {
        await sleep(1000)
        for (let j = 0; j < 3; j++)
          // reject the outer promise immediately if this task fails
          actions.push(myTask(i, j).catch(reject))
      }
      return Promise.all(actions)
    }
    schedule().then(resolve, reject)
  })
}
console.log("please wait...")
// Effects live out here: log the results on success, report the error otherwise.
;(async () => {
  try {
    const results = await queuedTasks() // <- receives result
    console.log("all done", results)
  } catch (err) {
    console.error(err) // <- handles error
  }
})()
Above, this example has a 5% reject chance for `myTask`. Run it several times to see both a resolved and a rejected result -
// resolved
all done [0,1,2,1,2,3,2,3,4]
// rejected
Error: [1,2]
Pool
You could stop there but I think there's still room for improvement here. By reading the code, I can see the intention is to batch the promises three (3) at a time, waiting one (1) second in between each batch -
// ...
return new Promise((resolve, reject) => {
// ...
for (let i = 0; i < 3; i++) { // three batches
await sleep(1000) // wait one second
for (let j = 0; j < 3; j++) // then run batch
actions.push(myTask(...)) // add each task
}
// ...
Promise.all(actions).then(resolve)
})
This design will start three tasks every second, regardless of whether the running tasks are finished. It is a primitive level of control at best and at worst it's a lot of boilerplate to copy every time you want to do queued/throttled tasks.
One solution is to design a data structure that can do these things for you in a reusable way. Let's take a look at `Pool` -
// create pool with 3 "threads"
const pool = new Pool(3)

/**
 * Run `myTask(i, j)` under the shared pool and settle with the task's outcome.
 * The pool thread is closed whether the task succeeds or fails.
 */
async function queuedTask(i, j) {
  // wait for pool thread to open
  const release = await pool.open()
  try {
    // throttle task by one second minimum
    return await throttle(myTask(i, j), 1000)
  } finally {
    // close thread upon completion
    release()
  }
}
// ...
Now you can write a simple loop, this time using `queuedTask` -
// ...
// Queue all nine (i, j) combinations up front, in row-major order.
const actions = []
for (const i of [0, 1, 2])
  for (const j of [0, 1, 2])
    actions.push(queuedTask(i, j))

console.log("please wait...")
// Print the results as a JSON string, or report the first rejection.
Promise.all(actions)
  .then((results) => JSON.stringify(results))
  .then(console.log, console.error)
In the table below, we can see how `Pool(3)` will carry out the tasks -
| task | (i,j) | thread 1 | thread 2 | thread 3 |
| --- | --- | --- | --- | --- |
| queueing | (0,0) | ⌛ | ⌛ | ⌛ |
| running | (0,0) | open | ⌛ | ⌛ |
| queueing | (0,1) | ↓ | ⌛ | ⌛ |
| running | (0,1) | ↓ | open | ⌛ |
| queueing | (0,2) | ↓ | ↓ | ⌛ |
| running | (0,2) | ↓ | ↓ | open |
| queueing | (1,0) | ↓ | ↓ | ↓ |
| queueing | (1,1) | ↓ | ↓ | ↓ |
| queueing | (1,2) | ↓ | ↓ | ↓ |
| queueing | (2,0) | ↓ | ↓ | ↓ |
| queueing | (2,1) | ↓ | ↓ | ↓ |
| queueing | (2,2) | ↓ | ↓ | ↓ |
| finished | (0,0) | closed | ↓ | ↓ |
| running | (2,2) | open | ↓ | ↓ |
| finished | (0,2) | ↓ | ↓ | closed |
| running | (2,1) | ↓ | ↓ | open |
| finished | (0,1) | ↓ | closed | ↓ |
| running | (2,0) | ↓ | open | ↓ |
| finished | (2,1) | ↓ | ↓ | closed |
| running | (1,2) | ↓ | ↓ | open |
| finished | (2,0) | ↓ | closed | ↓ |
| running | (1,1) | ↓ | open | ↓ |
| finished | (2,2) | closed | ↓ | ↓ |
| running | (1,0) | open | ↓ | ↓ |
| finished | (1,2) | ↓ | ↓ | closed |
| finished | (1,0) | closed | ↓ | ⌛ |
| finished | (1,1) | ⌛ | closed | ⌛ |
To learn more about this approach, see `Pool` implemented in this Q&A. Expand the snippet below and run it several times to see both resolved and rejected outputs -
/**
 * Limits how many "threads" may be open at once.
 *
 * `pool` holds one bookkeeping promise per open thread, `stack` holds the
 * callers waiting for a free thread, and `size` is the limit (default 4).
 */
class Pool {
  constructor (size = 4) {
    this.pool = new Set()
    this.stack = []
    this.size = size
  }

  // Returns a `close` function right away when a thread is free, otherwise a
  // promise for one; `await pool.open()` handles both cases.
  open () {
    if (this.pool.size < this.size) return this.deferNow()
    return this.deferStacked()
  }

  // Open a thread immediately. Once its `close` is called, the thread is
  // removed from the pool and the most recently stacked waiter, if any, is
  // released (the stack is popped, so waiters are served last-in first-out).
  deferNow () {
    const [closed, close] = thread()
    const slot = closed
      .then(() => this.pool.delete(slot))
      .then(() => this.stack.length && this.stack.pop().close())
    this.pool.add(slot)
    return close
  }

  // Park the caller on the stack; when released, open a thread for it.
  deferStacked () {
    const [released, close] = thread()
    this.stack.push({ close })
    return released.then(() => this.deferNow())
  }
}
// Random float in the range [0, x).
const rand = (x) => {
  return Math.random() * x
}

// Tap: call `f` with the argument for its side effect, then hand the
// argument back unchanged.
const effect = (f) => (x) => {
  f(x)
  return x
}

// Build a [promise, close] pair: calling `close(value)` fulfills the promise
// with `value`. The parameter is only used as a local slot for `close`.
const thread = (close) => {
  const settled = new Promise((resolve) => { close = effect(resolve) })
  return [settled, close]
}

// Promise that fulfills with no value after `ms` milliseconds.
const sleep = (ms) => new Promise((wake) => {
  setTimeout(wake, ms)
})

// Settle with the outcome of `p`, but fulfill no sooner than `ms` milliseconds
// from now; a rejection of `p` is passed through without waiting.
const throttle = (p, ms) =>
  Promise.all([p, sleep(ms)]).then((settled) => settled[0])
/**
 * Simulated unit of work: waits a random time of up to two seconds, then
 * either fails (roughly 5% of the time) or returns the sum of its arguments.
 *
 * @param {number} i - batch index
 * @param {number} j - index of the task within its batch
 * @returns {Promise<number>} fulfills with `i + j`
 * @throws {Error} whose message is the JSON form of `[i, j]`, e.g. `[1,2]`.
 *   Passing the array straight to `Error` would coerce it to `1,2`.
 */
async function myTask(i, j) {
  await sleep(rand(2000))
  if (Math.random() > .95)
    throw new Error(JSON.stringify([i, j]))
  return i + j
}
/**
 * Run `myTask(i, j)` under the shared pool, throttled to take at least one
 * second, and settle with the task's outcome. The pool thread is closed
 * whether the task succeeds or fails.
 */
async function queuedTask(i, j) {
  const release = await pool.open()
  try {
    return await throttle(myTask(i, j), 1000)
  } finally {
    release()
  }
}
// Shared pool with three "threads".
const pool = new Pool(3)

// Queue all nine (i, j) combinations up front, in row-major order.
const actions = [0, 1, 2].flatMap((i) => [0, 1, 2].map((j) => queuedTask(i, j)))

console.log("please wait...")
// Print every result on success, or the first rejection reason on failure.
Promise
  .all(actions)
  .then(console.log, console.error)
// resolved
[0,1,2,1,2,3,2,3,4]
// rejected
Error: [1,2]