
Say I want to fetch 10 URLs concurrently, and process the responses as they are received (which may be in a different order from the order in which they appear in the original list). Ignoring the possibility of rejections, one way to do this is simply to attach a "then" callback to each promise, and then wait for them all to finish using Promise.all().

const fetch_promises = [
  fetch("https://cors-demo.glitch.me/allow-cors"),
  fetch("/"),
  fetch("."),
  fetch(""),
  fetch("https://enable-cors.org"),
  fetch("https://html5rocks-cors.s3-website-us-east-1.amazonaws.com/index.html"),
  fetch("https://api.github.com"),
  fetch("https://api.flickr.com/services/rest/"),
];
const processing_promises = [];
for (const fetch_promise of fetch_promises) {
  processing_promises.push(fetch_promise.then(response => {
    // Process response.  In this example, that means just
    // print it.
    console.log("got a response: ",response);
  }));
}
await Promise.all(processing_promises);

Switching to an example with clearer and more deterministic output:

const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
    sleep(3000).then(()=>"slept 3000"),
    sleep(1000).then(()=>"slept 1000"),
    sleep(5000).then(()=>"slept 5000"),
    sleep(4000).then(()=>"slept 4000"),
    sleep(2000).then(()=>"slept 2000"),
];
const processing_promises = [];
for (const sleep_promise of sleep_promises) {
  processing_promises.push(sleep_promise.then(result => {
     console.log("promise resolved: ",result);
  }));
}
await Promise.all(processing_promises);

The output is as expected:

15:54:16.331 promise resolved:  slept 1000
15:54:17.331 promise resolved:  slept 2000
15:54:18.331 promise resolved:  slept 3000
15:54:19.332 promise resolved:  slept 4000
15:54:20.331 promise resolved:  slept 5000

My question is this: suppose I want to, or need to, express the processing described above as an "async for..of" loop, instead of "then" callbacks; so the promises' results need to come out in the form of an async iterable. How would I convert the array of promises to such an async iterable? What I'm asking for is an async generator function AwaitAsTheyCome(), taking as input a list of promises, which yields the results one by one as the promises resolve. I'd then call the function, and do the processing, as follows:

for await (const result of AwaitAsTheyCome(sleep_promises)) {
 console.log("promise resolved: ",result);
}

It should give the same output (with the same timing) as above.

The following attempted solution obviously doesn't work, but it may give an idea of roughly how simple and short I expect this to be:

async function* AwaitAsTheyCome(promises) {
  for (const promise of promises) {
    promise.then(response => {
      yield response;  // WRONG
      // I want to yield it from AwaitAsTheyCome,
      // not from the current arrow function!
    });
  }
}

The following solution does work, but it's more code than I expected to have to write for this.

async function* AwaitAsTheyCome(promises) {
  // Make a list of notifier promises and
  // functions that resolve those promises,
  // one for each of the original promises.
  const notifier_promises = [];
  const notifier_resolves = [];
  for (const promise of promises) {
    notifier_promises.push(
        new Promise(resolve=>notifier_resolves.push(resolve)));
  }

  const responses = [];
  for (const promise of promises) {
    promise.then(response => {
      responses.push(response);
      // send one notification (i.e. resolve the next notifier promise)
      notifier_resolves.shift()();
    });
  }

  for (const promise of promises) {
    // wait for one notification
    // (i.e. wait for the next notifier promise to be resolved).
    await notifier_promises.shift();
    // yield the corresponding response
    yield responses.shift();
  }
}

// Example/test usage
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
  sleep(3000).then(()=>"slept 3000"),
  sleep(1000).then(()=>"slept 1000"),
  sleep(5000).then(()=>"slept 5000"),
  sleep(4000).then(()=>"slept 4000"),
  sleep(2000).then(()=>"slept 2000"),
];
for await (const result of AwaitAsTheyCome(sleep_promises)) {
 console.log("promise resolved: ",result);
}

Is there a simpler way to implement the async generator function AwaitAsTheyCome?

(I tried making a stacksnippet out of the above code, but it didn't work-- I suspect this is because the snippets system doesn't understand the new async generator and/or for await..of syntax)

Bergi
Don Hatch
  • i think you will find [this Q&A](https://stackoverflow.com/a/67628301/633183) to be helpful. if you have any questions lmk – Mulan Nov 20 '21 at 16:01
  • After a couple of smart people answered a different question from the one I intended to ask, I rewrote the introductory part to try to make it clearer that I'm asking, specifically, for an implementation of an async generator function that can be used with the "for await..of" syntax sugar, rather than a "then"-callback based idiom. – Don Hatch Nov 22 '21 at 00:41
  • Hi @Mulan, thanks, the post you referred to is a worthwhile read, since I'm also interested in throttling and doing other things with sequences of promises. However, I don't see anything in that post that leads towards the desired simpler implementation of the async generator function AwaitAsTheyCome() that I'm asking for; do you? (Note that I just now rewrote the intro part of my question, to try to make it clearer exactly what I'm looking for, in case it wasn't clear.) – Don Hatch Nov 22 '21 at 00:51
  • "*Suppose I want to, or need to, express the processing described above as an "async for..of" loop*" - why would you want to do that? – Bergi Nov 22 '21 at 01:02
  • How do you want to handle errors? – Bergi Nov 22 '21 at 01:13
  • @Bergi Regarding your "why would I want to do this" question: for all the reasons the async/await and async for..of syntax sugar exists in the first place: conciseness, readability, ease of composition with subsequent processing passes which would result in another async iterable, which would be similarly concise and readable. (Actually I'm not claiming the syntax sugar is all that; but it's what the advocacy articles seem to say, so currently I'm just trying to learn how to rewrite "then"-callback based code in terms of the syntax sugar, more than anything else.) – Don Hatch Nov 22 '21 at 01:16
  • @Bergi I've left out any mention of errors, for simplicity (which may or may not be a good idea, I know). But, I think errors would be handled by putting a try/catch around the "async for..of"; that's how it's done, right? – Don Hatch Nov 22 '21 at 01:18
  • The problem with `for await … of` is that it is doing everything sequentially. Which is not what you want here, if you want the requests (and esp. their results) be processed in parallel. Also if you start all the requests at once, you have no control of backpressure, which is the only thing asynchronous iterators excel at (imo). – Bergi Nov 22 '21 at 01:25
  • @Bergi I don't follow your first three sentences, about parallel vs sequential-- it looks to me like my AwaitAsTheyCome is exactly as sequential or parallel as the "then"-callback based solution. That said, I agree both versions of the code are very naive, and don't handle backpressure-- the fact that I'm starting with an array of already-in-flight promises is already a bad sign saying backpressure isn't being handled (I think that's what you're saying). – Don Hatch Nov 22 '21 at 01:47
  • I'm thinking of doing `for await (const response of AwaitAsTheyCome(fetch_promises)) { const result = await response.text(); console.log(result); }` where the response bodies would be read one after the other. And error handling (if implemented at all, like in the last snippet in my answer) would be deferred until the respective response is reached in the async iterable, instead of immediately throwing. So I consider `Promise.all(fetch_promises.map(p => p.then(async response => console.log(await response.text()))))` to be much better. – Bergi Nov 22 '21 at 04:17

3 Answers

5

You can simplify the code a bit by

  • using only a single loop over the input array (although that may be confusing)
  • not using a responses array but simply fulfilling the promises
  • not using .shift() on the promises array but simply looping it

async function* raceAll(input) {
  const promises = [];
  const resolvers = [];
  for (const p of input) {
    promises.push(new Promise(resolve=> {
      resolvers.push(resolve);
    }));
    p.then(result => {
      resolvers.shift()(result);
    });
  }

  for (const promise of promises) {
    yield promise;
  }
}
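
As the comments below point out, the trailing loop can also be written with `yield*`: inside an async generator, delegating to an array of promises awaits each element in turn. A minimal sketch of that variant (the name `raceAllDelegating` is hypothetical, and rejections are still ignored):

```javascript
// Hypothetical variant of raceAll that delegates to the promise array.
async function* raceAllDelegating(input) {
  const promises = [];
  const resolvers = [];
  for (const p of input) {
    // One placeholder promise per input, fulfilled in completion order.
    promises.push(new Promise(resolve => resolvers.push(resolve)));
    p.then(result => resolvers.shift()(result));
  }
  // yield* over a plain array awaits each promise before yielding its value.
  yield* promises;
}
```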

If you don't like the amount of code required, I would recommend factoring out the queue this implements into a separate module. With, e.g., an `AsyncBlockingQueue` implementation like the one linked in the comments below, the code can become as simple as

function raceAll(promises) {
  const queue = new AsyncBlockingQueue();
  for (const p of promises) {
    p.then(result => {
      queue.enqueue(result); 
    });
  }
  return queue[Symbol.asyncIterator]();
}
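
The snippet above depends on a queue class that is not shown here. For experimenting, a stand-in can be sketched as follows. This is a minimal version written under my own assumptions, not the linked implementation, and `raceAllWithQueue` is a hypothetical name. It counts the inputs so that iteration ends after the last result, which a queue that can always accept more items has no way to know by itself. Rejections are still ignored.

```javascript
// Assumed minimal stand-in for a blocking async queue (not the linked class).
class AsyncBlockingQueue {
  constructor() {
    this.promises = [];  // promises waiting to be handed to consumers
    this.resolvers = []; // resolve functions waiting for a produced value
  }
  _add() {
    this.promises.push(new Promise(resolve => this.resolvers.push(resolve)));
  }
  enqueue(value) {
    // If no consumer is waiting, create the promise the next one will receive.
    if (this.resolvers.length === 0) this._add();
    this.resolvers.shift()(value);
  }
  dequeue() {
    // If nothing has been produced yet, hand out a promise to be resolved later.
    if (this.promises.length === 0) this._add();
    return this.promises.shift();
  }
}

// Hypothetical generator built on the queue; yields results in completion order.
async function* raceAllWithQueue(promises) {
  const queue = new AsyncBlockingQueue();
  let count = 0;
  for (const p of promises) {
    count++;
    p.then(result => queue.enqueue(result));
  }
  // Take exactly as many items as there were inputs, then finish.
  for (let i = 0; i < count; i++) {
    yield queue.dequeue();
  }
}

// Usage: logs "slept 100", "slept 200", "slept 300", in that order.
(async () => {
  const sleep = ms => new Promise(r => setTimeout(() => r("slept " + ms), ms));
  for await (const result of raceAllWithQueue([sleep(300), sleep(100), sleep(200)])) {
    console.log(result);
  }
})();
```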

However, both of these implementations miss a crucial issue: error handling. If any of these promises rejects, you'll get an unhandled rejection error which may crash your process. To actually get the async iterator to reject the next promise, so that a try/catch around a for await…of loop may handle it, you'd need to do something like

async function* raceAll(input) {
  const promises = [];
  const resolvers = [];
  for (const p of input) {
    promises.push(new Promise(resolve => {
      resolvers.push(resolve);
    }));
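    // finally() returns a derived promise that rejects when p does; the
    // trailing catch() keeps that derived rejection from going unhandled.
    // The rejection itself is still delivered through the queued promise.
    p.finally(() => {
      resolvers.shift()(p);
    }).catch(() => {});
    // works equivalently to: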
    // p.then(result => {
    //   resolvers.shift()(result);
    // }, error => {
    //   resolvers.shift()(Promise.reject(error));
    // });
  }

  for (const promise of promises) {
    yield promise;
  }
}

Resolving the promise with a rejected promise does the trick so that we still only need one queue of resolver functions, not one containing both resolve and reject functions.
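
The adoption behaviour relied on here can be checked in isolation. The following is a minimal sketch, separate from the generator above; the helper name `adopt` is made up for illustration. It shows that calling `resolve` with a rejected promise makes the outer promise reject with the same reason:

```javascript
// Hypothetical helper: wraps a promise in a new one using only `resolve`.
function adopt(p) {
  return new Promise(resolve => {
    // Resolving with a promise makes the new promise follow it,
    // whether it ends up fulfilled or rejected.
    resolve(p);
  });
}

adopt(Promise.resolve(42)).then(
  value => console.log("fulfilled with", value), // prints: fulfilled with 42
  error => console.log("rejected with", error.message)
);

adopt(Promise.reject(new Error("boom"))).then(
  value => console.log("fulfilled with", value),
  error => console.log("rejected with", error.message) // prints: rejected with boom
);
```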

Bergi
  • Nice simplifications! Yes, these are the kinds of things I was looking for. Regarding using a single loop over the input array-- yeah I agree that one is a tradeoff, the only reason I had that separated out into a separate loop is because I found it a little easier to read. And, I see the auxiliary arrays of resolvers is already an idiom that you've been using. So I suspect this may be optimal, and I'll accept this answer. – Don Hatch Nov 22 '21 at 01:58
  • I am still wondering, though, if there may be a way to do this without the aux arrays, something roughly as simple as my "The following attempted solution obviously doesn't work" code, since that's about how simple the problem seems to be, conceptually. Do you have a feeling for whether that might be possible or impossible? – Don Hatch Nov 22 '21 at 02:01
  • The queue approach is certainly the most efficient. (Maybe even more if promises and resolver functions are generated on demand and not kept until the end of the iteration, like that `AsyncBlockingQueue` class does it). Another approach without the arrays might be something along the lines of `for (const p of promises) p.then(() => promises.splice(promises.indexOf(p), 1)); while (promises.length) yield Promise.race(promises);` but I don't like that for the many stale `.then` handlers the repeated `Promise.race` leaves around. – Bergi Nov 22 '21 at 03:54
  • When I try your short implementation using AsyncBlockingQueue, I get "Uncaught TypeError: raceAll(...) is not a function or its return value is not async iterable". Not sure what went wrong, but the "Symbol.asyncIterator" thing seems unnecessarily obscure anyway, so how about this instead: `async function* raceAll(promises) { ...; yield* queue; }` . (And I think the `for` loops at the ends of your other two implementations can be replaced by `yield* promises;` as well) – Don Hatch Nov 22 '21 at 15:13
  • @DonHatch Ah, yes, `return queue` or `yield* queue` would work fine; I've also fixed the [`AsyncBlockingQueue` implementation](https://stackoverflow.com/a/47157577/1048572) to have an iterable iterator. – Bergi Nov 22 '21 at 15:34
0

You could work with Promise.race to get a promise for the next resolving one, then remove that promise from the list and repeat.

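async function* raceAll(promises) {
    // Don't mutate the original array. Each input is chained into a promise
    // that resolves to a record identifying itself, so the winner of a race
    // is removed only once its value has actually been taken (removing it in
    // the then-callback would drop values that settle between two races).
    // Racing the chained promises also means that, if there is a rejection,
    // the caller's error handler will stop it from bubbling up unhandled.
    const pending = new Set();
    for (const p of promises) {
        const chained = p.then(val => ({ chained, val }));
        pending.add(chained);
    }
    while (pending.size) {
        const { chained, val } = await Promise.race(pending);
        pending.delete(chained);
        yield val;
    }
}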

// Demo
const delay = (ms, val, err) => new Promise((resolve, reject) => 
    setTimeout(() => err ? reject(err) : resolve(val), ms)
);

(async function main() {
    const promises = [
        delay(200, 200),
        delay(500, 500), // Add third argument to trigger rejection
        delay(100, 100),
        delay(400, 400)
    ];
    try {
        for await (const val of raceAll(promises)) {
            console.log(val);
        }
    } catch(e) {
        console.log("caught", e);
    }
})();
trincot
-1

This does the same thing: whatever code you would put in the body of your loop to handle the yielded results goes in the onfulfilled callback instead. The Promise.all() call waits for all the promises to finish, as your loop does.

const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
    sleep(3000).then(()=>"slept 3000"),
    sleep(1000).then(()=>"slept 1000"),
    sleep(5000).then(()=>"slept 5000"),
    sleep(4000).then(()=>"slept 4000"),
    sleep(2000).then(()=>"slept 2000"),
];

const onfulfilled = result => console.log("promise resolved: ",result);
sleep_promises.forEach( p => p.then(onfulfilled) )
await Promise.all(sleep_promises)
possum
  • Thanks, I know I can get the desired example output by attaching the processing function as a "then" callback to each promise as you've done here; that's not hard. What I'm looking for is a way to express the results as an async iterable, instead, so that it can be processed (and then composed with subsequent processing) using the concise `for await...of` syntax sugar instead of "then" callbacks. Therefore what I'm specifically asking is how to implement the async generator function that does this. – Don Hatch Nov 20 '21 at 13:35
  • Is this out of some exercise where you really just want to use a generator or is there a case outside of what you originally posted that the generator eventually gives you something more useful? – possum Nov 20 '21 at 13:59
  • I guess you could say it's an exercise, for my education in how to do this sort of thing. I haven't used async/await before much, so I'm trying to learn how to use it and how to restructure "then"-callback and "catch"-callback based code in terms of async/await syntax sugar and try/catch blocks. I have no opinion on whether the syntax sugar is worthwhile, at this point I'm trying to learn it so that I can assess that. – Don Hatch Nov 20 '21 at 14:07
  • And, yes, I have a larger use case where I'm actually using this, so I've tried to distill it down into a clear relevant example for this question. In general, an async iterable seems like a tidier package than just a collection of promises that I can attach individual "then" callbacks to; that seems evident from just the example I've given, so my impression is that it wouldn't be helpful for me to post more details about my application. – Don Hatch Nov 20 '21 at 14:20