I have a node application handling some ZeroMQ events coming from another application utilizing the Node-ZMQ bindings found here: https://github.com/JustinTulloss/zeromq.node

The issue I am running into is that one of the operations triggered by an event takes a long time to process, and this appears to block every other event from being processed in the meantime. The application is not currently clustered, but clustering would only add a few more worker processes and wouldn't really solve the issue. Is there a way to keep these async calls from blocking other incoming requests while they run, and how might I go about implementing it?

Here is a highly condensed/contrived code example of what I am doing currently:

var zmq = require('zmq');
var zmqResponder = zmq.socket('rep');
var Client = require('node-rest-client').Client;
var client = new Client();
var Q = require('q'); // promise library used by serverRequest() below

zmqResponder.on('message', function (msg, data) {
  var parsed = JSON.parse(msg);
  logging.info('ZMQ Request received: ' + parsed.event);
  switch (parsed.event) {
    case 'create':
        // Typically a short-running process, not an issue
        break;
    case 'update':
        // Long-running process, this is the issue
        serverRequest().then(function (response) {
            zmqResponder.send(JSON.stringify(response));
        });
        break;
  }

});

function serverRequest() {
    var deferred = Q.defer();
    client.get(function (data, response) {
        if (response.statusCode !== 200) {
            deferred.reject(data.data);
        } else {
            deferred.resolve(data.data);
        }
    });
    return deferred.promise;
}

EDIT: Here's a gist of the code: https://gist.github.com/battlecow/cd0c2233e9f197ec0049

Brian
  • Apologies, it is contrived; allow me to edit. – Brian Jan 27 '16 at 17:43
  • If `serverRequest()` is truly async, it should not be blocking anything. – jfriend00 Jan 27 '16 at 17:48
  • That is honestly what I had thought but it does not play out that way and I am unsure of how best to optimize the code, or where I am misstepping. – Brian Jan 27 '16 at 17:55
  • @Brian, is your current example your actual code, or is it still just a contrived code sample? Because as is you're showing completely asynchronous I/O which would not block. However, if there is any javascript that you've written, not an I/O call but in line javascript, that takes a long time to execute, that will block the main event loop even if you call it asynchronously (i.e. even if it's running inside the `client.get()` call). [See my answer here](http://stackoverflow.com/questions/22644328/when-is-the-thread-pool-used/22644735#22644735) for more info on why that is. – Jason Jan 27 '16 at 17:58
  • I'm wondering if there's any chance that some of the zmq methods are actually synchronous, not async such as [this line of code](https://github.com/JustinTulloss/zeromq.node/blob/master/lib/index.js#L595)? – jfriend00 Jan 27 '16 at 18:05
  • @Jason This is still contrived a bit, but the full version is a number of further client requests, in the same form as the one in the example. The code flow is actually PUT, GET, PUT, GET, PUT, GET, DELETE but if one request isn't going to block then many shouldn't either correct? – Brian Jan 27 '16 at 18:13
  • @jfriend00 That may be the case and because the other function I am using it with returns so quickly I never noticed it before? I guess I am not really sure but will have to do some more digging to see if that is the culprit. – Brian Jan 27 '16 at 18:17
  • To the best of my knowledge and experience, the ZMQ bindings should be fully asynchronous, but you should be able to benchmark that fairly easily. I suggest you put your actual code in the example, because it's far more likely you've got a blocking segment in your code that you've overlooked. – Jason Jan 27 '16 at 18:23
  • Lemme get a gist together it's a bit long for the box above. – Brian Jan 27 '16 at 18:31
  • @Jason Added a gist of my code, I appreciate any help you can provide. – Brian Jan 27 '16 at 18:51
  • Where is `customizedOptionalUnits()` defined? Could there be blocking code in there? Does it return a generic javascript object, or some other object with its own `filter()` method? How about `customReplace()`? I don't see any definitions for `artifactory`, so there could also be blocking code hidden in `artifactory.verifyWar()` or `artifactory.getBuildProperties()`. I don't see any red flags in your fleetctl.js code. I see you do a lot of logging to the console, can you pinpoint the functions that are taking a long time to complete? – Jason Jan 27 '16 at 21:02
  • @Jason Sorry missed those functions, I updated the gists verbatim now, but the customizeOptionalUnits is just returning an array of strings. CustomReplace deals with said array of strings, and the other methods mentioned do promise calls out to a server using the client library. – Brian Jan 27 '16 at 22:15
  • without having analyzed the entire gist, how large is the JSON data you're passing around/dealing with? `JSON.parse()` and `JSON.stringify()` are synchronous methods, so that could be where some delay is creeping in. Can you pinpoint via your console logging where the delay is occurring? – Jason Jan 27 '16 at 22:24
  • @Jason The json is ~10 properties long, nothing substantial, I am thinking more and more that the issue lies with the ZMQ module. I created a much more manageable test in a gist here: https://gist.github.com/battlecow/75835bb2085acacfbcf4 if you run two of these together they come back synchronously. – Brian Jan 27 '16 at 23:44
  • Think I figured it out, if my answer below is correct I ran into the same issue early on in my zmq work. It works as designed. – Jason Jan 28 '16 at 00:07

1 Answer

I think, through the comment thread, I've identified your issue. REQ/REP has a strict synchronous message order guarantee... You must receive-send-receive-send-etc. REQ must start with send and REP must start with receive. So, you're only processing one message at a time because the socket types you've chosen enforce that.

If you were using a different, non-event-driven language, you'd likely get an error telling you what you'd done wrong when you tried to send or receive twice in a row, but node lets you do it and just queues the subsequent messages until it's their turn in the message order.
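
To make the ordering concrete, here is a toy model of the two behaviours. It does not use the zmq module at all: the function names and the `{ event, arrival, duration }` request shape are made up for illustration, and `duration` stands in for time spent waiting on asynchronous I/O.

```javascript
// Toy model only, not the zmq API. Times are in milliseconds.

// REP-style lockstep: the next request is not delivered until the
// previous reply has been sent.
function replyTimesLockstep(requests) {
  var free = 0; // time at which the socket can deliver the next request
  return requests.map(function (req) {
    var start = Math.max(req.arrival, free);
    free = start + req.duration;
    return { event: req.event, repliedAt: free };
  });
}

// ROUTER-style delivery: every request is delivered on arrival,
// so the asynchronous waits overlap.
function replyTimesInterleaved(requests) {
  return requests.map(function (req) {
    return { event: req.event, repliedAt: req.arrival + req.duration };
  });
}

var requests = [
  { event: 'update', arrival: 0, duration: 5000 }, // slow
  { event: 'create', arrival: 10, duration: 20 }   // fast
];

console.log(replyTimesLockstep(requests));
// update is answered at 5000, create only at 5020
console.log(replyTimesInterleaved(requests));
// update is answered at 5000, create already at 30
```

In the lockstep case the fast `create` waits behind the slow `update` even though nothing in the handler blocks the event loop, which matches what the test gist shows.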

You want to change REQ/REP to DEALER/ROUTER and it'll work the way you expect. You'll have to change your logic slightly for the ROUTER socket to get it to send appropriately, but everything else should work the same.


Rough example code, using the relevant portions of the posted gist:

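var zmq = require('zmq');
var zmqResponder = zmq.socket('router');

// The bindings pass each frame of a message as its own argument. With a DEALER
// peer sending single-frame messages, that is the peer's identity, then the payload.
zmqResponder.on('message', function (peer_id, msg) {
    var parsed = JSON.parse(msg);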
    switch (parsed.event) {
        case 'create':
            // build parsedResponse, then...
            zmqResponder.send([peer_id, JSON.stringify(parsedResponse)]);
            break;
    }
});

zmqResponder.bind('tcp://*:5668', function (err) {
    if (err) {
        logging.error(err);
    } else {
        logging.info("ZMQ awaiting orders on port 5668");
    }
});

... you need to grab the peer_id (or whatever you want to call it; in ZMQ nomenclature it's the socket ID of the socket that sent the message, so think of it as an "address" of sorts) from the first frame of the message you receive, and then send it as the first frame of the message you send back.
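
If it helps, the frame handling can be pulled into two small helpers. This is only a sketch: `splitEnvelope` and `buildReply` are hypothetical names, not part of the zmq module, and the code assumes the payload is always the last frame. It also covers the case where the peer is still a REQ socket, which puts an empty delimiter frame between the identity and the payload.

```javascript
// Hypothetical helpers, not part of the zmq module.
// frames: array of Buffers as received by a ROUTER socket.
function splitEnvelope(frames) {
  if (frames.length < 2) {
    throw new Error('expected an identity frame and a payload frame');
  }
  return {
    peerId: frames[0],
    // A REQ peer adds an empty delimiter frame; a DEALER peer normally does not.
    hasDelimiter: frames.length > 2 && frames[1].length === 0,
    payload: JSON.parse(frames[frames.length - 1].toString())
  };
}

// Build the multipart reply, mirroring the envelope that came in.
function buildReply(envelope, response) {
  var body = JSON.stringify(response);
  return envelope.hasDelimiter
    ? [envelope.peerId, '', body]
    : [envelope.peerId, body];
}

var incoming = [Buffer.from('peer-1'), Buffer.from('{"event":"create"}')];
var envelope = splitEnvelope(incoming);
var reply = buildReply(envelope, { status: 'ok' });
console.log(envelope.payload.event, reply[1]); // prints: create {"status":"ok"}
```

Inside the `message` handler you would call `splitEnvelope(Array.prototype.slice.call(arguments))`, since the bindings hand each frame over as a separate argument, and then pass the result of `buildReply(...)` to `zmqResponder.send()`.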

By the way, I just noticed in your gist you are both connect()-ing and bind()-ing on the same socket (zmq.js lines 52 & 143, respectively). Don't do that. Inferring from other clues, you just want to bind() on this side of the process.

Jason
  • Any chance you have some example code? I'm having a hard time figuring out this model with the examples on zmq docs. Trying to get my second gist working. – Brian Jan 28 '16 at 02:34
  • Updated answer with example – Jason Jan 28 '16 at 03:54