Executing the code below sometimes prints "DONE" and sometimes doesn't. Why does this happen, and how do I make it reach the console.log("DONE"); line every time?

const {Worker, isMainThread, parentPort} = require('node:worker_threads');

async function main() {
  if (isMainThread) {
    const worker = new Worker(__filename);
    let resultResolve = null;
    let resultPromise = new Promise(resolve => resultResolve = resolve);
    worker.on('message', (msg) => resultResolve(msg));
    while (await resultPromise != null) {
      resultPromise = new Promise(resolve => resultResolve = resolve);
    }
    console.log("DONE");
  } else {
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();

My guess is that there is a race between the worker exiting and the main thread's event loop reaching the await resultPromise line.

UPDATE 1

I am trying to write an async generator that yields values produced by a worker thread.

A more meaningful example would be:

const {Worker, isMainThread, parentPort} = require('node:worker_threads');

async function* fooGenerator() {
  const worker = new Worker(__filename);
  let resultResolve = null;
  let resultPromise = new Promise(resolve => resultResolve = resolve);
  worker.on('message', (msg) => resultResolve(msg));
  let result = null;
  while ((result = await resultPromise) != null) {
    resultPromise = new Promise(resolve => resultResolve = resolve);
    yield result;
  }
}

async function main() {
  if (isMainThread) {
    for await (let value of fooGenerator());
    console.log("DONE");
  } else {
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();

UPDATE 2

Adding setInterval doesn't solve the problem. It still won't print "DONE".

const {Worker, isMainThread, parentPort} = require('node:worker_threads');

async function main() {
  if (isMainThread) {
    setInterval(() => {}, 1000);
    const worker = new Worker(__filename);
    let resultResolve = null;
    let resultPromise = new Promise(resolve => resultResolve = resolve);
    worker.on('message', (msg) => resultResolve(msg));
    while ((await resultPromise) != null) {
      resultPromise = new Promise(resolve => resultResolve = resolve);
    }
    console.log("DONE");
  } else {
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();
Marcin Król
  • What is the point of this code? I can't actually see what it's trying to accomplish? Or said another way, what is it that you want it to wait to complete before it exits? – jfriend00 Apr 14 '23 at 20:39
  • And, why the funky `while` loop? What is its purpose? – jfriend00 Apr 14 '23 at 20:41
  • @jfriend00 I updated the post with a more meaningful example. – Marcin Król Apr 14 '23 at 20:56
  • Is there real code in the workers (with open timers or sockets or file operations in process)? Nodejs will exit when there's nothing else that can run. It won't wait for promises if there's nothing else actually running. That `while` loop is still really, really funky. Definitely a better way to do that. – jfriend00 Apr 14 '23 at 21:06
  • The worker is reading data from files. The way I understand it node doesn't need to wait for anything. When the last message is sent, the resultResolve is called with null which should allow node to continue execution at the `await resultPromise` line, which should finish the `for await` iteration and proceed to printing DONE. – Marcin Król Apr 14 '23 at 21:12
  • If you do an `await` and there's nothing still "open" that would normally keep nodejs running (socket, file handle, timer, etc...) because all the workers are done (even if you haven't processed their results), then nodejs will exit. Even if there's still more already resolved promises waiting to execute their resolve handlers (which is what you are relying on). It's just the way nodejs is coded (that logic predates promises). So, you're going to have to either code this differently or deploy a work-around. – jfriend00 Apr 14 '23 at 21:19
  • A common work-around is to start up a `setInterval()` timer with an empty callback that will keep nodejs from exiting. Then, when you know you are actually done, you either `clearInterval()` on the interval timer to let nodejs know it's now OK to exit or you just call `process.exit()`. There is probably a cleaner design that doesn't have this problem, but I can't get my head around what that `while` loop is attempting to do to really understand this code (that also might be a warning that you should find a less confusing way to write this). – jfriend00 Apr 14 '23 at 21:20
  • @jfriend00 I have tried adding setInterval, it keeps node from exiting but it still doesn't get to the DONE printing line. As in the UPDATE 1 example, the while loop is used to wait for the next worker thread message and yield it. I can't think of any other way of converting worker thread messages to an async generator. – Marcin Król Apr 14 '23 at 21:50

2 Answers

I suspect the race condition is rather between

worker.on('message', (msg) => resultResolve(msg));

calling resultResolve and

let resultPromise = new Promise(resolve => resultResolve = resolve);

assigning a new resultResolve. There might be multiple message events happening before you assign a new resolver function to resultResolve, which leads to skipped messages. To confirm this suspicion, use

const {Worker, isMainThread, parentPort} = require('node:worker_threads');

async function main() {
  if (isMainThread) {
    const worker = new Worker(__filename);
    let resultResolve = null;
    worker.on('message', (msg) => {
      if (resultResolve) resultResolve(msg);
      else console.log(`unhandled ${msg}`);
      resultResolve = null;
    });
    let i = 0;
    while (true) {
      const result = await new Promise(resolve => {
        resultResolve = resolve;
      });
      if (result == null) break;
      if (result != i) console.log(`missed ${i}`);
      i = result + 1;
    }
    console.log("DONE");
  } else {
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();

This will definitely happen when your fooGenerator() experiences backpressure, e.g. in

for await (const value of fooGenerator()) {
  await delay(50);
}

What you need to solve this problem is a proper queue such as this one. You'd then write

const {Worker, isMainThread, parentPort} = require('node:worker_threads');
    
if (isMainThread) {
  (async function main() {
    const worker = new Worker(__filename);
    const queue = new AsyncBlockingQueue();
    worker.on('message', msg => queue.enqueue(msg));
    for await (const value of queue) {}
    console.log("DONE");
  })();
} else {
  for (let i = 0; i < 10000; i++) {
    parentPort.postMessage(i);
  }
  parentPort.postMessage(null);
}
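
The queue implementation linked above is not reproduced here. As a rough stand-in, here is a minimal sketch of what such an AsyncBlockingQueue could look like. The internals (the values and waiters arrays) are my own naming, and treating null as the end-of-stream marker is an assumption made so that the for await loop terminates once the worker posts null.

```javascript
// Sketch of an unbounded async queue; a stand-in for the linked
// implementation, not a copy of it.
class AsyncBlockingQueue {
  constructor() {
    this.values = [];   // values enqueued but not yet consumed
    this.waiters = [];  // resolve functions of consumers waiting for a value
  }
  enqueue(value) {
    const waiter = this.waiters.shift();
    if (waiter) waiter(value);     // hand the value straight to a waiting consumer
    else this.values.push(value);  // otherwise buffer it, so nothing is dropped
  }
  dequeue() {
    if (this.values.length > 0) return Promise.resolve(this.values.shift());
    return new Promise(resolve => this.waiters.push(resolve));
  }
  // Assumption: a null value marks the end of the stream.
  async *[Symbol.asyncIterator]() {
    for (;;) {
      const value = await this.dequeue();
      if (value === null) return;
      yield value;
    }
  }
}

// Usage: a burst of synchronous enqueues is fully preserved.
async function demo() {
  const queue = new AsyncBlockingQueue();
  for (let i = 0; i < 5; i++) queue.enqueue(i);
  queue.enqueue(null);
  const seen = [];
  for await (const value of queue) seen.push(value);
  return seen;
}
demo().then(seen => console.log(seen)); // [ 0, 1, 2, 3, 4 ]
```

Because every message is either handed to a waiting consumer or buffered, it no longer matters how many message events arrive before the consumer asks for the next value.
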
Bergi
Your code relies on some critical timing that apparently is not always the way you think it is.

With this:

 worker.on('message', (msg) => resultResolve(msg));

You are apparently assuming that each time this event handler gets called, it will be a new and different resultResolve() function that will trigger yet a different promise to resolve. But, if you instrument your code and check to see that this is actually a new and different promise, you will find that it is not. So, you have a race condition between the worker message and the code that assigns a new value to resultResolve, thus you're reusing resultResolve values and not resolving all your promises.

So, the Worker finishes its work, communicates all the results back to the parent, and the handler calls resultResolve() for every message. But because some resultResolve values get reused, not all of your promises are resolved, so your generator gets stuck while nodejs has no other work to do. The program exits without getting to the "DONE" message.
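
To illustrate the effect in isolation, here is a small sketch that needs no worker at all. The simulateBurst helper is hypothetical; it simply calls the message handler several times in a row, synchronously, which stands in for several message events being delivered before the awaiting code has had a chance to install a new resolver.

```javascript
// Hypothetical helper: simulates several 'message' events delivered
// back-to-back, before the awaiting code can install a new resolver.
async function simulateBurst(messages) {
  let resultResolve = null;
  const resultPromise = new Promise(resolve => resultResolve = resolve);
  const onMessage = (msg) => resultResolve(msg);

  // Deliver every message synchronously, with no chance for the
  // awaiting code to run in between.
  for (const msg of messages) onMessage(msg);

  // Only the first call settled the promise; the later calls hit the
  // same, already used resolve function and were silently ignored.
  const received = [];
  received.push(await resultPromise);
  return received;
}

simulateBurst([0, 1, null]).then(received => {
  console.log(received); // [ 0 ]
});
```

The messages 1 and null are lost. In the original code, losing the null sentinel means the loop waits forever on a promise that nobody will resolve.
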


I'd rather implement this code without a couple of things. First, I have a strong distaste for manually created promises that assign the resolve and reject functions outside the executor. There's a reason the promise designers didn't expose them. It can be messy to do so, and that's part of the cause of the problem here. If you absolutely must use that type of construct, then I'd encapsulate that logic inside an explicit Deferred class that does this for you. But you usually don't need to do it that way at all.
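
For reference, a Deferred class of the kind mentioned above might look like the following minimal sketch. The class is hypothetical and not part of Node.js. Note that it only tidies up the bookkeeping; on its own it does not remove the race condition.

```javascript
// Hypothetical Deferred helper: a promise bundled with the functions
// that settle it, so they don't leak into loose outer variables.
class Deferred {
  constructor() {
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

// Usage: some other piece of code settles the promise later.
const deferred = new Deferred();
setTimeout(() => deferred.resolve('ready'), 10);
deferred.promise.then(value => console.log(value)); // ready
```
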

Second, you have to completely get rid of the race condition. In my mind, that's easiest to do by not doing the resultResolve thing at all. If the worker.on('message', ...) code has just one job, which is to grab the data it was sent, store it, and keep track of when there is no more data, then you can write another piece of code that lets some other code iterate that data, waiting when it needs to and being told it's done when it's done.

You can pick your desired architecture and put iterating the queue behind a generator, an async iterator or a class/method approach.

I would implement this with a queue. There are plenty of already built queuing classes to use, but here's a simple home-grown one that works here:

const { Worker, isMainThread, parentPort } = require('node:worker_threads');
const { EventEmitter, once } = require('node:events');

class Queue extends EventEmitter {
    constructor() {
        super();
        this.data = [];
        this.complete = false;
    }
    async next() {
        if (this.data.length) {
            return this.data.shift();
        } else if (this.complete) {
            // return sentinel value that we're done
            return null;
        } else {
            // wait for next Queue event to happen
            // either an item to be added or a done signal
            await once(this, 'data');
            return this.next();
        }
    }
    done() {
        this.complete = true;
        this.emit('data');
    }
    add(data) {
        this.data.push(data);
        // notify watchers that there is now some data
        this.emit('data');
    }
}

async function main() {
    if (isMainThread) {
        const worker = new Worker(__filename);
        let q = new Queue();
        worker.on('message', (msg) => {
            if (msg === null) {
                q.done();
            } else {
                q.add(msg);
            }
        });
        let cntr = 0;
        while (true) {
            const val = await q.next();
            if (val === null) {
                break;
            }
            ++cntr;
        }
        console.log(`Done: got ${cntr} results`);
    } else {
        // this is what the Worker is doing
        for (let i = 0; i < 10000; i++) {
            parentPort.postMessage(i);
        }
        parentPort.postMessage(null);
    }
}

main();

If you really want to use for await ..., then you could put an asyncIterator on the Queue or convert it to a generator.
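
As a rough sketch of the first option, the iterator can be added in a subclass. The Queue below is a compact copy of the one above so that the example runs on its own, and IterableQueue is a hypothetical name. It keeps the convention that next() returns null when the queue is done.

```javascript
const { EventEmitter, once } = require('node:events');

// Compact copy of the Queue above so this sketch runs on its own.
class Queue extends EventEmitter {
  constructor() { super(); this.data = []; this.complete = false; }
  async next() {
    if (this.data.length) return this.data.shift();
    if (this.complete) return null;   // sentinel: no more data
    await once(this, 'data');         // wait for add() or done()
    return this.next();
  }
  done() { this.complete = true; this.emit('data'); }
  add(data) { this.data.push(data); this.emit('data'); }
}

// Hypothetical subclass that adds the async iteration protocol.
class IterableQueue extends Queue {
  async *[Symbol.asyncIterator]() {
    let val;
    while ((val = await this.next()) !== null) yield val;
  }
}

// Usage: the producer fills the queue later; the consumer just iterates.
async function demo() {
  const q = new IterableQueue();
  setTimeout(() => { q.add('a'); q.add('b'); q.done(); }, 10);
  const seen = [];
  for await (const val of q) seen.push(val);
  return seen;
}
demo().then(seen => console.log(seen)); // [ 'a', 'b' ]
```

In the worker example, the message handler would call q.add(msg) or q.done() exactly as before, and the while (true) loop would become for await (const val of q).
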

Also, note that there's not a single manually created promise here. It uses a trick I've used several times before where it awaits an event and that allows you to await something that some other agent will trigger more cleanly (at least in my opinion).

I added a counter to make sure that all the Worker messages were being processed.

jfriend00
  • How could that be a different `resolvePromise`? The task that calls the message event handler will force a microtask checkpoint at the end (in what is called "clean up after running script" in HTML, not sure about node's nomenclature), and thus it's sure the now resolved Promise's callbacks will get called before the next message event task is executed. – Kaiido Apr 15 '23 at 03:33
  • @Kaiido - I'm not sure of the exact reason why - I just know that it does happen if you instrument the code and check. It does not happen for small loop values (1000 or below), but it does happen with the 10000 number in the OP's code. Either way, the code is depending upon certain timing things remaining true (relative timing of different types of events) rather than just coding something that has no such dependencies and is thus more foolproof. – jfriend00 Apr 15 '23 at 03:37
  • To be clear, I'm not the OP. I'm just wondering as they do "why" it happens. Clearly sounds like a bug. – Kaiido Apr 15 '23 at 03:38
  • @Kaiido - I would guess that it has something to do with the messaging queue between Worker and Main thread being blasted by the `for` loop. I found the OP's code very difficult to understand because of these implicit assumptions about execution order of different kinds of events. To me, it's just poor code. If you expect a specific execution order, it is much safer and easier to maintain if you just write the code to require that order. Even if this code did work, this code could be a pain to maintain as one extra `await` thrown in somewhere could break everything due to change in timing. – jfriend00 Apr 15 '23 at 03:43
  • @Kaiido - In fact, I found that even just inserting `console.log()` statements changes the outcome. The original code is far too brittle. – jfriend00 Apr 15 '23 at 03:46
  • The problem seems to be in https://github.com/nodejs/node/blob/40d808f1fbaa7b3666e5837a1b4336947180fac5/src/node_messaging.cc#L777 where they'll process the 1000 first messages already queued synchronously. This isn't web compatible, in browsers we are ensured that microtasks will be performed after a task, and I'm not sure it's done on purpose here. I'll probably raise an issue. – Kaiido Apr 15 '23 at 04:24