0

I have a little project that implements NATS Queueing
This is the code:

import * as NATS from '../node_modules/nats'              // for typescript
var nats = require('nats');

var config = require('./config');
// Client options: server list and credentials come from ./config.
const opt: NATS.ClientOpts = {
    noRandomize: false,
    reconnect: true,
    maxReconnectAttempts: -1,
    servers: config.nat.servers,
    user: config.nat.user,
    pass: config.nat.pass,
};

// Single shared connection used by every subscriber and requester below.
const nc: NATS.Client = nats.connect(opt);

// Report connection-level errors on stderr.
nc.on('error', (e) => {
    console.error('Error nats: ' + e);
});

// Report when the connection has been closed.
nc.on('close', () => {
    console.error('CLOSED nats');
});

// Service A worker: subscribes to subject 'serviceA' as a member of queue
// group 'A', logs the parsed JSON request and publishes a JSON reply to the
// reply subject supplied with the request.
nc.subscribe('serviceA', { 'queue': 'A' }, function (request, replyTo) {
    console.debug('I exec serviceA via nats:');
    // Declared locally; the original assigned to an undeclared global `j`.
    const j = JSON.parse(request);
    console.debug(j);
    const ok = {"res": "i am response for service A"};
    nc.publish(replyTo, JSON.stringify(ok));
});  // the original ended with a bare `}`, leaving the subscribe() call unclosed

// Client for service A: sends one JSON request on subject 'serviceA' and
// waits up to 15000 ms for a single reply.
let cmd = '{"a": 5, "b": "i am sample"}';
nc.requestOne('serviceA', cmd, {}, 15000, (response) => {
    const timedOut = Boolean(response.code) && response.code === nats.REQ_TIMEOUT;
    if (!timedOut) {
        console.log('A see this response: ' + response);
        return;
    }
    // No reply arrived within the timeout.
    console.error('server timeout');
    console.error(response.code);
});

// Service B worker: subscribes to subject 'serviceB' as a member of queue
// group 'B', logs the parsed JSON request and publishes a JSON reply to the
// reply subject supplied with the request.
nc.subscribe('serviceB', { 'queue': 'B' }, function (request, replyTo) {
    console.debug('I exec serviceB via nats:');
    // Declared locally; the original assigned to an undeclared global `j`.
    const j = JSON.parse(request);
    console.debug(j);
    const ok = {"res": "i am response for service B"};
    nc.publish(replyTo, JSON.stringify(ok));
});  // the original ended with a bare `}`, leaving the subscribe() call unclosed

// Client for service B: sends one JSON request on subject 'serviceB' and
// waits up to 15000 ms for a single reply.
// Named `cmdB` because `cmd` is already declared with `let` earlier in this
// listing; redeclaring it in the same scope is a SyntaxError.
const cmdB = '{"c": 5, "d": "i am sample"}';
nc.requestOne('serviceB', cmdB, {}, 15000, function (response) {
    if (response.code && response.code === nats.REQ_TIMEOUT) {
        // No reply arrived within the timeout.
        console.error('server timeout');
        console.error(response.code);
    } else {
        console.log('B see this response: ' + response);
    }
});

As you can see, there are 2 services - serviceA on queue A and serviceB on queue B and 2 clients: the first calls service A and the second calls service B

NATS implements Subject ( 'serviceA' and 'serviceB' )

Now I want to try to convert the example to ØMQ. I found a similar sample using ZeroMQ.

But I cannot find any sample that uses a Subject.

Perhaps ØMQ uses ROUTER to implement a subject.

Can you help me to implement subject into a ZeroMQ example?

user3666197
  • 1
  • 6
  • 50
  • 92
Janka
  • 1,908
  • 5
  • 20
  • 41

1 Answers1

2

Q: Is it possible to use a Subject in ZeroMQ?
A: Yes, it is:

Long story short: one does not need a zmq.ROUTER for this; just use the PUB / SUB formal pattern.

Beware: ZeroMQ Socket()-instance is not a tcp-socket-as-you-know-it.

It is best to read about the main conceptual differences in the [ ZeroMQ hierarchy in less than a five seconds ] section.

The Publisher side:

import zmq
from time import sleep  # the send loop below paces itself with sleep()

