34

I have a working (stock) script from Node.js:

var cluster = require('cluster');
var http = require('http');
// Running total of requests reported by the workers; only the master updates it.
var numReqs = 0;

if (!cluster.isMaster) {
  // Worker branch: serve HTTP on port 8000 and report every request to the master.
  const server = http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Tell the master that one more request has been handled.
    process.send({ cmd: 'notifyRequest' });
  });
  server.listen(8000);
} else {
  // Master branch: count the 'notifyRequest' messages sent by the workers.
  // The comparison is intentionally loose, matching the stock script.
  const countRequest = function(msg) {
    if (!msg.cmd || msg.cmd != 'notifyRequest') {
      return;
    }
    numReqs++;
  };

  // Fork two workers and listen to each of them.
  let remaining = 2;
  while (remaining > 0) {
    cluster.fork().on('message', countRequest);
    remaining -= 1;
  }

  // Print the running total once per second.
  setInterval(() => {
    console.log("numReqs =", numReqs);
  }, 1000);
}

In the above script I can send data from a worker to the master process with ease. But how do I send data from the master to the worker/workers? Please include examples, if possible.

Medet Tleukabiluly
  • 11,662
  • 3
  • 34
  • 69
htonus
  • 629
  • 1
  • 9
  • 19

7 Answers

49

Because cluster.fork is implemented on top of child_process.fork, you can send messages from a master to the worker by using worker.send({ msg: 'test' }), and from a worker to a master by process.send({ msg: 'test' });. You receive the messages like so: worker.on('message', callback) (from worker to master) and process.on('message', callback); (from master to worker).

Here's my full example; you can test it by browsing to http://localhost:8000/. The worker will then send a message to the master, and the master will reply:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
// Demonstrates two-way messaging: each worker tells the master about every
// HTTP request it serves, and the master replies to that same worker.
if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < 2; i++) {
    // `const` gives each iteration its own binding. With a single shared
    // `var worker`, every handler would close over the same variable and
    // reply to the last worker forked instead of the one that sent the message.
    const worker = cluster.fork();

    worker.on('message', function(msg) {
      // we only want to intercept messages that have a chat property
      if (msg && msg.chat) {
        console.log('Worker to master: ', msg.chat);
        worker.send({ chat: 'Ok worker, Master got the message! Over and out!' });
      }
    });
  }
} else {
  // Messages sent by the master with worker.send() arrive on `process`.
  process.on('message', function(msg) {
    // we only want to intercept messages that have a chat property
    if (msg && msg.chat) {
      console.log('Master to worker: ', msg.chat);
    }
  });

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ chat: 'Hey master, I got a new request!' });
  }).listen(8000);
}
alessioalex
  • 62,577
  • 16
  • 155
  • 122
  • Actually I want to create push server for (web/flash)socket clients. Current version stacks on 1000 simultaneous connections. So I decided to create several workers with socket.io listeners. This means I need to pass data to workers in asynchronous way. – htonus Dec 16 '11 at 13:51
  • 2
    That sounds ok, make sure you use Socket.IO with RedisStore. – alessioalex Dec 16 '11 at 13:52
  • 2
    This won't work. `var` inside `for`? `worker` will hold the last worker forked, not each one (especially inside the event callback). Either you don't care about all of them and you just enclose your callback in a closure, or you hold all workers in an array. – dresende Dec 21 '11 at 11:32
  • Sorry about the var, I've copied a part of his code. I don't hold all my workers into an array because I just wanted to prove the functionality. – alessioalex Dec 21 '11 at 11:35
  • @alessioalex Would sending data through the workers be slower or faster than using redis? – NiCk Newman Sep 03 '15 at 12:16
  • @NiCkNewman tbh I'm not exactly sure. With cluster you are using `child_process` which I think opens a socket to handle the inter-process communication. You should do a few tests with real data and see how they perform. – alessioalex Sep 14 '15 at 12:04
  • If we want to send JavaScript objects, is it toJSON and fromJSON on the other side? – Matthew James Briggs Jun 14 '18 at 22:11
8

I found this thread while looking for a way to send a message to all child processes and was thankfully able to figure it out thanks to the comments about arrays. Just wanted to illustrate a potential solution for sending a message to all child processes utilizing this approach.

var cluster = require('cluster');
var http = require('http');
// Total number of requests reported by the workers; only the master updates it.
var numReqs = 0;
// Every worker forked by the master, kept so a message can be sent to all of them.
var workers = [];

