3

I have a process that uses RabbitMQ and NodeJS to do image processing. Because the task is CPU-intensive, I think I am running into the same issue described here: https://github.com/squaremo/amqp.node/issues/261

I am trying to figure out how to implement the last comment on that issue.

"Yep. NodeJS is single-threaded, and if you use that thread to do something for a long time, nothing else will happen. As @michaelklishin suggests, the known solution for this general problem is using a child process, or the cluster module."

EDIT:

I updated the code below with a sample of how I think I can do this with the amqp-connection-manager module. Right now I use a global variable to hold the actual message so that I can ack it later. I am guessing there is a better way to do this.

//Used to be an example for how to keep the connection thread and the working thread separate
//in order to fix the issue of missing heartbeat intervals due to processing on the same thread

const cluster = require('cluster');
var amqp = require('amqp-connection-manager');
var config = require('./config.json');

// Name of the durable queue the master consumes from.
const working_queue = "Test_Queue";


//******* CLUSTER SETUP *******
// The master process owns the RabbitMQ connection, so heartbeats are never
// starved by long-running work; the forked worker does the actual processing
// and reports back over the cluster IPC channel.
if (cluster.isMaster) {
    console.log("Master "+process.pid+" is running");

    const worker = cluster.fork();

    // Deliveries handed to the worker but not yet acked, keyed by an id that
    // combines the channel generation with the AMQP delivery tag. This
    // replaces a single shared "current message" variable, so the ack is
    // always applied to the exact message the worker finished.
    const pendingMessages = new Map();

    // Incremented every time the channel is (re)created. Delivery tags are
    // only meaningful on the channel that issued them, so a reply that
    // belongs to an older channel must never be acked on the new one.
    let channelGeneration = 0;

    cluster.on('exit', (exitedWorker, code, signal) => {
        if (signal) {
            console.log("worker was killed by signal: "+signal);
        } else if (code !== 0) {
            console.log("worker exited with error code: "+code);
        } else {
            console.log("Worker "+exitedWorker.process.pid+" exited successfully");
        }
        console.log(exitedWorker);
        // NOTE(review): the worker is not re-forked here, so any message it was
        // processing stays unacked until the channel closes — confirm whether
        // a replacement worker should be started.
    });

    //******** RABBIT MQ CONNECTION **********

    // Create a connection manager to rabbitmq
    const connection = amqp.connect(config.rabbit_connections_arr, {json: true, heartbeatIntervalInSeconds: 2});
    connection.on('connect', function() {
        console.log('Connected to rabbitmq');
    });
    connection.on('disconnect', function(params) {
        console.log('Disconnected from rabbitmq:', params.err.stack);
    });

    // Set up a channel listening for messages in the queue.
    const channelWrapper_listening = connection.createChannel({
        // Runs on every (re)connect; `channel` is a regular amqplib `ConfirmChannel`.
        setup: function(channel) {
            // Anything still pending belongs to the previous channel; the
            // broker requeues those deliveries, so forget them here.
            channelGeneration += 1;
            pendingMessages.clear();
            const generation = channelGeneration;

            return Promise.all([
                channel.assertQueue(working_queue, {durable: true}),
                // Hand the worker one message at a time.
                channel.prefetch(1),
                channel.consume(working_queue, function(data) {
                    // amqplib invokes the callback with null when the broker
                    // cancels the consumer; there is nothing to process.
                    if (data === null) {
                        console.log("Consumer was cancelled by the broker");
                        return;
                    }
                    const id = generation+":"+data.fields.deliveryTag;
                    pendingMessages.set(id, data);
                    worker.send({id: id, body: data.content.toString()});
                }, {noAck: false}) // explicit acks: a message is acked only after the worker replies
            ]);
        }
    });

    // The worker echoes back the id it was given once it has finished.
    worker.on('message', function(reply) {
        console.log("Worker to Master (ack): ", reply.body);

        const delivered = pendingMessages.get(reply.id);
        if (delivered === undefined) {
            // Reply for a delivery from an earlier channel; it has already
            // been requeued by the broker, so acking it would be an error.
            return;
        }
        pendingMessages.delete(reply.id);
        channelWrapper_listening.ack(delivered);
    });
}
else //All worker processes (MAIN LOGIC)
{
    console.log("Worker "+process.pid+" started");

    // Each job is `{id, body}`; the id must be sent back unchanged so the
    // master can ack the matching delivery.
    process.on('message', function(job) {
        console.log("Master to Worker (working): ", job.body);

        //send msg back when done working on it (the timeout stands in for real work).
        setTimeout(function() {
            process.send({id: job.id, body: job.body});
        }, 5000);
    });
}
Crash667
  • 343
  • 2
  • 16
  • How did you solve this? I have this problem too. – Cocuba Aug 13 '21 at 13:44
  • I don't know if I would call it a solution. What I ended up doing was moving the amqp connections and functions inside of each cluster worker. Each worker then had its own full connection to rabbit instead of splitting a single connection into channels. – Crash667 Aug 17 '21 at 13:50

0 Answers0