I'm experimenting with java flavored zmq to test the benefits of using PGM over TCP in my project. So I changed the weather example, from the zmq guide, to use the epgm transport. Everything compiles and runs, but nothing is being sent or received. If I change the transport back to TCP, the server receives the messages sent from the client and I get the console output I'm expecting.
So, what are the requirements for using PGM? I changed the string, that I'm passing to the bind and connect methods, to follow the zmq api for zmq_pgm: "transport://interface;multicast address:port". That didn't work. I get and invalid argument error whenever I attempt to use this format. So, I simplified it by dropping the interface and semicolon which "works", but I'm not getting any results.
I haven't been able to find a jzmq example that uses pgm/epgm and the api documentation for the java binding does not define the appropriate string format for an endpoint passed to bind or connect. So what am I missing here? Do I have to use different hosts for the client and the server?
One thing of note is that I'm running my code on a VirtualBox VM (Ubuntu 14.04/OSX Mavericks host). I'm not sure if that has anything to do with the issue I'm currently facing.
Server:
public class wuserver {
public static void main (String[] args) throws Exception {
// Prepare our context and publisher
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("epgm://xx.x.x.xx:5556");
publisher.bind("ipc://weather");
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
while (!Thread.currentThread ().isInterrupted ()) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = 10000 + srandom.nextInt(10000) ;
temperature = srandom.nextInt(215) - 80 + 1;
relhumidity = srandom.nextInt(50) + 10 + 1;
// Send message to all subscribers
String update = String.format("%05d %d %d", zipcode, temperature, relhumidity);
publisher.send(update, 0);
}
publisher.close ();
context.term ();
}
}
Client:
public class wuclient {
public static void main (String[] args) {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to server
System.out.println("Collecting updates from weather server");
ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
//subscriber.connect("tcp://localhost:5556");
subscriber.connect("epgm://xx.x.x.xx:5556");
// Subscribe to zipcode, default is NYC, 10001
String filter = (args.length > 0) ? args[0] : "10001 ";
subscriber.subscribe(filter.getBytes());
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
// Use trim to remove the tailing '0' character
String string = subscriber.recvStr(0).trim();
StringTokenizer sscanf = new StringTokenizer(string, " ");
int zipcode = Integer.valueOf(sscanf.nextToken());
int temperature = Integer.valueOf(sscanf.nextToken());
int relhumidity = Integer.valueOf(sscanf.nextToken());
total_temp += temperature;
}
System.out.println("Average temperature for zipcode '"
+ filter + "' was " + (int) (total_temp / update_nbr));
subscriber.close();
context.term();
}
}