
I define a function named fn which I then run in the background as part of a bigger service.

The function fn retrieves messages from a message queue one by one and processes each of them. Processing a single message (longProcess()) can take between 1 and 10 minutes.

Using the following code, the for await loop waits for new messages and processes each one, and then fetches a new message once the processing is complete.

const fn = async (subscription: AsyncIterable) => {
    subscription.pullOne();
    for await (const msg of subscription) {
        await longProcess(msg);
        subscription.pullOne();
    }
    subscription.close();
};

fn(subscription).then(() => {});

If I remove the await from before longProcess(msg), messages are sent to be processed as soon as they are received, which is what I want, but I only want a maximum of 5 messages to be processed simultaneously.

I don't want any more messages to be pulled before the current messages are done processing (so that other subscribers may pull and process them).

This question deals with a very similar case, but I can't find a solution there that both works and is elegant.

I tried using the bottleneck library by defining a concurrency limit, but I can't figure out how to stop the loop from fetching more messages before the active processing is finished.

const limiter = new Bottleneck({
    maxConcurrent: 5,
});

const fn = async (subscription: AsyncIterable) => {
    subscription.pullOne();
    for await (const msg of subscription) {
        limiter.schedule(() => longProcess(msg));
        subscription.pullOne();
    }
    subscription.close();
};
Charles
  • Maybe it's worth splitting `subscription` into several arrays of 5 elements each and then using something like `Promise.all`? – captain-yossarian from Ukraine Jul 14 '21 at 09:15
  • Messages are constantly sent to subscription, I'm not sure if splitting every 5 into a separate array will be efficient. Also, theoretically, I may only receive 1 or 2 messages at a given time, and then I wouldn't even have 5 messages to put aside. I'm more interested in how to limit the `for await` so that more messages aren't pulled until the `limiter` is free to process more messages. – Charles Jul 14 '21 at 09:19

1 Answer


You can try processing them in batches of at most 5 items at a time:

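// Helper: reads up to `count` items from an async iterator.
// Returns the items read and whether the source is exhausted.
async function take(iterator, count) {
  const res = [];
  for (let i = 0; i < count; i++) {
    const next = await iterator.next();
    if (next.done) return [res, true]; // stop reading once the source ends
    res.push(next.value);
  }
  return [res, false];
}

const fn = async (subscription) => {
    // Obtain the iterator once; an AsyncIterable itself has no next() method.
    const iterator = subscription[Symbol.asyncIterator]();
    subscription.pullOne();
    let done = false;
    while (!done) {
        // take() resolves only after 5 messages have arrived or the source has ended.
        const [batch, _done] = await take(iterator, 5);
        done = _done;
        // Process the whole batch concurrently; wait for all of it before reading more.
        await Promise.allSettled(batch.map(msg => longProcess(msg)));
        // subscription.pullOne();
    }
    subscription.close();
};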
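
A different approach, sketched here under assumptions, is a small pool of workers that share one iterator: each worker requests a message only when it is idle, so at most `maxConcurrent` messages are in flight and nothing is pulled while every worker is busy. `PullSubscription` and `processWithLimit` are hypothetical names; `pullOne()` and `close()` are assumed to behave as in the question. The sketch also assumes the iterator tolerates several pending `next()` calls, which holds for async generators but may not hold for a custom iterator.

```typescript
// Hypothetical shape of the subscription, inferred from the question:
// pullOne() asks the queue for one more message, close() ends the subscription.
interface PullSubscription<T> extends AsyncIterable<T> {
  pullOne(): void;
  close(): void;
}

async function processWithLimit<T>(
  subscription: PullSubscription<T>,
  handler: (msg: T) => Promise<void>,
  maxConcurrent: number = 5,
): Promise<void> {
  // One shared iterator, so each message goes to exactly one worker.
  const iterator = subscription[Symbol.asyncIterator]();

  const worker = async (): Promise<void> => {
    while (true) {
      // Request a message only when this worker is idle.
      subscription.pullOne();
      const next = await iterator.next();
      if (next.done) return;
      try {
        await handler(next.value);
      } catch (err) {
        // A failed message must not stop the worker.
        console.error("processing failed:", err);
      }
    }
  };

  // Start the workers and wait until the subscription ends.
  const workers: Promise<void>[] = [];
  for (let i = 0; i < maxConcurrent; i++) workers.push(worker());
  await Promise.all(workers);
  subscription.close();
}
```

It would be started the same way as `fn`, for example `processWithLimit(subscription, longProcess).catch(console.error);`. Unlike fixed batches, a slow message holds up only its own worker.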
Ikechukwu Eze
  • Will this work if `subscription` is an `AsyncIterable`? – Charles Jul 14 '21 at 09:39
  • It wouldn't work as is, but it can be made to work with some changes. Is `subscription` an `AsyncIterable`? – Ikechukwu Eze Jul 14 '21 at 09:46
  • @Charles `pullOne` and `close` don't exist on `AsyncIterable`, but I have edited this to work with a normal `AsyncIterable`. If you are using a custom library for it, then decide where to call `pullOne`. I think `close` is in the right place, no? – Ikechukwu Eze Jul 14 '21 at 10:44
  • Thanks for taking the time. Where is `msg` in all of this? I still need to call `longProcess` with `msg` as a parameter, and possibly other parameters going forward. Also, my IDE shows an error for `map`: `Property 'map' does not exist on type 'Promise[]>'.` – Charles Jul 14 '21 at 13:14
  • Sorry, the `map` is on `batch`. I have also explicitly specified the `msg`. (Updated the answer.) Let me know if this doesn't work. – Ikechukwu Eze Jul 14 '21 at 16:37
  • I made some small modifications to fit this to my code (e.g. `take` returns an object because an array makes less sense to me), but it seems to get stuck inside `take` in the `for`'s second iteration, so it is likely stuck on `const next = await aIterable[Symbol.asyncIterator]().next()`. If I understand correctly, this requires at least 5 messages to come through before processing even begins. I think I may give up on this altogether because it's becoming a real pain getting this to work. This would have been much more straightforward to implement using goroutines and channels in Go. – Charles Jul 15 '21 at 11:16
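
The stall described in the last comment comes from waiting for a full batch. One way to keep a single `for await` loop while capping concurrency is to reserve a slot before each pull, roughly like a buffered channel in Go. This is a minimal sketch, assuming `pullOne()` requests exactly one message as in the question; `Semaphore`, `PullSubscription`, and `consumeWithSlots` are hypothetical names, not part of any library.

```typescript
// Minimal counting semaphore (hypothetical helper).
class Semaphore {
  private free: number;
  private waiters: Array<() => void> = [];

  constructor(slots: number) {
    this.free = slots;
  }

  async acquire(): Promise<void> {
    if (this.free > 0) {
      this.free--;
      return;
    }
    // No slot available: wait until release() hands one over.
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) next(); // pass the slot directly to a waiter
    else this.free++;
  }
}

// Hypothetical shape of the subscription, inferred from the question.
interface PullSubscription<T> extends AsyncIterable<T> {
  pullOne(): void;
  close(): void;
}

async function consumeWithSlots<T>(
  subscription: PullSubscription<T>,
  handler: (msg: T) => Promise<void>,
  maxConcurrent: number = 5,
): Promise<void> {
  const slots = new Semaphore(maxConcurrent);
  const inFlight = new Set<Promise<void>>();

  await slots.acquire(); // reserve a slot before requesting the first message
  subscription.pullOne();
  for await (const msg of subscription) {
    // msg uses the slot reserved before its pull; the slot is freed when it settles.
    const task: Promise<void> = handler(msg)
      .catch((err) => console.error("processing failed:", err))
      .then(() => {
        slots.release();
        inFlight.delete(task);
      });
    inFlight.add(task);

    await slots.acquire(); // waits here while maxConcurrent messages are in flight
    subscription.pullOne();
  }

  await Promise.all(inFlight); // let the remaining messages finish
  subscription.close();
}
```

Processing starts as soon as the first message arrives, and the next `pullOne()` is issued only once a slot is free, so messages that are not yet pulled stay available to other subscribers.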