6

I am trying to implement using XPUB and XSUB as provided in this below figure. I have gone through their examples provided but could not get one for XPUB and XSUB in Java. Here they have given an example in C which is little complex as I am new to ZeroMQ.

Pub-Sub Network with a Proxy in ZeroMQ
I am trying to use it in android using jni wrapped version. Please help me to find an example, how to implement this Pub-Sub Network with a Proxy in ZeroMQ using java.

Currently I am referring http://zguide.zeromq.org/page:all

I have tried to port it as follows. Subscriber.java

public class Subscriber extends Thread implements Runnable {

private static final String TAG = "Subscriber";
private Context ctx;

public Subscriber(ZMQ.Context z_context) {
    this.ctx = z_context;
}

@Override
public void run() {

    super.run();

    ZMQ.Socket mulServiceSubscriber = ctx.socket(ZMQ.SUB);
    mulServiceSubscriber.connect("tcp://localhost:6001");
    mulServiceSubscriber.subscribe("A".getBytes());
    mulServiceSubscriber.subscribe("B".getBytes()); 


        while (true) {
            Log.d(TAG, "Subscriber loop started..");
            String content = new String(mulServiceSubscriber.recv(0));
            Log.d(TAG, "Subscriber Received : "+content);
        }
}

}

Publisher.java

public class Publisher extends Thread implements Runnable {

private static final String TAG = "Publisher";
private Context ctx;

public Publisher(ZMQ.Context z_context) {
    this.ctx = z_context;
}

@Override
public void run() {

    super.run();

    ZMQ.Socket publisher = ctx.socket(ZMQ.PUB);
    publisher.connect("tcp://localhost:6000");

    while (true) {
        Log.d(TAG, "Publisher loop started..");
        publisher.send(("A Hello " + new Random(100).nextInt()).getBytes() , 0);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

}

XListener.java (For now a simple Forwarder)

public class XListener extends Thread implements Runnable {

private static final String TAG = null;
private Socket publisherX;
private Context ctx;
private Socket subscriberX;

public XListener(ZMQ.Context ctx, ZMQ.Socket subscriberX,
        ZMQ.Socket publisherX) {
    this.ctx = ctx;
    this.subscriberX = subscriberX;
    this.publisherX = publisherX;

}

@Override
public void run() {
    super.run();
    while (true) {          
        Log.d(TAG, "XListener loop started..");

        String msg = new String(subscriberX.recvStr());
        Log.v(TAG, "Listener Received: " +"MSG :"+msg);
        publisherX.send(msg.getBytes(), 0);         
    }
}

}

in application main()

private void main() {
        ZMQ.Context ctx = ZMQ.context(1);   

    ZMQ.Socket subscriberX = ctx.socket(ZMQ.XSUB);
    subscriberX.bind("tcp://*:6000");
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    ZMQ.Socket publisherX = ctx.socket(ZMQ.XPUB);
    publisherX.bind("tcp://*:6001");
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    new XListener(ctx, subscriberX, publisherX).start();
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

   new XSender(ctx, subscriberX, publisherX).start();
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    new Subscriber(ctx).start();
    new Publisher(ctx).start();
}

With the code I am not able to listen XSUB. While porting espresso.c, I was not able to find any wrapper in java bindings of ZMQ. How to implement a simple proxy or am I missing something??

Ajay Kumar Meher
  • 1,932
  • 16
  • 24
  • what have you done so far? what have been your attempts? Code please... – g19fanatic Jan 25 '13 at 15:32
  • Thanks for replying g19fanatic. Please have a look, I have added the codes. How can I implement a Proxy?? Till now I have not found the wrapper method for `zmq_proxy (subscriber, publisher, listener);`. This is what **espresso.c** is using. – Ajay Kumar Meher Jan 28 '13 at 05:37

1 Answers1

1

Wow I'm answering my own question. I missed to add a forwarder from publisherX to subscriberX. Here is the missing code. Now XSUB and XPUB are able to send and get data.

public class XSender extends Thread implements Runnable {

private static final String TAG = null;
private Socket publisherX;
private Context ctx;
private Socket subscriberX;

public XSender(ZMQ.Context ctx, ZMQ.Socket subscriberX,
        ZMQ.Socket publisherX) {
    this.ctx = ctx;
    this.subscriberX = subscriberX;
    this.publisherX = publisherX;

}

@Override
public void run() {
    super.run();
    while (true) {
        // Read envelope with address
        Log.d(TAG, "XListener loop started..");

        String msg = new String(subscriberX.recv(0));
        Log.v(TAG, "Listener Received: " +"MSG :"+msg);
        publisherX.send(msg.getBytes(), 0);         

    }


}

}

Ajay Kumar Meher
  • 1,932
  • 16
  • 24
  • hi i want to discover other device ip, is it possible using your code? – Jayesh Khasatiya Jul 17 '14 at 12:46
  • This a network architecture that is based on XPUB and XSUB mentioned above in the image. You have to find another way. – Ajay Kumar Meher Jul 20 '14 at 11:41
  • You write "forwarder from publisherX to subscriberX", but the `XSender` in your example reads from `subscriberX` and writes to `publisherX`. This is also what `XListener` does. Is it possible that there is a typo in your code? – Michael Osl Nov 23 '16 at 13:46