
I have a Node server that creates a child process with fork() using IPC. At some point the child sends results back to the parent at about 10Hz as part of a long-running task. When the payload passed to process.send() is small all works well: every message I send is received ~immediately and processed by the parent.

However, when the payload is 'large' (I haven't determined the exact size limit), instead of being received immediately by the parent, all the payloads are sent first, and only once the child has finished its long-running task does the parent receive and process the messages.

tl;dr visual:

Good (happens with small payload):

child:  send()
parent: receive()
child:  send()
parent: receive()
child:  send()
parent: receive()
...

Bad (happens with big payload):

child:  send()
child:  send()
child:  send()
(repeat many times over many seconds)
...
parent: receive()
parent: receive()
parent: receive()
parent: receive()
...
  1. Is this a bug? (Edit: behavior only occurs on OS X, not Windows or Linux)
  2. Is there any way to avoid this, other than trying to keep my IPC payload small?

Edit 2: the sample code below uses both time and an iteration counter to select when to send an update. (In my actual code it's also possible to send an update after n iterations, or after the loop achieves certain results.) As such, a rewrite of the code to use setInterval/setTimeout instead of a loop is a last resort for me, as it requires me to remove features.

Edit: Here is test code that reproduces the problem. However, it only reproduces on OS X, not on Windows or Linux:

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg => console.log(`parent: receive() ${msg.data.length} bytes`, Date.now()));

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);
   if (match) {
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

if (process.send) process.on('message', msg => run(msg));

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

The problem starts at around 8 KB. For example, compare querying http://localhost:8080/15 vs. http://localhost:8080/123456:

/15
worker: send()  > 15 bytes 1571324249029
parent: receive() 15 bytes 1571324249034
worker: send()  > 15 bytes 1571324249235
parent: receive() 15 bytes 1571324249235
worker: send()  > 15 bytes 1571324249436
parent: receive() 15 bytes 1571324249436
worker done
/123456
worker: send()  > 123456 bytes 1571324276973
worker: send()  > 123456 bytes 1571324277174
worker: send()  > 123456 bytes 1571324277375
worker done
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277393

Experienced on both Node v12.7 and v12.12.

Phrogz
    Instead of queuing the messages in a blocking loop, why not use a `setInterval()`? – Patrick Roberts Oct 17 '19 at 05:04
  • @PatrickRoberts Are you questioning why `run()` has a `while` loop in it? Are you suggesting that switching that to `setInterval()` will solve my problem? To answer the question I think you're asking: I use a `while` loop because that function is the sole purpose of this worker process, and (with small IPC payloads) it did not cause any problem that I could see. – Phrogz Oct 17 '19 at 05:20
  • Blocking like that serves no beneficial purpose. Using a non-blocking timing mechanism like `setInterval()` frees up the event loop to perform I/O in the background. I'm not saying it will definitely solve this problem, but it seems an odd choice to write it the way you have, just because you can. – Patrick Roberts Oct 17 '19 at 05:41
  • @PatrickRoberts Thanks for the input. I didn't write it that way "just because I can", but rather because originally the code was console-based with no IPC. A while loop that periodically prints out results seemed reasonable at the time, but is experiencing this problem (on macOS only). – Phrogz Oct 17 '19 at 15:24
  • Writing a blocking loop that polls the current time until a time-based condition is met is an antipattern in JavaScript, period. Doesn't matter if it had IPC before or not. Always prefer a non-blocking approach using `setTimeout()` or `setInterval()`. The change here is trivial. – Patrick Roberts Oct 17 '19 at 16:17
  • If it was only time-based I'd agree with you. (And it *is* only time-based in this sample code, so your comments are welcome.) In my real code there are three conditions which may cause a progress update: one based on time, one based on number of iterations processed, and one based on results achieved in the loop. Rewriting for time-only is, I agree, not hard. I'm holding out hope for a workaround that retains my loop-based approach. I'll update the question accordingly. – Phrogz Oct 17 '19 at 16:21

4 Answers


Having a long-running, blocking `while` loop in combination with sockets or file descriptors in Node is always an indication that something is being done wrong.

Without being able to test the whole setup it is hard to tell whether my claim is really correct, but short messages can probably be passed directly in one chunk to the OS, which then passes them to the other process. With larger messages, Node has to wait until the OS can accept more data, so sending gets queued up; and because of the blocking `while` loop, those queued sends cannot be flushed until the loop has ended.

So, to your question: no, that is not a bug.

Since you are using a recent Node.js version, I would use `async`/`await` and create a non-blocking `while`, similar to the `sleep` in this answer. The `await` will allow the Node event loop to take over whenever `processSome` returns a pending Promise.

Since your sample code does not really reflect a real use case, it is hard to tell how to solve it correctly. If you don't do anything async in `processSome` that would allow the I/O to take place, then you need to allow it manually on a regular basis, e.g. with an `await new Promise(setImmediate);`.

async function run() {
  // send a status update roughly 10 times per second (every 100 ms)
  let interval = setInterval(() => {
    process.send({action:'update', data:status()});
    console.log('child:  send()');
  }, 100)

  while(keepGoing()) {
    await processSome();
  }

  clearInterval(interval)
}
t.niese
  • Thank you for this answer. Per my edit to the question my _real_ code has multiple conditions to send an update, only one of which is based on time. It appears that you've moved the `processSome()` code out of the `while` loop. (Or perhaps I'm missing something crucial related to promises.) – Phrogz Oct 17 '19 at 16:26
  • @Phrogz ah ok, no, I accidentally read the braces the wrong way. I updated the answer so that `process.send({action:'update', data:status()});` is executed when `every10Hz` is true, and `processSome` for every iteration of the `while`. The `await` should allow the event loop of Node to take over even if `processSome` does not return a Promise. But the reason for your problem is still that the loop is blocking. – t.niese Oct 17 '19 at 16:37
  • Two comments about this answer as-is. If `processSome()` does not return a promise, then this approach still blocks I/O (microtasks like the continuation produced by this `await` statement are processed _before_ IO). Also, this will cause the iteration to perform a lot more slowly because of the queued microtask at every iteration. – Patrick Roberts Oct 17 '19 at 18:13
  • @PatrickRoberts yes, you are right, it has to return an unresolved Promise. – t.niese Oct 17 '19 at 18:34

Regarding your first question

Is this a bug? (Edit: behavior only occurs on OS X, not Windows or Linux)

This is definitely not a bug, and I could reproduce it on my Windows 10 (for the size 123456). It's mostly because of the underlying kernel buffering and context switching by the OS, as two separate (non-detached) processes are communicating over an IPC descriptor.

Regarding your second question

Is there any way to avoid this, other than trying to keep my IPC payload small?

If I understand the problem correctly: for each HTTP request, every time the worker sends a chunk back to the server, you want the server to process it before the next chunk arrives. That's how I understand what you meant by sync processing.

There's a way using promises, but I would like to use generators in the worker. It's a better fit for orchestrating the flow between server and worker.

Flow:

  1. Server sends an integer to the worker whatever it gets from http request
  2. Worker then creates and runs generator to send the first chunk
  3. Worker yields after sending the chunk
  4. Server requests for more
  5. Worker generates more since server asked for more (only if available)
  6. If no more chunks are available, the worker sends an end signal
  7. Server just logs that worker is done and doesn't request any more

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc'], detached:false};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', (msg) => {
   //FLOW 7: Worker is done, just log
   if (msg.action == 'end'){
      console.log(`child ended for a particular request`)
   } else {
      console.log(`parent: receive(${msg.data.iter}) ${msg.data.msg.length} bytes`, Date.now())
      //FLOW 4: Server requests for more
      child.send('more')
   }   

});

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);   
   if (match) {
      //FLOW 1: Server sends integer to worker
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

let runner
if (process.send) process.on('message', msg => {   
   //FLOW 2: Worker creates and runs a generator to send the first chunk
   if (parseInt(msg)) {
      runner = run(msg)
      runner.next()
   }
   //FLOW 5: Server asked more, so generate more chunks if available
   if (msg == "more") runner.next()

});

//generator function *
function* run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send(${i})  > ${messageSize} bytes`, now);
         let j = i         
         process.send({action:'update', data:{msg, iter:j}});
         //FLOW 3: Worker yields after sending the chunk
         yield
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   //FLOW 6: If no more, worker sends end signal
   process.send({action:'end'});
   console.log('worker done');
}

If we know the exact use case, there could be better ways to program it. This is just one way of synchronizing the child process retaining much of your original source code.

manikawnth

While I agree with others that the optimal solution would be one where the child process can voluntarily give up control at the end of each loop iteration, allowing the buffer-flushing processes to run, there is an easy/quick/dirty fix that gets you almost-synchronous behavior: make the child's send calls blocking.

Use the same server.js as before, and almost the same worker.js, with just one line added:

worker.js

if (process.send) process.on('message', msg => run(msg));

// cause process.send to block until the message is actually sent                                                                                
process.channel.setBlocking(true);

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e6; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.error(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

Output:

/123456
worker: send()  > 123456 bytes 1572113820591
worker: send()  > 123456 bytes 1572113820630
parent: receive() 123456 bytes 1572113820629
parent: receive() 123456 bytes 1572113820647
worker: send()  > 123456 bytes 1572113820659
parent: receive() 123456 bytes 1572113820665
worker: send()  > 123456 bytes 1572113820668
parent: receive() 123456 bytes 1572113820678
worker: send()  > 123456 bytes 1572113820678
parent: receive() 123456 bytes 1572113820683
worker: send()  > 123456 bytes 1572113820683
parent: receive() 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820696
worker: send()  > 123456 bytes 1572113820696
parent: receive() 123456 bytes 1572113820700
worker: send()  > 123456 bytes 1572113820700
parent: receive() 123456 bytes 1572113820703
worker: send()  > 123456 bytes 1572113820703
parent: receive() 123456 bytes 1572113820706
worker: send()  > 123456 bytes 1572113820706
parent: receive() 123456 bytes 1572113820709
worker: send()  > 123456 bytes 1572113820709
parent: receive() 123456 bytes 1572113820713
worker: send()  > 123456 bytes 1572113820714
worker: send()  > 123456 bytes 1572113820721
parent: receive() 123456 bytes 1572113820722
parent: receive() 123456 bytes 1572113820725
worker: send()  > 123456 bytes 1572113820725
parent: receive() 123456 bytes 1572113820727
Old Pro
  • Defining the blocking statement direct in the source code is a smart idea. It will produce a bottleneck which can't be fixed. The reason is that the sourcecode is stored to the harddrive which makes it problematic to use a rule engine to change the behavior on the fly. – Manuel Rodriguez Oct 29 '19 at 10:08

If you need to guarantee that a message is received before sending the next one, you can wait for the master to acknowledge receipt. This will delay sending the next message, of course, but since your logic relies on both time and iteration number to determine whether to send a message, it may be alright for your case.

The implementation will need each worker to create a promise for each message sent, and wait for a reply from master before resolving the promise. This also means you need to identify which message is acknowledged based on a message id or something unique if you have more than one message or worker simultaneously.

Here's the modified code:

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg =>  {
    console.log(`parent: receive() ${msg.data.length} bytes`, Date.now())
    // reply to the child with the id
    child.send({ type: 'acknowledge', id: msg.id });
});

...

worker.js

const pendingMessageResolves = {};

if (process.send) process.on('message', msg => { 
    if (msg.type === 'acknowledge') {
        // call the stored resolve function
        pendingMessageResolves[msg.id]();
        // remove the function to allow the memory to be freed
        delete pendingMessageResolves[msg.id]
    } else {
        run(msg) 
    }
});

const sendMessageAndWaitForAcknowledge = (msg) => new Promise(resolve => {
    const id = require('crypto').randomBytes(16).toString('hex'); // any unique id works
    process.send({ action:'update', data: msg, id });
    // store a reference to the resolve function
    pendingMessageResolves[id] = resolve;
})

async function run(messageSize) {
    const msg = new Array(messageSize+1).join('x');
    let lastUpdate = Date.now();
    for (let i=0; i<1e7; ++i) {
        const now = Date.now();
        if ((now-lastUpdate)>200 || i%5000==0) {
            console.log(`worker: send()  > ${messageSize} bytes`, now);
            await sendMessageAndWaitForAcknowledge(msg); // wait until master replies
            lastUpdate = Date.now();
        }
        Math.sqrt(Math.random());
    }
    console.log('worker done');
}

p.s. I didn't test the code so it might need some tweaking, but the idea should hold.
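The promise bookkeeping above can be exercised without real IPC. The following is a self-contained sketch (the in-process "fake master" and the counter-based ids are illustrative assumptions, standing in for the parent process and the unique-id field):

```javascript
// Simulates the acknowledge pattern in a single process:
// sendMessageAndWaitForAcknowledge stores a resolver keyed by id, and a
// fake "master" acknowledges each message on the next event-loop turn.
const pendingMessageResolves = {};
let nextId = 0;

// stand-in for the real master: acknowledge asynchronously via setImmediate
function fakeMasterReceive(msg) {
  setImmediate(() => {
    pendingMessageResolves[msg.id]();   // call the stored resolve function
    delete pendingMessageResolves[msg.id]; // free the reference
  });
}

function sendMessageAndWaitForAcknowledge(data) {
  return new Promise(resolve => {
    const id = nextId++; // any unique id works
    pendingMessageResolves[id] = resolve;
    fakeMasterReceive({ action: 'update', data, id });
  });
}

async function run() {
  const order = [];
  for (let i = 0; i < 3; ++i) {
    order.push(`send ${i}`);
    await sendMessageAndWaitForAcknowledge('x'.repeat(10));
    order.push(`acked ${i}`);
  }
  return order;
}

// each send completes its acknowledge round-trip before the next one starts
run().then(order => console.log(order.join(', ')));
```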

gafi