2

I am trying to implement a pub-sub example using ZeroMQ. I run the publisher's code in a docker container and the subscriber's code in another one.

My subscriber is:

private ZMQ.Context context;

{
    context = ZMQ.context(1);
}

public void receive() {
    System.out.println("Getting subscriber, listening to tcp://localhost:5565");
    getSubscriber();
    byte[] raw;
    System.out.println("Watching for new Event messages...");
    try {
        while (!Thread.currentThread().isInterrupted()) {
            raw = subscriber.recv();
            System.out.println("Event received " + raw);
        }
    } catch (Exception e) {
        System.out.println("Unable to receive messages via ZMQ: " + e.getMessage());
    }
    if (subscriber != null)
        subscriber.close();
    subscriber = null;
    System.out.println("Attempting restart of Event message watch.");
    receive();
}

private ZMQ.Socket getSubscriber() {
    if (subscriber == null) {
        try {
            subscriber = context.socket(ZMQ.SUB);
            subscriber.connect("tcp://localhost:5565");
            subscriber.subscribe("".getBytes());
        } catch (Exception e) {
            System.out.println("Unable to get a ZMQ subscriber.  Error:  " + e);
            subscriber = null;
        }
    }
    return subscriber;
}

And my publisher is:

private ZMQ.Context context;

{
    context = ZMQ.context(1);
}

public synchronized void sendEventMessage(Event event) {
    try {
        if (publisher == null) {
            getPublisher();
        }
        if (publisher != null) {
            publisher.send(event);
        } 
    } catch (Exception e) {
        System.out.println("Unable to send message via ZMQ");
    }
}

private void getPublisher() {
    try {
        if (publisher == null) {
            publisher = context.socket(ZMQ.PUB);
            publisher.bind("tcp://192.168.32.9:5565");   //where 192.168.32.9 is the IP of the subscriber's docker container
            Thread.sleep(PUB_UP_SLEEP); // allow subscribers to connect
        }
    } catch (Exception e) {
        System.out.println("Unable to get a publisher. Error:  " + e);
        publisher = null;
    }
}

When I start the application, I register a subscriber and the logs are:

[2018-12-10 08:01:02.138] boot - 1  INFO [main] --- ZeroMQEventSubscriber: Getting subscriber, listening to tcp://localhost:5565
[2018-12-10 08:01:02.249] boot - 1  INFO [main] --- ZeroMQEventSubscriber: Watching for new Event messages...

My problem is that when I invoke sendEventMessage, the subscriber does not receive anything and on the publisher I get this error:

[2018-12-10 08:54:16.388] boot - 1 ERROR [task-scheduler-5] --- ZeroMQEventPublisherImpl: Unable to get a publisher.  Error:  org.zeromq.ZMQException: Errno 48 : Address already in use

Any ideas why I cannot bind to the address where the subscriber has connected?

Marievi
  • 4,951
  • 1
  • 16
  • 33

0 Answers0