1

I have a nodejs cluster with a primary that handles worker cycles (in the while loop) and that listens to worker messages to progress in the cycle. (In my code index.js does not send messages on setInterval but on other type of event, I have here simplified the code to get the essence of the problem)

Server.js

var cluster = require('cluster');
const ClusterMessages = require('cluster-messages');
const messages = new ClusterMessages();

if (cluster.isMaster){
    let worker = cluster.fork()
    console.log(cluster);
    
    (async ()=>{
    let cycle = 0
    while(true){
        console.log(cycle);
        cycle ++
        await Promise.all([
            enough(),
        ])
    }
    function enough () {
        return new Promise(resolve => {
            messages.on('enough', () => {
                console.log('enough');
                resolve()
            });
        });
    }})()
} else {
    require('./index.js')
}

Index.js

const ClusterMessages = require('cluster-messages');
const messages = new ClusterMessages();

setInterval(() => {
    messages.send('enough');
}, 1000);

The code is working fine (as such, in this example and in my code) but there seems to be a memory leak as you can understand from the output of this code:

0
enough
1
enough
enough
2
enough
enough
enough
3
enough
enough
enough
enough...

I tried several things like exchanging new Promise and messages.on(), add a return in the callback of the promise but I have no clue what is happening here. Any ideas?


The solution is to make another event that can be triggered once contrary to the 'event listener' of the cluster-messages package

Server.js

if (cluster.isMaster){
    let worker = cluster.fork()
    console.log(cluster);

    // Importing events
    const EventEmitter = require('events');
    const eventEmitter = new EventEmitter();
    
    messages.on('enough', () => {
        eventEmitter.emit('event');
    });
    (async ()=>{let cycle = 0
    while(true){
        console.log(cycle);
        cycle ++
        await Promise.all([
            enough(),
        ])
    }
    function enough () {
        return new Promise(resolve => {
            eventEmitter.once('event', () => {
                console.log('event');
                resolve()
            });
        });
    }})()
} else {
    require('./index.js')
}

Index.js

const ClusterMessages = require('cluster-messages');
const messages = new ClusterMessages();

setInterval(() => {
    messages.send('enough');
}, 1000);
JeanJacquesGourdin
  • 1,496
  • 5
  • 25
  • Why are you using a `while` loop and promises to handle events? – Bergi Nov 29 '21 at 08:21
  • I use while because i need the cycle to run an unknown number of times I use promises because i dont only have the enough function to resolve but rather 3 different with one being a timeout (the minimum time to wait before going to the next step in the loop) – JeanJacquesGourdin Nov 29 '21 at 08:34

1 Answers1

2

Every call of enough() installs another listener for the enough event on messages. They never get removed, leaking memory (and leading to an increasing number of logs per event). Instead, use the once method to install the listener:

function enough () {
    return new Promise(resolve => {
        messages.once('enough', () => {
//               ^^^^
            console.log('enough');
            resolve();
        });
    });
}

Or even simpler, using once:

const { once } = require('events');
function enough() {
    return once(messages, 'enough');
}

In your particular example, I would recommend not to use promises to handle the events. You might even miss events that are fired while you are removing and re-attaching a listener. Just write

let cycle = 0
messages.on('enough', () => {
    console.log(cycle);
    cycle++;
});

If for some reason you need a loop that you can break from or await other things in, I would recommend an asynchronous iterator, built with on:

const { once } = require('events');
(async () => {
    let cycle = 0
    for await (const _ of on(messages, 'enough')) {
        console.log(cycle);
        cycle++;
    }
})();
Bergi
  • 630,263
  • 148
  • 957
  • 1,375
  • When i try your solution i have 'once is not a function' The two last answers are not usable for me as such, the cycle variable is here just used to help you make sense of what is happening but has no real utility in the code – JeanJacquesGourdin Nov 29 '21 at 08:37
  • @JeanJacquesGourdin What version of nodejs are you on? See the documentation for when these utility methods were introduced. – Bergi Nov 29 '21 at 08:38
  • Im on 16.13 but the problem seems to emerge from the npm package cluster messages, as there is no possibility to shut down event listeners I have no idea how to get around this problem like having just one messages.on outside enough() and trigger the resolve()... – JeanJacquesGourdin Nov 29 '21 at 08:55
  • Oh, I see, [`ClusterMessages`](https://github.com/Rob--/cluster-messages/blob/master/index.js) is not an `EventEmitter` but only an event emitter by name. You might want to file a feature request with the library to make it a subclass. In the meantime, just create your own `new EventEmitter` and call its `.emit()` method from `messages.on`. There are alternatives (like using a [promise queue](https://stackoverflow.com/a/47157577/1048572)) but this is probably the simplest workaround. – Bergi Nov 29 '21 at 09:05
  • Thank you, it's working, i'm gonna publish the answer – JeanJacquesGourdin Nov 29 '21 at 09:16