1

There's an async method in some external library, which is not thread-safe (corrupts internal state when being called concurrently) and is very unlikely to be ever fixed in the near future. Yet I want the rest of my codebase to freely use it in any possible parallel vs. sequential combinations (in other words, I want my methods that use that method to still be thread-safe).

So I need to wrap that problematic method with some Active Object implementation, so that its invocations are always aligned (executed one after another).

My take on this is to use a chain of Promises:

var workQueue: Promise<any> = Promise.resolve();

function executeSequentially<T>(action: () => Promise<T>): Promise<T> {

    return new Promise((resolve, reject) => {

        workQueue = workQueue.then(() => {
            return action().then(resolve, reject);
        });
    });
}

And then instead of directly calling that method:

const myPromise = asyncThreadUnsafeMethod(123);

I'll be calling it like this:

const myPromise = executeSequentially(() => asyncThreadUnsafeMethod(123));

But am I missing something? Any potential problems with this approach? Any better ideas?

scale_tone
  • 232
  • 2
  • 11
  • Is there the chance of having multiple servers running the same code, like behind a load balancer or using pm2? If there is this chance you'll need some kind of redis setup to mark the troubling process as in progress across servers – spirift Dec 12 '21 at 17:37
  • 1
    I would be looking at some kind of mutex implementation vs. what you're trying here. That (as noted by @spirift) would only help if localized to this one server; if it's not multiprocess safe either then you have a bigger problem – Joe Dec 12 '21 at 17:38
  • 1
    @Joe, the promise returned by executeSequentially() will only be resolved when the passed action completes (or fails). Could be expressed simpler: `function executeSequentially(action: () => Promise): Promise { workQueue = workQueue.then(() => action()); return workQueue; } `, but this version would capture the very first exception inside action and stick with it. That's why I'm making a new Promise() out there. – scale_tone Dec 12 '21 at 17:53
  • @spirift, it is an absolutely single-server issue. No need to think of any distributed locks. – scale_tone Dec 12 '21 at 17:54
  • @Joe "*Your `workQueue` promise is always resolved*" - no it's not? `action()` is presumed to return a promise that is not immediately resolved. – Bergi Dec 12 '21 at 18:39
  • @scale_tone If the concept of a mutex seems unfamiliar to you, think about a FIFO-ordered queue for promises. There are many OSS libraries which implement it already. Then just wrap the problem method in a function that proxies invocations to the queue. – jsejcksn Dec 12 '21 at 18:40
  • 1
    @jsejcksn Why pull in an extra library when the OP already has implemented the FIFO-ordered queue for promises in 5 lines of code? – Bergi Dec 12 '21 at 18:58
  • I’m dumb and missed the workqueue assignment. – Joe Dec 12 '21 at 19:07
  • @Bergi What if the library is only 2 lines? – jsejcksn Dec 12 '21 at 19:23
  • @jsejcksn Then it's hardly worth managing a dependency for that. – Bergi Dec 12 '21 at 19:41

1 Answers1

3

There are no problems with this approach, using a promise queue is a standard pattern that will solve your problem.

While your code does work fine, I would recommend to avoid the Promise constructor antipattern:

let workQueue: Promise<void> = Promise.resolve();
const ignore = _ => {};
function executeSequentially<T>(action: () => Promise<T>): Promise<T> {
    const result = workQueue.then(action);
    workQueue = result.then(ignore, ignore);
    return result;
}

Also you say that you'd be "calling the method" like executeSequentially(() => asyncThreadUnsafeMethod(123));, but this does leave the potential for mistakes where you forget the executeSequentially wrapper and still call the unsafe method directly. Instead I'd recommend to introduce an indirection where you wrap the entire safe call in a function and then export only that, having your codebase only interact with that facade and never with the library itself. To help with the creation of such a facade, I'd curry the executeSequentially function:

const ignore = _ => {};
function makeSequential<T, A extends unknown[]>(fn: (...args: A) => Promise<T>): (...args: A) => Promise<T> {
    let workQueue: Promise<void> = Promise.resolve();
    return (...args) => {
        const result = workQueue.then(() => fn(...args));
        workQueue = result.then(ignore, ignore);
        return result;
    };
}

export const asyncSafeMethod = makeSequential(asyncThreadUnsafeMethod);

or just directly implement it for that one case

const ignore = _ => {};
let workQueue: Promise<void> = Promise.resolve();

export function asyncSafeMethod(x: number) {
    const result = workQueue.then(() => asyncThreadUnsafeMethod(x));
    workQueue = result.then(ignore, ignore);
    return result;
}
import { asyncSafeMethod } from …;

asyncSafeMethod(123);
Anton Strogonoff
  • 32,294
  • 8
  • 53
  • 61
Bergi
  • 630,263
  • 148
  • 957
  • 1,375