if (cluster.isMaster) {
  // Broadcast the current request count to all known workers.
  const broadcast = function() {
    // forEach visits array elements only; for...in would also walk any
    // enumerable non-index property that ends up on the array.
    workers.forEach(function(worker) {
      worker.send({ cmd: 'broadcast', numReqs: numReqs });
    });
  };

  // Fork workers.
  for (let i = 0; i < 2; i++) {
    const worker = cluster.fork();

    // Dispatch on the command carried by each message from this worker.
    worker.on('message', function(msg) {
      if (msg && msg.cmd) {
        switch (msg.cmd) {
          case 'notifyRequest':
            numReqs++;
            break;
          case 'broadcast':
            broadcast();
            break;
        }
      }
    });

    // Add the worker to an array of known workers
    workers.push(worker);
  }

  // Print the running total once per second.
  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // React to messages received from master
  process.on('message', function(msg) {
    switch (msg.cmd) {
      case 'broadcast':
        if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
        break;
    }
  });

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process: first count the request, then ask the
    // master to broadcast the updated total to every worker.
    process.send({ cmd: 'notifyRequest' });
    process.send({ cmd: 'broadcast' });
  }).listen(8000);
}
Kevin Reilly
  • 6,096
  • 2
  • 25
  • 18
8

Here's how I implemented a solution to a similar problem. By hooking into cluster.on('fork'), you can attach message handlers to workers as they are forked (rather than storing them in an array), which has the added advantage of dealing with cases where workers die or disconnect and a new worker is forked.

This snippet would send a message from the master to all workers.

if (cluster.isMaster) {
    // Relay a message received from one worker to every current worker
    // (the sender included). Defined before it is registered below.
    const messageRelay = function(msg) {
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send(msg);
        });
    };

    // When a new worker process is forked, attach the handler.
    // This handles cases where new worker processes are forked
    // on disconnect/exit, as below. Registered before the first fork so
    // no worker can be missed.
    cluster.on('fork', function(worker) {
        worker.on('message', messageRelay);
    });

    // Replace any worker that disconnects.
    cluster.on('disconnect', function(worker) {
        cluster.fork();
    });

    // os.cpus is a function: it must be called to get the list of cores.
    // Reading `.length` off the function itself yields 0 and forks nothing.
    const cpuCount = require('os').cpus().length;
    for (let i = 0; i < cpuCount; i++) {
        cluster.fork();
    }
}
else {
    // Assign the handler before registering it; passing a still-undefined
    // variable to process.on() would not register a listener.
    const messageHandler = function messageHandler(msg) {
        // Worker received message--do something
    };

    process.on('message', messageHandler);
}
LiquidPony
  • 2,188
  • 1
  • 17
  • 19
3

I understand that your goal is to broadcast to all the Node worker processes in a cluster. Although you cannot send a socket component as such, there is a workaround that serves the purpose. I will try to explain with an example:

Step 1 : When a client action requires a broadcast :

Child.js (Process that has been forked) :

// Child.js: when a client asks for a broadcast, pass the request up to the
// master process, which owns the list of workers.
socket.on("BROADCAST_TO_ALL_WORKERS", (payload) => {
    const request = { cmd: 'BROADCAST_TO_ALL_WORKERS', message: payload.message };
    process.send(request);
});

Step 2 : On the cluster creation side

Server.js (Place where cluster forking happens):

if (cluster.isMaster) {

  /**
   * Handles a message from a worker: when it is a broadcast request,
   * re-sends its payload to every current worker.
   *
   * One shared handler is used for the initial workers and for replacement
   * workers, so both behave identically. `reload` and `player_id` are
   * forwarded along with `message`; when the sender omits them they are
   * undefined and the relayed message carries only `cmd` and `message`.
   *
   * @param {Object} data - message received from a worker
   */
  const relayBroadcast = function (data) {
    if (!data || data.cmd !== "BROADCAST_TO_ALL_WORKERS") {
      return;
    }

    console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id);

    Object.keys(cluster.workers).forEach(function (id) {
      cluster.workers[id].send({
        cmd : "BROADCAST_TO_WORKER",
        message : data.message,
        reload : data.reload,
        player_id : data.player_id
      });
    });
  };

  // Fork one worker per CPU and listen for broadcast requests from each.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork().on('message', relayBroadcast);
  }

  // Replace a worker that exits; the replacement gets the same handler.
  cluster.on('exit', function (worker, code, signal) {
    cluster.fork().on('message', relayBroadcast);
  });
}
else {
  //Node Js App Entry
  require("./Child.js");
}

Step 3: To Broadcast in the child process -

-> Put this before io.on("connection") in Child.js

