
I am trying to add batching capability to an async JS generator. The idea is to have a function that wraps a non-batched generator. This function would call the generator's next method several times to launch several async operations concurrently, then it would return the first value, taking care to refill the batch as it hands its items to the client. The following example demonstrates the working case that doesn't use the wrapper, as well as the wrapper case, which produces correct results but doesn't give the desired concurrent execution of the batched promises.

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

async function foo(v) {
  await sleep(1000);
  return v;
}

const createAsyncGenerator = async function*(){
  for (let i = 0; i < 5000; i++) {
    yield foo(i);
  }
}

const createBatchedAsyncGenerator = async function*(batch_size){
  const batch = [];
  for (let i = 0; i < batch_size; i++) {
    batch.push(foo(i));
  }
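  for (let i = batch_size; i < 500; i++) {
    batch.push(foo(i));
    yield batch.shift();
  }
  // yield the promises still left in the batch, otherwise the last
  // batch_size items would be dropped
  while (batch.length > 0) {
    yield batch.shift();
  }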
}


function batchAsyncGenerator(generator) {
  return {
    batch: [],
    [Symbol.asyncIterator]() {
      while (this.batch.length < 5) {
        this.batch.push(generator.next());
      }
      return {
        batch: this.batch,
        async next() {
          this.batch.push(generator.next());
          const result = this.batch.shift();
          return result;
        }
      }
    }
  }
}

const batching_works = async () => {
  const asyncGenerator = createBatchedAsyncGenerator(5);
  for await (const item of asyncGenerator) {
    console.log(item)
  }
}

const batching_doesnt_work = async () => {
  const asyncGenerator = batchAsyncGenerator(createAsyncGenerator());
  for await (const item of asyncGenerator) {
    console.log(item)
  }
}

batching_works()
//batching_doesnt_work()

2 Answers


This function would call the generator's next method several times to launch several async operations concurrently

That is, unfortunately (?), not how async generators work. If you yield x in an async function*, what actually happens is:

await yield await x

(see steps 5 and 8.b of the AsyncGeneratorYield abstract operation). This means that all actions inside the generator body are strictly sequential and cannot overlap just because the async generator is iterated "faster". Every .next() call you make on an async generator is queued, and the body only resumes for it once the previous step has completed; this queuing is what makes awaits inside the generator body possible.
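A small experiment makes the queuing visible. In this sketch (`slowGen` and `demo` are illustrative names, not from the question), three `.next()` calls are issued at once, yet the recorded events show that each step of the body only starts after the previous one has finished, so the run takes roughly three times the single delay rather than one delay.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical generator that records when each step of its body starts and ends.
async function* slowGen(events) {
  for (let i = 0; i < 3; i++) {
    events.push(`start ${i}`);
    await sleep(50);
    events.push(`end ${i}`);
    yield i;
  }
}

async function demo() {
  const events = [];
  const gen = slowGen(events);
  // Request three values at once; the async generator queues these calls
  // and serves them one after another.
  const results = await Promise.all([gen.next(), gen.next(), gen.next()]);
  return { values: results.map((r) => r.value), events };
}

demo().then(({ values, events }) => {
  console.log(values); // [0, 1, 2]
  console.log(events); // start 0, end 0, start 1, end 1, start 2, end 2 (never interleaved)
});
```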

You can achieve the desired behaviour using a synchronous iterator that yields promises, though.
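A minimal sketch of that approach, reusing `foo` from the question (with a shorter 100 ms delay): `createPromiseIterator` is a plain synchronous generator that yields promises without awaiting them, and `batched` (a hypothetical helper name) is an async generator that pulls up to `size` of those promises ahead of time, so that the operations run concurrently, and hands the results back in their original order. It deliberately ignores the rejection problem described in the following paragraph.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function foo(v) {
  await sleep(100);
  return v;
}

// Plain (synchronous) generator: each next() call starts foo(i) right away
// and yields the pending promise instead of awaiting it.
function* createPromiseIterator(count) {
  for (let i = 0; i < count; i++) {
    yield foo(i);
  }
}

// Hypothetical helper: keeps at most `size` operations running and yields
// their results in the original order. Rejections are NOT handled here.
async function* batched(iterator, size) {
  const inFlight = [];
  // Pull one more promise from the sync iterator; returns false once exhausted.
  const refill = () => {
    const { value, done } = iterator.next();
    if (!done) inFlight.push(value);
    return !done;
  };
  // Start the first `size` operations concurrently.
  while (inFlight.length < size && refill()) {
    // refill() does the work
  }
  while (inFlight.length > 0) {
    const result = await inFlight.shift();
    refill(); // a slot has freed up, so start the next operation
    yield result;
  }
}

async function main() {
  const start = Date.now();
  const values = [];
  for await (const v of batched(createPromiseIterator(10), 5)) {
    values.push(v);
  }
  console.log(values, `${Date.now() - start} ms`);
}

main();
```

With 10 items and a batch size of 5, this should finish in roughly 200 ms instead of the ~1000 ms a strictly sequential loop would need, because the promises are created (and the work started) by the synchronous iterator rather than inside an async generator body.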

Finally, note a huge drawback of both your implementations (createBatchedAsyncGenerator and batchAsyncGenerator): they don't handle errors properly and can cause unhandled promise rejections that crash your application. See "Waiting for more than one concurrent await operation" and "Any difference between await Promise.all() and multiple await?" for details.
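One common mitigation, sketched here as an assumption rather than taken from the linked questions: attach a no-op `.catch` to each promise as it enters the buffer. That marks the rejection as handled (in recent Node.js versions an unhandled rejection terminates the process by default), while awaiting the original promise later still throws, so the error surfaces where the consumer reads that item. `op`, `track` and `consume` are hypothetical names.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical operation: item 1 fails immediately, the others succeed after 50 ms.
async function op(v) {
  if (v === 1) throw new Error(`item ${v} failed`);
  await sleep(50);
  return v;
}

// Buffer a promise without risking an unhandled rejection: the no-op catch
// handles it, but the original promise still rejects when awaited later.
function track(promise) {
  promise.catch(() => {});
  return promise;
}

async function consume() {
  // All three operations start at once; op(1) rejects while we wait for op(0).
  const buffer = [0, 1, 2].map((i) => track(op(i)));
  const seen = [];
  try {
    for (const p of buffer) {
      seen.push(await p);
    }
  } catch (err) {
    seen.push(err.message); // the error is reported in order, not as a crash
  }
  return seen;
}

consume().then((seen) => console.log(seen)); // [0, "item 1 failed"]
```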

Bergi
  • Bergi, thank you very much for the very well-crafted and detailed response. I am getting the impression that async generators are a bit over-designed compared to other, more "transparent" JS features. I will try a regular iterator as you suggested. – Vassili Gorshkov Jan 18 '22 at 17:46

The batching function could be a generator function itself:

function sleep(ms) {
  return new Promise(resolve => setTimeout(() => resolve(), ms));
}

const foo = async(v) => {
  await sleep(150)
  return v
}

const createAsyncGenerator = async function*() {
  for (let i = 0; i < 50; i++) {
    const r = await foo(i)
    yield r
  }
}

// creating the batches & handling the results
const asyncBatchGenerator = async function*({
  batch,
  fn
}) {
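  // request `batch` results up front (an async generator queues these
  // next() calls and still runs its body one step at a time)
  const a = Array.from({ length: batch }, () => fn.next())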
  const res = await Promise.all(a)
  yield res
}

// wrapper function that handles the different states
// (asyncBatchGenerator is re-created on every pass, while genFn
// is kept until it is closed)
const batchingWrapper = async(fn) => {
  const genFn = fn()
  const bf = () => asyncBatchGenerator({
    batch: 8,
    fn: genFn
  })
  let res = []
  let genClosed = false

  while (!genClosed) {
    for await (const results of bf()) {
      // building the return array: only results that
      // hold a real value (done === false) are added
      res = [
        ...res,
        ...results.filter(({ done }) => !done).map(({ value }) => value)
      ]
      // logging the growing return array:
      console.log(res)

      if (results.some(({ done }) => done)) {
        genClosed = true
      }
    }
  }
  console.log('generator closed')
}

// calling the wrapper with the argument:
batchingWrapper(createAsyncGenerator)

The batching generator is re-created on every pass, until the wrapped generator is closed.
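The grouping done by the two nested loops above can also be written as a single async generator that accepts any (async) iterable and also emits a final, partially filled batch. This is a sketch with hypothetical names (`chunks`, `numbers`); as the other answer explains, grouping results this way does not by itself make an async generator's body run concurrently.

```javascript
// Hypothetical helper: groups the values of any (async) iterable into arrays of `size`.
async function* chunks(iterable, size) {
  let batch = [];
  for await (const value of iterable) {
    batch.push(value);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  // emit the final, partially filled batch, if any
  if (batch.length > 0) {
    yield batch;
  }
}

// Hypothetical source generator for the usage example.
async function* numbers(count) {
  for (let i = 0; i < count; i++) {
    yield i;
  }
}

async function main() {
  for await (const batch of chunks(numbers(10), 4)) {
    console.log(batch); // [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9]
  }
}

main();
```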

muka.gergely