
How to convert a dynamic Set<Promise<T>> into AsyncIterable<T> (unordered)?

The resulting iterable must produce values as they resolve, and it must end once the source runs empty.

I have a dynamic cache of promises to be resolved, with values reported as they arrive, disregarding the order.

NOTE: The source is dynamic, which means it can receive new Promise<T> elements while we progress through the resulting iterator.
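For context, a naive baseline (my own sketch, not from the question) shows why the dynamic requirement is the hard part: repeatedly racing the set reports values in resolution order, but the iterator ends as soon as the set is momentarily empty, so late additions can be missed.

```typescript
// Naive sketch, NOT a full solution: drains a Set<Promise<T>> in
// resolution order by repeatedly racing tagged copies of its members.
// It terminates as soon as the set is momentarily empty, so promises
// added after that point are never observed -- which is exactly why
// the dynamic case needs extra coordination.
async function* drain<T>(set: Set<Promise<T>>): AsyncIterable<T> {
  while (set.size) {
    // Tag each promise with its identity so the winner can be removed.
    const winner = await Promise.race(
      [...set].map(p => p.then(value => ({ p, value })))
    );
    set.delete(winner.p);
    yield winner.value;
  }
}
```

A rejected member makes the race (and therefore the iterator) reject; the answers below deal with that explicitly.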

UPDATE

After going through all the suggestions, I was able to implement my operator. And here are the official docs.

I'm adding a bounty to reward anyone who can improve it further, though at this point a PR is preferable (it is for a public library), or at least something that fits the same protocol.

vitaly-t
  • Comments are not for extended discussion; this conversation has been [moved to chat](https://chat.stackoverflow.com/rooms/249284/discussion-on-question-by-vitaly-t-unordered-resolution-of-a-list-of-promises). – Samuel Liew Nov 03 '22 at 13:01

2 Answers


Judging from your library implementation, you actually want to transform an AsyncIterable<Promise<T>> into an AsyncIterator<T> by racing up to N of the produced promises concurrently. I would implement that as follows:

async function* limitConcurrent<T>(iterable: AsyncIterable<Promise<T>>, n: number): AsyncIterator<T> {
  const pool = new Set<Promise<T>>();
  for await (const p of iterable) {
    const promise = Promise.resolve(p).finally(() => {
      pool.delete(promise); // FIXME see below
    });
    promise.catch(() => { /* ignore */ }); // mark rejections as handled
    pool.add(promise);
    if (pool.size >= n) {
      yield /* await */ Promise.race(pool);
    }
  }
  while (pool.size) {
    yield /* await */ Promise.race(pool);
  }
}

Notice that if one of the promises in the pool rejects, the returned iterator will end with the error and the results of the other promises that are currently in the pool will be ignored.
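To make that behaviour concrete, here is a self-contained run against a copy of the snippet above (the timings and values are made up). The small `fromArray` helper is my own: a hand-written async iterable is needed to actually produce raw `Promise<T>` values, because an `async function*` would await any promise it yields.

```typescript
// Hand-rolled async iterable that yields its items without awaiting them.
function fromArray<T>(items: T[]): AsyncIterable<T> {
  return {
    [Symbol.asyncIterator]() {
      let i = 0;
      return {
        next: (): Promise<IteratorResult<T>> =>
          Promise.resolve(
            i < items.length
              ? { done: false, value: items[i++] }
              : { done: true, value: undefined }
          ),
      };
    },
  };
}

// Copy of the snippet above, with the Set typed for strict TypeScript.
async function* limitConcurrent<T>(
  iterable: AsyncIterable<Promise<T>>,
  n: number
): AsyncIterable<T> {
  const pool = new Set<Promise<T>>();
  for await (const p of iterable) {
    const promise = Promise.resolve(p).finally(() => {
      pool.delete(promise);
    });
    promise.catch(() => { /* mark rejections as handled */ });
    pool.add(promise);
    if (pool.size >= n) yield Promise.race(pool);
  }
  while (pool.size) yield Promise.race(pool);
}

const sleep = (ms: number) => new Promise<void>(r => setTimeout(r, ms));

// 'a' resolves first and is reported; the rejection at 20ms then ends
// the iterator, and the still-pending 'c' is silently dropped.
async function demo(): Promise<string[]> {
  const events: string[] = [];
  const source = fromArray([
    sleep(10).then(() => 'a'),
    sleep(20).then<string>(() => { throw new Error('boom'); }),
    sleep(30).then(() => 'c'),
  ]);
  try {
    for await (const v of limitConcurrent(source, 2)) events.push('value:' + v);
  } catch (e) {
    events.push('error:' + (e as Error).message);
  }
  return events; // ['value:a', 'error:boom']
}
```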


However, the above implementation presumes that the iterable is relatively fast, as it needs to produce n promises before the pool is raced for the first time. If it yields promises more slowly than the promises take to resolve, the results are held up unnecessarily.

And worse, the above implementation may lose values. If the returned iterator is not consumed fast enough, or the iterable does not yield fast enough, multiple promise handlers may delete their respective promises from the pool during one iteration of the loop, and the Promise.race will consider only one of them.

So this would work for a synchronous iterable, but if you actually have an asynchronous iterable, you need a different solution. Essentially you have a consumer and a producer that are more or less independent, and what you need is some queue between them.

Yet with a single queue it still wouldn't handle backpressure, the producer just runs as fast as it can (given the iteration of promises and the concurrency limit) while filling the queue. What you really need then is a channel that allows synchronisation in both directions, e.g. using two queues:

class AsyncQueue<T> {
  resolvers: null | ((res: IteratorResult<T> | Promise<never>) => void)[];
  promises: Promise<IteratorResult<T>>[];
  constructor() {
    // invariant: at least one of the arrays is empty.
    // when `resolvers` is `null`, the queue has ended.
    this.resolvers = [];
    this.promises = [];
  }
  putNext(result: IteratorResult<T> | Promise<never>): void {
    if (!this.resolvers) throw new Error('Queue already ended');
    if (this.resolvers.length) this.resolvers.shift()!(result);
    else this.promises.push(Promise.resolve(result));
  }
  put(value: T): void {
    this.putNext({done: false, value});
  }
  end(): void {
    for (const res of this.resolvers) res({done: true, value: undefined});
    this.resolvers = null;
  } 
  next(): Promise<IteratorResult<T>> {
    if (this.promises.length) return this.promises.shift()!;
    else if (this.resolvers) return new Promise(resolve => { this.resolvers.push(resolve); });
    else return Promise.resolve({done: true, value: undefined});
  }
  [Symbol.asyncIterator](): AsyncIterator<T> {
    // Todo: Use AsyncIterator.from()
    return this;
  }
}
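Before wiring it into the operator, the queue can be sanity-checked in isolation. This demo is mine, not part of the answer; the class is a lightly adapted copy (field initializers, a guard in `end()`, and non-null assertions so it compiles under strict TypeScript):

```typescript
class AsyncQueue<T> {
  // invariant: at least one array is empty; `resolvers === null` means ended
  resolvers: null | ((res: IteratorResult<T> | Promise<never>) => void)[] = [];
  promises: Promise<IteratorResult<T>>[] = [];
  putNext(result: IteratorResult<T> | Promise<never>): void {
    if (!this.resolvers) throw new Error('Queue already ended');
    if (this.resolvers.length) this.resolvers.shift()!(result);
    else this.promises.push(Promise.resolve(result));
  }
  put(value: T): void {
    this.putNext({ done: false, value });
  }
  end(): void {
    for (const res of this.resolvers ?? []) res({ done: true, value: undefined });
    this.resolvers = null;
  }
  next(): Promise<IteratorResult<T>> {
    if (this.promises.length) return this.promises.shift()!;
    if (this.resolvers) return new Promise(res => this.resolvers!.push(res));
    return Promise.resolve({ done: true, value: undefined });
  }
  [Symbol.asyncIterator](): AsyncIterator<T> {
    return this;
  }
}

// Producer puts ahead of the consumer (values get buffered), then the
// consumer catches up and is finally parked until put()/end() release it.
async function demoQueue(): Promise<number[]> {
  const q = new AsyncQueue<number>();
  q.put(1); // buffered: nobody is waiting yet
  q.put(2);
  setTimeout(() => { q.put(3); q.end(); }, 10); // consumer will be parked
  const seen: number[] = [];
  for await (const v of q) seen.push(v);
  return seen; // [1, 2, 3]
}
```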
function limitConcurrent<T>(iterable: AsyncIterable<Promise<T>>, n: number): AsyncIterator<T> {
  const produced = new AsyncQueue<T>();
  const consumed = new AsyncQueue<void>();
  (async () => {
    try {
      let count = 0;
      for await (const p of iterable) {
        const promise = Promise.resolve(p);
        promise.then(value => {
          produced.put(value);
        }, _err => {
          produced.putNext(promise); // with rejection already marked as handled
        });
        if (++count >= n) {
          await consumed.next(); // happens after any produced.put[Next]()
          count--;
        }
      }
      while (count) {
        await consumed.next(); // happens after any produced.put[Next]()
        count--;
      }
    } catch(e) {
      // ignore `iterable` errors?
    } finally {
      produced.end();
    }
  })();

  return (async function*() {
    for await (const value of produced) {
      yield value;
      consumed.put();
    }
  }());
}
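Here is the whole channel in action (my own demo wiring, not part of the answer): the queue and operator above are copied with non-null assertions for strict TypeScript, the return type is widened to `AsyncIterable<T>` so the result can be consumed with `for await`, and a hand-written async iterable feeds in three delayed promises, since an `async function*` would await the promises it yields.

```typescript
class AsyncQueue<T> {
  // invariant: at least one array is empty; `resolvers === null` means ended
  resolvers: null | ((res: IteratorResult<T> | Promise<never>) => void)[] = [];
  promises: Promise<IteratorResult<T>>[] = [];
  putNext(result: IteratorResult<T> | Promise<never>): void {
    if (!this.resolvers) throw new Error('Queue already ended');
    if (this.resolvers.length) this.resolvers.shift()!(result);
    else this.promises.push(Promise.resolve(result));
  }
  put(value: T): void { this.putNext({ done: false, value }); }
  end(): void {
    for (const res of this.resolvers ?? []) res({ done: true, value: undefined });
    this.resolvers = null;
  }
  next(): Promise<IteratorResult<T>> {
    if (this.promises.length) return this.promises.shift()!;
    if (this.resolvers) return new Promise(res => this.resolvers!.push(res));
    return Promise.resolve({ done: true, value: undefined });
  }
  [Symbol.asyncIterator](): AsyncIterator<T> { return this; }
}

function limitConcurrent<T>(iterable: AsyncIterable<Promise<T>>, n: number): AsyncIterable<T> {
  const produced = new AsyncQueue<T>();
  const consumed = new AsyncQueue<void>();
  (async () => {
    try {
      let count = 0;
      for await (const p of iterable) {
        const promise = Promise.resolve(p);
        promise.then(
          value => produced.put(value),
          // pass the rejected promise through; its rejection is handled here
          () => produced.putNext(promise as Promise<never>)
        );
        if (++count >= n) { await consumed.next(); count--; }
      }
      while (count) { await consumed.next(); count--; }
    } finally {
      produced.end();
    }
  })();
  return (async function* () {
    for await (const value of produced) {
      yield value;
      consumed.put(undefined); // signal the producer: one slot is free
    }
  })();
}

// Hand-rolled async iterable that yields its items without awaiting them.
function fromArray<T>(items: T[]): AsyncIterable<T> {
  return {
    [Symbol.asyncIterator]() {
      let i = 0;
      return {
        next: (): Promise<IteratorResult<T>> =>
          Promise.resolve(i < items.length
            ? { done: false, value: items[i++] }
            : { done: true, value: undefined }),
      };
    },
  };
}

const sleep = (ms: number, v: number) => new Promise<number>(r => setTimeout(r, ms, v));

// Values arrive in resolution order (2, 3, 1), not source order.
async function demoLimit(): Promise<number[]> {
  const source = fromArray([sleep(30, 1), sleep(10, 2), sleep(20, 3)]);
  const out: number[] = [];
  for await (const v of limitConcurrent(source, 2)) out.push(v);
  return out;
}
```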
Bergi
  • I appreciate your help with that, but at this point, now that I got [my own code](https://github.com/vitaly-t/iter-ops/blob/main/src/ops/async/wait-race.ts) performing exactly as needed, I would only accept refactoring PR-s. I probably should close the question or add my own answer? – vitaly-t Nov 04 '22 at 23:38
  • @vitaly-t It's not as simple as one thinks :-) Your code has a problem when the iterable is slower than the resolution of the promises it yields - see the first point of my extended answer. I figured my simple answer (first snippet) is actually wrong… – Bergi Nov 05 '22 at 02:10
  • I'm not sure how that problem can be tested. The main logic sits behind `return i.next().then((a)`, which means speed of the source is irrelevant. Can you offer a sequence example that would show the problem, please? – vitaly-t Nov 05 '22 at 13:48
  • I'm following up with an update in the answer + bounty for yourself, if you help me further resolving the issue in my code (note that I do not see one yet). – vitaly-t Nov 05 '22 at 13:56
  • [Here's complete test with delayed values](https://gist.github.com/vitaly-t/de30ca8676570570a8577ce9313bf968) and I do not see any issue. If you can create an example that shows the problem - please let me know! – vitaly-t Nov 05 '22 at 14:26
  • This sounds like something that [Repeater.js](https://repeater.js.org) could help with. Synchronizing async stuff was one of the example use cases. It lets you implement async iterators using a Promise-style constructor. – Darryl Noakes Nov 05 '22 at 14:43
  • @vitaly-t I've [replied on your github discussion](https://github.com/vitaly-t/iter-ops/discussions/179#discussioncomment-4063894), the example is too long for a comment and would derail the SO answer – Bergi Nov 05 '22 at 15:05
  • @DarrylNoakes [That sounds great!](https://repeater.js.org/docs/safety) Could you write up an answer based on that, please? The hard part (imo) is to only sometimes wait for the `push()` promise depending on how many concurrent executions are active. – Bergi Nov 05 '22 at 15:27
  • @Bergi, I've played around with it and couldn't get anything very satisfactory. My attempt is on StackBlitz: [`waitRace` Experiments - Darryl Noakes - StackBlitz](https://stackblitz.com/edit/node-akcz6d?file=index.ts&view=editor). – Darryl Noakes Nov 07 '22 at 23:49
  • The solution suggested here [found its place inside iter-ops library](https://github.com/vitaly-t/iter-ops/blob/main/src/ops/async/wait-race.ts). Thank you! – vitaly-t Nov 08 '22 at 05:47

function createCache() {
  const resolve = [];
  const sortedPromises = [];
  const noop = () => void 0;

  return {
    get length() {
      return sortedPromises.length
    },

    add(promiseOrValue) {
      const q = new Promise(r => {
        resolve.push(r);

        // same handler for fulfilment and rejection: settle the oldest
        // queued promise with this (possibly rejected) source promise.
        const settle = () => {
          resolve.shift()(promiseOrValue);
        }

        Promise.resolve(promiseOrValue).then(settle, settle);
      });

      q.catch(noop); // prevent q from throwing when rejected.

      sortedPromises.push(q);
    },
    
    next() {
      return sortedPromises.length ?
        { value: sortedPromises.shift() } :
        { done: true };
    },

    [Symbol.iterator]() {
      return this;
    }
  }
}

(async() => {
  const sleep = (ms, value) => new Promise(resolve => setTimeout(resolve, ms, value));
  const cache = createCache();
  const start = Date.now();

  function addItem() {
    const t = Math.floor(Math.random() ** 2 * 8000), // when to resolve
      val = t + Date.now() - start; // ensure that the resolved value is in ASC order.

    console.log("add", val);
    cache.add(sleep(t, val));
  }
  
  // add a few initial items
  Array(5).fill().forEach(addItem);
  
  // check error handling with a rejecting promise.
  cache.add(sleep(1500).then(() => Promise.reject("a rejected Promise")));
  
  while (cache.length) {
    try {
      for await (let v of cache) {
        console.log("yield", v);

        if (v < 15000 && Math.random() < .5) {
          addItem();
        }

        // slow down iteration, like if you'd await some API-call.
        // promises now resolve faster than we pull them.
        await sleep(1000);
      }
    } catch (err) {
      console.log("error:", err);
    }
  }
  console.log("done");
})()

Works with both `for (const promise of cache) { ... }` and `for await (const value of cache) { ... }`.

Error-handling:

for(const promise of cache){
  try {
    const value = await promise;
  }catch(error){ ... }
}

// or

while(cache.length){
  try {
    for await(const value of cache){
      ...
    }
  }catch(error){ ... }
}

Rejected promises (in the cache) don't throw until you `.then()` or `await` them.

It also handles backpressure (when your loop iterates slower than the promises resolve):

for await(const value of cache){
  await somethingSlow(value);
}
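To see the backpressure claim concretely, below is a typed copy of `createCache` (renamed internals and the timings are mine) driven by a deliberately slow consumer: all three sources settle long before the second pull, yet the values still come out in resolution order.

```typescript
function createCache<T>() {
  const resolvers: Array<(v: T | Promise<T>) => void> = [];
  const sortedPromises: Promise<T>[] = [];
  const noop = () => undefined;

  return {
    get length() { return sortedPromises.length; },

    add(promiseOrValue: T | Promise<T>): void {
      const q = new Promise<T>(r => {
        resolvers.push(r);
        // whichever source settles first resolves the oldest queued slot,
        // adopting that source's value (or rejection)
        const settle = () => { resolvers.shift()!(promiseOrValue); };
        Promise.resolve(promiseOrValue).then(settle, settle);
      });
      q.catch(noop); // prevent unhandled-rejection warnings
      sortedPromises.push(q);
    },

    next(): IteratorResult<Promise<T>> {
      return sortedPromises.length
        ? { done: false, value: sortedPromises.shift()! }
        : { done: true, value: undefined };
    },

    [Symbol.iterator]() { return this; },
  };
}

const sleep = (ms: number, v: string) => new Promise<string>(r => setTimeout(r, ms, v));

// Sources settle at 5/10/15ms while the consumer takes ~25ms per item:
// the settled values are buffered, and order stays 'a', 'c', 'b'.
async function demoBackpressure(): Promise<string[]> {
  const cache = createCache<string>();
  cache.add(sleep(5, 'a'));
  cache.add(sleep(15, 'b'));
  cache.add(sleep(10, 'c'));
  const seen: string[] = [];
  for await (const v of cache) {
    seen.push(v);
    await new Promise(r => setTimeout(r, 25)); // slow consumer
  }
  return seen; // ['a', 'c', 'b']
}
```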
Thomas
  • I don't think this meets OP's requirements since `promises` can be appended to while you are in the while loop. – kelsny Nov 02 '22 at 19:18
  • Any idea how I can amend [my own implementation](https://github.com/vitaly-t/iter-ops/blob/wait-cache/src/ops/async/wait-cache.ts#L16) accordingly? My concern is for poor performance, possibly due to slow `Promise.race` usage or something else? – vitaly-t Nov 03 '22 at 18:13
  • @vitaly-t Barmar was right, this is an X Y problem. All the fuss about the cache is just an implementation detail. Cutting to the end, `waitCacheAsync` cannot work as expected with a generic `AsyncIterator` as input. Your utilities are "manually" implementing `next`; I have yet to test them, but if your `asyncIterator` is an `async function *` you have 0 chance of resolving the 2nd Promise before the 1st. – Thomas Nov 04 '22 at 09:48
  • Well, all my tests of [the current implementation](https://github.com/vitaly-t/iter-ops/blob/main/src/ops/async/wait-race.ts) show perfect concurrency, and I'm happy with it. – vitaly-t Nov 04 '22 at 12:51