0

I'm trying to execute some async tasks in parallel with a limitation on the maximum number of simultaneously running tasks.

There's an example of what I want to achieve:

[Image: task flow example]

Currently these tasks run one after another. It's implemented this way:

export function signData(dataItem) {
  cadesplugin.async_spawn(async function* (args) {
    //... nestedArgs assignment logic ...

    for (const id of dataItem.identifiers) {
      yield* idHandler(dataItem, id, args, nestedArgs);
    }
    
    // some extra logic after all tasks were finished
  }, firstArg, secondArg);
}

async function* idHandler(edsItem, researchId, args, nestedArgs) {
  ...
  let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
  yield oDocumentNameAttr.propset_Value("Document Name");
  ...
  // this function mutates some external data, making API calls and returns void
}

Unfortunately, I can't make any changes in cadesplugin.* functions, but I can use any external libraries (or built-in Promise) in my code.

I found some methods (eachLimit and parallelLimit) in the async library that might work for me, and an answer that shows how to use them.

But there are still two problems I can't solve:

  1. How can I pass the main parameters into the nested function?
  2. The main function is a generator function, so yield expressions still have to work in both the main and the nested functions.

There's a link to the cadesplugin.* source code, where you can find async_spawn (and the other cadesplugin.* functions) used in my code.

That's the code I tried with no luck:

await forEachLimit(dataItem.identifiers, 5, yield* async function* (researchId, callback) { 
  //... nested function code 
});

It fails with an "Object is not async iterable" error.

Another attempt:

let functionArray = [];
dataItem.identifiers.forEach(researchId => {
  functionArray.push(researchIdHandler(dataItem, id, args, nestedArgs))
});
await parallelLimit(functionArray, 5);

It just does nothing.

Can I somehow solve this problem, or do generator functions make it impossible?

Alexander Shkirkov
  • i think you will find [this q&a](https://stackoverflow.com/a/67628301/633183) to be helpful. let me know if you have follow up questions ^_^ – Mulan Dec 24 '21 at 08:03
  • A promise aware map with concurrency option sounds like something you could use, bluebird has a concurrency option, but instead of a large promise lib, something like https://www.npmjs.com/package/promise.map might be ideal. – Keith Dec 24 '21 at 08:11
  • @Mulan, @Keith, thanks for your comments, but unfortunately I still don't quite understand how to mix a custom `Pool` or bluebird's `Promise.map` with generator functions and yield expressions... My attempts with these solutions still either fail with "Object is not async iterable" or do nothing. I would be very grateful if you could suggest something about this – Alexander Shkirkov Dec 24 '21 at 09:09
  • @AlexanderShkirkov, I have written a custom eachlimit function that does what you were asking for. Have a look and let me know if it is helpful: https://jsfiddle.net/devnegikec/k3a20qvt/38/ – Devnegikec Oct 09 '22 at 12:24

3 Answers

1

square peg, round hole

You cannot use async iterables for this problem. It is the nature of for await .. of to run in series: await suspends the loop, and it will not continue until the awaited promise has resolved. You need a more precise level of control where you can enforce these specific requirements.
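To make the difference concrete, here is a minimal sketch (the names task, ids, inSeries and allAtOnce are made up for illustration and are not part of the question's code). It records the order in which three short tasks start and end, once driven by for await .. of and once started all together -

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// hypothetical task: logs when it starts and when it ends
async function task(id, log) {
  log.push(`start ${id}`);
  await sleep(20);
  log.push(`end ${id}`);
}

// hypothetical async iterable of ids
async function* ids() { yield 1; yield 2; yield 3; }

// awaiting inside the loop body: the next task starts only after the previous one ends
async function inSeries() {
  const log = [];
  for await (const id of ids()) await task(id, log);
  return log;
}

// all tasks are started first, then awaited together
async function allAtOnce() {
  const log = [];
  await Promise.all([1, 2, 3].map((id) => task(id, log)));
  return log;
}

inSeries().then((log) => console.log(log.join(", ")));
// start 1, end 1, start 2, end 2, start 3, end 3
allAtOnce().then((log) => console.log(log.join(", ")));
// start 1, start 2, start 3, end 1, end 2, end 3
```

Neither variant limits concurrency: the first runs one task at a time, the second runs all of them at once. A pool sits between these two extremes.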

To start, we have a mock myJob that simulates a long computation. More than likely this will be a network request to some API in your app -

// any asynchronous task
const myJob = x =>
  sleep(rand(5000)).then(_ => x * 10)

Using Pool defined in this Q&A, we instantiate Pool(size=4) where size is the number of concurrent threads to run -

const pool = new Pool(4)

For ergonomics, I added a run method to the Pool class, making it easier to wrap and run jobs -

class Pool {
  constructor (size) ...
  open () ...
  deferNow () ...
  deferStacked () ...

  // added method
  async run (t) {
    const close = await this.open()
    // finally (rather than then) also frees the slot when t() rejects
    return t().finally(close)
  }
}

Now we need to write an effect that uses our pool to run myJob. Here you will also decide what to do with the result. Note the promise must be wrapped in a thunk otherwise pool cannot control when it begins -

async function myEffect(x) {
  // run the job with the pool
  const r = await pool.run(_ => myJob(x))

  // do something with the result
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)

  // return a value, if you want
  return r
}
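The thunk requirement can be seen in isolation. A promise's executor runs as soon as the promise is created, so passing a promise hands over work that has already started, while passing a function defers the start to whoever calls it. A minimal sketch, with job, eager and lazy as illustrative names only -

```javascript
const log = [];

// hypothetical job: records the moment its executor runs
const job = (name) => new Promise((resolve) => {
  log.push(`start ${name}`);   // the executor runs synchronously on creation
  resolve(name);
});

const eager = job("eager");      // the job starts right here
const lazy = () => job("lazy");  // nothing starts until lazy() is called

log.push("before calling thunk");
lazy();                          // the caller decides when the job begins

console.log(log);
// [ 'start eager', 'before calling thunk', 'start lazy' ]
```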

Now run everything by mapping myEffect over your list of inputs. In our example myEffect we return r, which means the results are also available after all of them are fetched. This is optional, but it demonstrates how the program knows when everything is done -

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
  .then(JSON.stringify)
  .then(console.log, console.error)

full program demo

In the functioning demo below, I condensed the definitions so we can see them all at once. Run the program to verify the result in your own browser -

class Pool {
  constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
  open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
  async run (t) { const close = await this.open(); return t().finally(close) }
  deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
  deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))

const myJob = x =>
  sleep(rand(5000)).then(_ => x * 10)

async function myEffect(x) {
  const r = await pool.run(_ => myJob(x))
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  return r
}
  
const pool = new Pool(4)

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
  .then(JSON.stringify)
  .then(console.log, console.error)

slow it down

Pool above runs concurrent jobs as quickly as possible. You may also be interested in throttle which is also introduced in the original post. Instead of making Pool more complex, we can wrap our jobs using throttle to give the caller control over the minimum time a job should take -

const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

We can add a throttle in myEffect. Now if myJob runs very quickly, at least 5 seconds will pass before the next job is run -

async function myEffect(x) {
  const r = await pool.run(_ => throttle(myJob(x), 5000))
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  return r
}
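As a quick check of what throttle does on its own, here is a small sketch with a shortened delay (throttleDemo is an illustrative name, and 100 ms stands in for the 5 seconds used above). The wrapped value passes through unchanged, but the combined promise does not resolve before the timer has elapsed -

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// same shape as throttle above: resolve with p's value, but not before ms have passed
const throttle = (p, ms) =>
  Promise.all([p, sleep(ms)]).then(([value, _]) => value);

// hypothetical demo: an already resolved promise still takes about 100 ms
async function throttleDemo() {
  const start = Date.now();
  const value = await throttle(Promise.resolve(42), 100);
  return { value, elapsed: Date.now() - start };
}

throttleDemo().then(({ value, elapsed }) =>
  console.log(value, `after roughly ${elapsed} ms`));
```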
Mulan
  • Thanks for the detailed explanation! :) My main problem is still related to using generator functions in this case, but I guess now I can solve it by myself. I'll accept this answer as a more general solution, and will try to post my own answer related to `cadesplugin.*` generator functions a little bit later – Alexander Shkirkov Dec 25 '21 at 03:29
0

In general, it is better to use @Mulan's answer.

But if you are also stuck with cadesplugin.* generator functions and don't mind a heavyweight external library, this answer may also be helpful.

(If you are worried about heavyweight external libraries, you can still combine this answer with @Mulan's.)

Running the tasks with a concurrency limit can be done with the Promise.map function from the bluebird library and a nested use of the cadesplugin.async_spawn function.

The code will look like the following:

export function signData(dataItem) {
  cadesplugin.async_spawn(async function* (args) {
    // some extra logic before all of the tasks

    // Promise here is bluebird's Promise, which provides Promise.map
    await Promise.map(dataItem.identifiers,
      (id) => cadesplugin.async_spawn(async function* (args) {
        // ...
        let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
        yield oDocumentNameAttr.propset_Value("Document Name");
        // ...
        // this function mutates some external data and makes API calls
      }),
      {
        concurrency: 5 // maximum number of tasks running in parallel
      });
    
    // some extra logic after all tasks were finished
  }, firstArg, secondArg);
}

The magic comes from the async_spawn function, which is defined as:

function async_spawn(generatorFunction) {
  async function continuer(verb, arg) {
    let result;
    try {
      result = await generator[verb](arg);
    } catch (err) {
      return Promise.reject(err);
    }
    if (result.done) {
      return result.value;
    } else {
      return Promise.resolve(result.value).then(onFulfilled, onRejected);
    }
  }

  let generator = generatorFunction(Array.prototype.slice.call(arguments, 1));
  let onFulfilled = continuer.bind(continuer, "next");
  let onRejected = continuer.bind(continuer, "throw");
  return onFulfilled();
}

Each async_spawn call drives its own generator and returns a promise, so a yield inside one nested generator suspends only that task, not the outer generator or the other nested tasks.
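This can be tried without cadesplugin. The sketch below uses spawn, a simplified stand-in written for this example on the assumption that it follows the same contract as the async_spawn definition quoted above (it is not the plugin's code), and worker and spawnDemo are illustrative names. Two spawned generators are started back to back and their log entries interleave, which shows that a yield in one does not hold up the other:

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// simplified stand-in for cadesplugin.async_spawn (an assumption, not the real code)
function spawn(generatorFunction, ...args) {
  const generator = generatorFunction(args);
  const step = async (verb, arg) => {
    const result = await generator[verb](arg);
    if (result.done) return result.value;
    // feed the yielded value (or error) back into the generator
    return Promise.resolve(result.value).then(
      (value) => step("next", value),
      (error) => step("throw", error)
    );
  };
  return step("next");
}

// hypothetical worker: one spawned generator per id
const worker = (id, log) => spawn(async function* () {
  log.push(`start ${id}`);
  yield sleep(20);            // suspends this generator only
  log.push(`end ${id}`);
  return id * 10;
});

async function spawnDemo() {
  const log = [];
  const results = await Promise.all([worker(1, log), worker(2, log)]);
  return { results, log };
}

spawnDemo().then(({ results, log }) => console.log(results, log.join(", ")));
// [ 10, 20 ] start 1, start 2, end 1, end 2
```

Because each spawned generator is just a promise from the outside, any promise-based limiter (bluebird's Promise.map, a pool, or a hand-written helper) can schedule them.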

Alexander Shkirkov
0

Here's a very short vanilla JS function mapParallel that will call fn(v, i) for each element of values while making sure that at most max Promises are in flight at each moment:

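const mapParallel = async (values, fn, max=10) => {
  const promises = new Set();  // calls currently in flight
  const results = [];          // every call, in input order

  for (const i in values) {
    // wait until a slot is free
    while (promises.size >= max)
      await Promise.race(promises.values());

    let promise = fn(values[i], i).finally(() => promises.delete(promise));
    promises.add(promise);
    results.push(promise);
  }

  // resolves with all results, not just those of the last calls in flight
  return Promise.all(results);
};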

By using an async function together with a regular for loop we can just block the loop using await until a slot has become free before kicking off each new invocation.

Sample usage, running two delays at a time:

mapParallel(
  [5000,2000,1000,3000,4000],
  async (t, i) => {
    console.log('starting', i);
    await new Promise(res => setTimeout(res, t));
    console.log('finished', i);
  },
  2
);

which prints:

starting 0
starting 1
finished 1
starting 2
finished 2
starting 3
finished 0
starting 4
finished 3
finished 4
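To check that the limit really holds, one option is to count how many calls are running at the same time. The sketch below restates mapParallel so it runs on its own (this copy keeps every promise so that all results are returned), and peakDemo is an illustrative name. It runs eight short tasks with max set to 3 and records the highest number of simultaneous calls:

```javascript
// mapParallel restated so the snippet is self-contained
const mapParallel = async (values, fn, max = 10) => {
  const promises = new Set();  // calls currently in flight
  const results = [];          // every call, in input order
  for (const i in values) {
    while (promises.size >= max)
      await Promise.race(promises.values());
    let promise = fn(values[i], i).finally(() => promises.delete(promise));
    promises.add(promise);
    results.push(promise);
  }
  return Promise.all(results);
};

// hypothetical demo: track how many calls overlap
async function peakDemo(count, max) {
  let running = 0;
  let peak = 0;
  const inputs = Array.from({ length: count }, (_, i) => i);
  const results = await mapParallel(inputs, async (value) => {
    running++;
    peak = Math.max(peak, running);
    await new Promise((resolve) => setTimeout(resolve, 10));
    running--;
    return value * 2;
  }, max);
  return { peak, results };
}

peakDemo(8, 3).then(({ peak, results }) => console.log(peak, results));
// 3 [ 0, 2, 4, 6, 8, 10, 12, 14 ]
```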
s-ol