// Child.js: when the master relays a broadcast, emit it to every socket
// connected to this worker.
process.on("message", (incoming) => {
    if (incoming.cmd !== "BROADCAST_TO_WORKER") {
        return;
    }

    const serverMessage = {
        message: incoming.message,
        reload: incoming.reload,
        player_id: incoming.player_id
    };
    io.sockets.emit("SERVER_MESSAGE", serverMessage);
});

I hope this helps. Please let me know if more clarification is required.

Ondrej Slinták
  • 31,386
  • 20
  • 94
  • 126
Raman
  • 31
  • 3
1

You should be able to send a message from the master to the worker like this:

// `worker` is the object returned by cluster.fork() in the master.
worker.send({ message: 'hello' });

because "cluster.fork is implemented on top of child_process.fork".

cheng81
  • 2,434
  • 2
  • 21
  • 18
  • Yes, it works, thank you! In other words: while forking workers I should store them in an array, and iterate over this array in order to send data to every child. Is there any other way to send data to all workers without storing and iterating? – htonus Dec 16 '11 at 13:36
  • If you don't want to store the workers into an array and iterate through them to send messages you can use a unix domain socket to communicate messages from the master to the workers. – alessioalex Dec 16 '11 at 13:49
  • I suppose you can create an EventEmitter in the master, then emit an event whenever a message is received. After the creation of each worker, you just need to add a listener to the EventEmitter that will send the message to the worker. Of course, this is still implemented by storing the references of the listeners (and thus of the workers too) in the EventEmitter, but hey, at least you don't have to look at it – cheng81 Feb 24 '12 at 14:48
0

If you just need to send simple configuration data to your child process, you can pass environment variables with cluster.fork(). This is useful and has advantages over sending messages with the cluster and process send methods.

const cluster = require('cluster')

if (cluster.isMaster) {
  // Key/value pairs passed to fork() are added to the worker's environment.
  const workerEnv = { MY_DATA: 'something here' };
  cluster.fork(workerEnv);
} else {
  // The worker reads the value back from its own environment.
  const { MY_DATA } = process.env;
  console.log(MY_DATA); // "something here"
}
bhdrk
  • 3,415
  • 26
  • 20
0

To send a message to all active workers from the master (note: this example is based on one worker triggering the broadcast, but it can be triggered by anything):

const cluster = require('cluster')
const numCPUs = require("os").cpus().length;

// Master: forks one worker per CPU and, when any worker asks for it,
// broadcasts an update to every connected worker.
if ( cluster.isMaster ) {

  for (let i = 0; i < numCPUs; i++) {

    const forkedWorker = cluster.fork();

    forkedWorker.on('message', workerMessage => {

      // Only the broadcast request is handled; anything else is ignored.
      if (workerMessage !== 'pleaseUpdateAllWorkers') {
        return;
      }

      // cluster.workers is an Object holding all worker nodes, with their
      // cluster.worker.id as the Object key.
      const workerPool = cluster.workers;

      // e.g. [ '1', '2', '3', '4' ] for 4 workers
      const workerKeys = Object.keys(workerPool);

      workerKeys.forEach( workerKey => {
        // we're accessing this iteration's worker in the workerPool
        const workerProcess = workerPool[workerKey];

        // Ensure the worker can still receive messages. isConnected() is the
        // documented check; the `state` property is not part of the
        // documented API, and a test for "online" is believed to miss
        // workers that have moved on to listening for connections.
        if (workerProcess.isConnected()) {
          // broadcast
          workerProcess.send('newUpdate!');
        }

      });

      /* for the one-liners
        Object.keys(cluster.workers).forEach( w => cluster.workers[w].isConnected() ? cluster.workers[w].send('newUpdate!') : null )
      */

    });

  }

}

// Worker: can request a broadcast, and acknowledges updates from the master.
else {

  // Example trigger: finish some work, then ask the master to update all
  // workers. doingSomething() stands in for the worker's real task.
  const doesSomething = async () => {

    await doingSomething();

    process.send('pleaseUpdateAllWorkers');

  };

  process.on('message', masterMessage => {

    if (masterMessage === "newUpdate!") {

      console.log(`worker id ${cluster.worker.id} received an update from master.`, masterMessage);

      // politely respond to master
      process.send('thanks!');
    }

  });

}

If you'd like to send a response to a specific worker, you would use its ID to send it a message:

// Look up the target worker by its cluster ID, then message it directly.
const targetWorker = workerPool[workerIDYouWantToUpdate];
targetWorker.send('newUpdate!');

Or, if you just want to respond to the worker that sent the message:

// `forkedWorker` is the worker whose 'message' handler is currently running.
forkedWorker.send('newUpdate!');