I define a function named fn, which I then run in the background as part of a bigger service.
The function fn retrieves messages from a message queue one by one and processes each message. Processing a message can take between 1 and 10 minutes (longProcess()).
Using the following code, the for await loop waits for new messages and processes each one, then fetches a new message once processing is complete.
const fn = async (subscription: AsyncIterable) => {
  subscription.pullOne();
  for await (const msg of subscription) {
    await longProcess(msg);
    subscription.pullOne();
  }
  subscription.close();
};

fn(subscription).then(() => {});
If I remove the await before longProcess(msg), messages are sent to be processed as soon as they are received, which is what I want, but I only want a maximum of 5 messages to be processed simultaneously.
Once 5 messages are in progress, I don't want any more messages to be pulled until one of them has finished processing, so that other subscribers can pull and process them in the meantime.
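For illustration, here is a minimal sketch of one way the behavior described above might be achieved. It assumes that pullOne() requests exactly one message and that the iterator only yields messages that were requested. MockSubscription, runBounded, MAX_IN_FLIGHT, and demo are hypothetical names made up for this sketch, and the 1 to 10 minute processing time is simulated with short timers. The idea is to issue one pull per concurrency slot up front, and to issue a further pull only when a message finishes.

```typescript
const MAX_IN_FLIGHT = 5;

// Hypothetical stand-in for the real subscription (an assumption, not a real API):
// pullOne() requests one message, and the iterator yields only requested messages.
class MockSubscription {
  private pending: number[];
  private ready: number[] = [];
  private closed = false;
  private wake: (() => void) | null = null;

  constructor(messages: number[]) {
    this.pending = messages.slice();
  }

  pullOne(): void {
    if (this.closed || this.pending.length === 0) return;
    this.ready.push(this.pending.shift()!);
    this.notify();
  }

  close(): void {
    this.closed = true;
    this.notify();
  }

  private notify(): void {
    const wake = this.wake;
    this.wake = null;
    if (wake) wake();
  }

  async *[Symbol.asyncIterator]() {
    while (true) {
      if (this.ready.length > 0) {
        yield this.ready.shift()!;
      } else if (this.closed) {
        return;
      } else {
        // Sleep until pullOne() or close() wakes the iterator up.
        await new Promise<void>((resolve) => { this.wake = resolve; });
      }
    }
  }
}

async function runBounded(
  subscription: MockSubscription,
  longProcess: (msg: number) => Promise<void>,
): Promise<void> {
  // One pull per concurrency slot.
  for (let i = 0; i < MAX_IN_FLIGHT; i++) subscription.pullOne();
  for await (const msg of subscription) {
    // Not awaited on purpose; the slot is handed back only when this message is done.
    longProcess(msg)
      .catch((err) => console.error("processing failed:", err))
      .then(() => subscription.pullOne());
  }
}

async function demo(): Promise<{ maxInFlight: number; processed: number }> {
  const total = 12;
  const messages: number[] = [];
  for (let i = 0; i < total; i++) messages.push(i);
  const subscription = new MockSubscription(messages);
  let inFlight = 0;
  let maxInFlight = 0;
  let processed = 0;
  const longProcess = async (msg: number): Promise<void> => {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    await new Promise((resolve) => setTimeout(resolve, 5 + (msg % 3) * 5));
    inFlight--;
    processed++;
    if (processed === total) subscription.close(); // demo only: lets the loop end
  };
  await runBounded(subscription, longProcess);
  return { maxInFlight, processed };
}

demo().then((result) => console.log(result));
```

Because a new pull is issued only when a message completes, the number of outstanding messages can never exceed the number of initial pulls, and no separate limiter is needed. Whether this carries over to a real queue client depends on pullOne() behaving as assumed here.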
This question deals with a very similar case, but I can't seem to find an answer there that both actually works and is elegant.
I tried using the bottleneck library by defining a concurrency limit, but I can't figure out how to stop the loop from fetching more messages before the active processing is finished.
const limiter = new Bottleneck({
  maxConcurrent: 5,
});

const fn = async (subscription: AsyncIterable) => {
  subscription.pullOne();
  for await (const msg of subscription) {
    limiter.schedule(() => longProcess(msg));
    subscription.pullOne();
  }
  subscription.close();
};
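The over-pulling in this version can be reproduced without the library. The sketch below is only an illustration: TinyLimiter is a hypothetical stand-in for a concurrency limiter (it is not Bottleneck's implementation or API), fakeMessages is a made-up message source, and processing time is simulated with a 10 ms timer. Since the promise returned by schedule() is not awaited, the loop keeps pulling while the limiter merely queues the work.

```typescript
// Hypothetical stand-in for a concurrency limiter; NOT Bottleneck's code.
class TinyLimiter {
  private running = 0;
  private queued: Array<() => void> = [];
  private maxConcurrent: number;

  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  schedule<T>(job: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const start = () => {
        this.running++;
        job().then(resolve, reject).then(() => {
          // Free the slot and start the next queued job, if any.
          this.running--;
          const next = this.queued.shift();
          if (next) next();
        });
      };
      if (this.running < this.maxConcurrent) start();
      else this.queued.push(start);
    });
  }
}

// Made-up source: each loop iteration counts as one pulled message.
async function* fakeMessages(count: number) {
  for (let i = 0; i < count; i++) yield i;
}

async function demoOverPull(): Promise<{ maxRunning: number; pulledWhenFirstFinished: number }> {
  const limiter = new TinyLimiter(5);
  let pulled = 0;
  let running = 0;
  let maxRunning = 0;
  let pulledWhenFirstFinished = -1;
  const jobs: Promise<void>[] = [];

  const longProcess = async (msg: number): Promise<void> => {
    running++;
    maxRunning = Math.max(maxRunning, running);
    await new Promise((resolve) => setTimeout(resolve, 10));
    running--;
    // Record how many messages had been pulled when the first one finished.
    if (pulledWhenFirstFinished < 0) pulledWhenFirstFinished = pulled;
  };

  for await (const msg of fakeMessages(12)) {
    pulled++;
    // Not awaited, as in the code above: the loop moves straight on to the next message.
    jobs.push(limiter.schedule(() => longProcess(msg)));
  }
  await Promise.all(jobs);
  return { maxRunning, pulledWhenFirstFinished };
}

demoOverPull().then((result) => console.log(result));
```

In this sketch the limiter does cap concurrent processing at 5, but all 12 messages have already been pulled by the time the first one finishes. That is exactly the behavior I want to avoid, because the queued messages are no longer available to other subscribers.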