# Publisher side of the PUB/SUB example: once per second it sends messages
# whose leading characters ("ServiceA", "ServiceB") act as the subject that
# SUB sockets filter on.
aCtx = zmq.Context()
aPub = aCtx.socket( zmq.PUB )                  # pyzmq's factory method is lowercase .socket()
# NOTE: "123.456.789.012" is a placeholder, not a valid IPv4 address --
#       replace it with a real interface address (or "*") before running.
aPub.bind( "tcp://123.456.789.012:3456" )
aPub.setsockopt(    zmq.LINGER,   0 )          # do not hold unsent messages on close
# Further fine-tuning goes here, e.g.:
# aPub.setsockopt(  zmq.<whatever needed to fine-tune the instance>, <val> )
i = 0
while True:
    try:
        # send_string() encodes the text to bytes; plain send() rejects str on Python 3
        aPub.send_string( "ServiceA::[#{0:_>12d}] a Hello World Message.".format( i ) )
        # Also begins with "ServiceA", so a "ServiceA" subscription receives it too
        aPub.send_string( "ServiceABCDEFGHIJKLMNOPQRSTUVWXYZ........" )
        # ServiceB is published on every second iteration only
        if i % 2 == 0:
            aPub.send_string( "ServiceB::[#{0:_>12d}] another message...".format( i // 2 ) )
        sleep( 1 )
        i += 1

    except KeyboardInterrupt:
        print( "---< will exit >---" )
        break
print( "---< will terminate ZeroMQ resources >---" )
aPub.close()
aCtx.term()

The Subscriber side:

import zmq

# Subscriber side of the PUB/SUB example: connects to the publisher and prints
# every message whose payload starts with one of the subscribed prefixes.
aCtx = zmq.Context()
aSub = aCtx.socket( zmq.SUB )                  # pyzmq's factory method is lowercase .socket()
# NOTE: "123.456.789.012" is a placeholder, not a valid IPv4 address --
#       replace it with the publisher's real address before running.
aSub.connect( "tcp://123.456.789.012:3456" )
aSub.setsockopt( zmq.LINGER, 0 )
# setsockopt_string() encodes the filter to bytes; plain setsockopt() rejects str on Python 3
aSub.setsockopt_string( zmq.SUBSCRIBE, "ServiceA" ) # Subject ( 'serviceA' and 'serviceB' )
aSub.setsockopt_string( zmq.SUBSCRIBE, "ServiceB" ) # Kindly see the comments below
#                                            # Kindly see API on subscription management details
#
#                                            # Yet another "dimension"
#                                            #     to join ingress
#                                            #     from multiple sources
#Sub.connect( "<transport-class>://<addr>:<port>" )
#              <transport-class :: { inproc | ipc | tcp | pgm | epgm | vmci }
#   .connect()-s the same local SUB-AccessPoint to another PUB-side-AccessPoint
#                to allow the PUB/SUB Scalable Formal Communication Archetype Pattern
#                join a message flow from different source PUB-sides
#Sub.setsockopt( zmq.SUBSCRIBE, "ServiceZ" )

while True:
    try:
        # recv_string() decodes the payload so it can be formatted with {0:s}
        print( "<<< (((_{0:s}_)))".format( aSub.recv_string() ) )
    except KeyboardInterrupt:
        print( "---< will exit >---" )
        break

print( "---< will terminate ZeroMQ resources >---" )
aSub.close()
aCtx.term()
user3666197
  • 1
  • 6
  • 50
  • 92
  • I don't understand; I cannot see Subject ( 'serviceA' and 'serviceB' ) in your example – Janka Nov 30 '17 at 13:59
  • Janka, ZeroMQ subscription mechanics are hardwired since the API v.2.1+ till recent v.4.?.+ as a left-to-right message-payload string-matching. Knowing this, one can implement whatever boolean logic, but has to respect the hardwired rules. Topic filtering is a bit complicated: the initial API was using the SUB-side Topic-filter ( yes, that means deliver all messages to all SUB-s, they will decide on their own ). A more recent API has changed the plumbing into a PUB-side Topic-filtering, increasing the workload on the PUB-side Context()-instance, which has to handle all the process + buffer loads. – user3666197 Nov 30 '17 at 14:13
  • **Beware:** ZeroMQ Socket()-instance is **not** a tcp-socket-as-you-know-it. Best read about the main conceptual differences in **[ ZeroMQ hierarchy in less than a five seconds ]** Section in >>> https://stackoverflow.com/a/46620571 – user3666197 Nov 30 '17 at 14:35
  • @Janka would you mind to review all the help and consultations provided to your posted problem and select + reward the best answer provided, as the StackOverflow Community practice defines? **That's fair, isn't it?** – user3666197 Jun 18 '18 at 21:45