I'm writing a program using the HiveMQ Client (an open-source MQTT implementation in Java) that involves two multithreaded clients. One client is designated as the publisher and the other as the subscriber (I'm aware the same client can both publish and subscribe). I'm trying to design a test where the publisher sends 100 messages to the subscriber. The goal is to time how long it takes to send and receive all the messages. I realized that to time how long the messages take to be received, I would need the subscribing thread to wait until the publishing thread is ready to send. I decided to use wait() and notify(), but I can't seem to implement it correctly. I'm aware that both calls need to use the same object, which I tried to do, but I can't get the design right. I've added snippets of code for the run methods of both clients. CommonThread.java isn't actually a thread and I'm not running it; I tried to use it as an in-between class to be able to call wait() and notify(), but I'm missing something.

HiveMQ:

https://github.com/hivemq/hivemq-community-edition

https://github.com/hivemq/hivemq-mqtt-client

SubMainThread.java:

public void run() {

    // Creates the client object using Blocking API 

     Mqtt5BlockingClient subscriber = Mqtt5Client.builder()
    .identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client. The ID is randomly generated between 
    .serverHost("localhost")  // the host name or IP address of the MQTT server. Kept it localhost for testing. localhost is default if not specified.
    .serverPort(1883)  // specifies the port of the server
    .addConnectedListener(context -> ClientConnectionRetreiver.printConnected("Subscriber1"))        // prints a string that the client is connected
    .addDisconnectedListener(context -> ClientConnectionRetreiver.printDisconnected("Subscriber1"))  // prints a string that the client is disconnected
    .buildBlocking();  // creates the client builder                
    subscriber.connect();  // connects the client
    ClientConnectionRetreiver.getConnectionInfo(subscriber);   // gets connection info

    try {
        // Creates a "publishes" instance that's used to queue incoming messages.
        // MqttGlobalPublishFilter.ALL filters all incoming Publish messages.
        Mqtt5Publishes receivingClient1 = subscriber.publishes(MqttGlobalPublishFilter.ALL);

        subscriber.subscribeWith()   
        .topicFilter(subscriberTopic) 
        .qos(MqttQos.EXACTLY_ONCE)
        .send(); 

        PubSubUtility.printSubscribing("Subscriber1"); 


        System.out.println("Publisher ready to send: " + PubMainThread.readyToSend);

        x.threadCondWait();    // <<<<< HOW TO MAKE THIS WORK 
        System.out.println("Back to the normal execution flow :P");


        startTime = System.currentTimeMillis();
        System.out.println("Timer started");

        for (int i = 1; i <= messageNum; i++) {  
            // Receives the message using the "publishes" instance, waiting up to 5 minutes.
            // .get() returns the object if available or throws a NoSuchElementException.
            Mqtt5Publish receivedMessage = receivingClient1.receive(MESSAGEWAITTIME, TimeUnit.SECONDS).get();
            PubSubUtility.convertMessage(receivedMessage);  // Converts a Mqtt5Publish instance to string and prints 
            }   

        endTime = System.currentTimeMillis();

        finalTime = endTime - startTime;
        System.out.println( finalTime + PubMainThread.finalTime + " milliseconds");
        finalSecTime = TimeUnit.MILLISECONDS.toSeconds(finalTime);

        System.out.println(finalSecTime + PubMainThread.finalSecTime);
        }   

    catch (InterruptedException e) {    // Catches interruptions in the thread 
        LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for a message to be received", e);
        }

    catch (NoSuchElementException e){
        System.out.println("There are no received messages");   // Handles when a publish instance has no messages 
        }

    subscriber.disconnect();  

    }

PubMainThread.java:

public void run() {

    // Creates the client object using Blocking API 

    Mqtt5BlockingClient publisher = Mqtt5Client.builder()
    .identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client. The ID is randomly generated between 
    .serverHost("localhost")  // the host name or IP address of the MQTT server. Kept it localhost for testing. localhost is default if not specified.
    .serverPort(1883)  // specifies the port of the server
    .addConnectedListener(context -> ClientConnectionRetreiver.printConnected("Publisher1"))         // prints a string that the client is connected
    .addDisconnectedListener(context -> ClientConnectionRetreiver.printDisconnected("Publisher1"))  // prints a string that the client is disconnected
    .buildBlocking();  // creates the client builder                
    publisher.connect();  // connects the client
    ClientConnectionRetreiver.getConnectionInfo(publisher);   // gets connection info

    PubSubUtility.printPublising("Publisher1");
    readyToSend = true; 
    x.threadCondNotify();        // <<<<< HOW TO MAKE THIS WORK
    // Think about making the PubClient Thread sleep for a short while so its not too ahead of the client
    startTime = System.currentTimeMillis();

        for (int i = 1; i <= messageNum; i++) { 
             publisher.publishWith()    
             .topic(publisherTopic)   // publishes to the specified topic
             .qos(MqttQos.EXACTLY_ONCE)  
             .payload(convertedMessage)  // the contents of the message 
             .send();
        }

    endTime = System.currentTimeMillis();
    finalTime = endTime - startTime;    
    finalSecTime = TimeUnit.MILLISECONDS.toSeconds(finalTime);

    PubSubUtility.printNumOfPublished("Publisher1", messageNum);    


    publisher.disconnect();  
    }

CommonThread.java:

public class CommonThread {

private static final Logger LOGGER = Logger.getLogger(CommonThread.class.getName());  // Creates a logger instance for this class


public synchronized void threadCondNotify() {
    notify();
    System.out.println("Notified other thread");
}

public synchronized void threadCondWait() {

    try {       
        while (PubMainThread.readyToSend != true) {
        System.out.println("Waiting for another thread....");
        wait();
        }
    }

    catch (InterruptedException e) {
        LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for another thread", e);
    }
    }

}
Chigozie A.
  • How much do you understand about how to properly use the wait/notify mechanism? For example, do you understand that the entire purpose of this semantic is to provide an atomic "unlock and wait" operation to avoid the race condition that an unlock followed by a wait would cause? You posted a lot of code, but it's not clear what level of help you need. Do you need a basic introduction to wait/notify? Or is there something specific about your code that you need help with? – David Schwartz Jun 16 '19 at 23:24
  • @DavidSchwartz I need a basic introduction to wait/notify. I was really hoping someone could give some pseudocode based on what I want to do, which is why I posted a lot of code. I'm trying to have one thread wait for the other so I can properly time receiving 100 messages. Any pseudocode relating to a problem like this would be very helpful. – Chigozie A. Jun 16 '19 at 23:34
  • You'd be better off asking the basic questions to fill in whatever it is that you don't understand. Trying to communicate the basics in the context of a very complex question makes things harder. I would strongly suggest you start off learning exactly what wait/notify actually does (solves the "unlock and wait" race condition), that wait is unconditional, what a predicate is, and so on. – David Schwartz Jun 17 '19 at 01:04

1 Answer

In Sender (rough Java code with some details omitted):

//package statement and imports here

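class Sender extends Thread {
    // Shared lock object; any plain object visible to both threads works.
    public static final Object x = new Object();

    public void run() {
        //initialize here
        synchronized(x) {
            x.notify(); // wakes a thread that is waiting on x
        }
        //send messages here
    }
}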

In Receiver (start before Sender):

//package statement and imports here

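class Receiver extends Thread {

    public void run() {
        //initialize here
        try {
            synchronized(Sender.x) {
                // Blocks till Sender.x.notify(). There is no condition check here,
                // hence the note to start the Receiver before the Sender.
                Sender.x.wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and stop
            return;
        }
        Date start= new Date();
        //receive messages here
        Date end= new Date();
        long duration_milliseconds= end.getTime()-start.getTime(); // getTime() returns a long
    }
}

wait() throws the checked InterruptedException, so the call has to be wrapped in try/catch as shown.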

Feel free to discuss the sense and nonsense of using notify() and wait() directly, especially in Java versions with extended concurrency libraries...
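As one example of those concurrency libraries, here is a minimal sketch that uses java.util.concurrent.CountDownLatch in place of wait()/notify(). Everything in it is an assumption made for illustration: the class name LatchTimingDemo, the run method, and the in-memory queue that stands in for the MQTT broker are hypothetical and not part of HiveMQ or of the question's code. A latch remembers that it was counted down, so the receiver cannot miss the signal even if the sender gets there first.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the two MQTT clients: a queue plays the broker.
class LatchTimingDemo {

    // Runs one sender and one receiver; returns how many messages were received.
    static int run(int messageCount) {
        CountDownLatch senderReady = new CountDownLatch(1);
        BlockingQueue<String> broker = new LinkedBlockingQueue<>();
        int[] received = {0}; // written only by the receiver, read after join()

        Thread receiver = new Thread(() -> {
            try {
                senderReady.await(); // returns at once if already counted down
                long start = System.nanoTime();
                for (int i = 0; i < messageCount; i++) {
                    if (broker.poll(5, TimeUnit.SECONDS) == null) {
                        break; // timed out waiting for a message
                    }
                    received[0]++;
                }
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                System.out.println("Received " + received[0] + " messages in " + elapsedMs + " ms");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });

        Thread sender = new Thread(() -> {
            senderReady.countDown(); // signal: about to start sending
            for (int i = 0; i < messageCount; i++) {
                broker.add("message " + i);
            }
        });

        receiver.start();
        sender.start();
        try {
            sender.join();
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the demo threads", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        run(100);
    }
}
```

In the real program the latch would be created once and handed to both the publishing and the subscribing thread, with the publish and receive loops taking the place of the queue operations.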

Plexar
  • What object should x be? An instance from the sender or receiver? – Chigozie A. Jun 17 '19 at 00:01
  • x should be any object accessible by the sender and the receiver. – Plexar Jun 17 '19 at 00:03
  • At least the OP's code calls `wait` in a loop while testing some condition. This is one step forward, two steps back. – David Schwartz Jun 17 '19 at 01:05
  • Wait() and notify() are primitive operations that are meant to be used in a very specific way. Oracle has a good explanation of it here: https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html – Solomon Slow Jun 17 '19 at 11:40
  • @SolomonSlow Thanks for the link to that Oracle article, it cleared things up. I wasn't using constructors in my classes so they didn't share a common object to invoke wait() and notify(). It works now. – Chigozie A. Jun 17 '19 at 15:07
  • ...edited my answer for clarification... – Plexar Jun 18 '19 at 18:30
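
For readers who land here with the same problem, the points raised in the comments (wait in a loop on a condition, and give both threads the same object through their constructors) can be sketched roughly as follows. This is not the asker's final code, which was not posted. The names ReadyGate, GateSender, GateReceiver and GateDemo are hypothetical, and the publish and receive loops are left as placeholders.

```java
// Hypothetical gate object; plays the role CommonThread has in the question.
class ReadyGate {
    private boolean ready = false; // the condition, guarded by this object's monitor

    synchronized void markReady() {
        ready = true;
        notifyAll(); // wake every thread waiting on this gate
    }

    synchronized void awaitReady() throws InterruptedException {
        while (!ready) { // loop covers spurious wakeups and an early markReady()
            wait();
        }
    }
}

class GateSender implements Runnable {
    private final ReadyGate gate;

    GateSender(ReadyGate gate) { this.gate = gate; } // shared instance passed in

    public void run() {
        gate.markReady();
        // publish messages here
    }
}

class GateReceiver implements Runnable {
    private final ReadyGate gate;
    volatile long elapsedNanos = -1; // stays -1 until the receive phase has finished

    GateReceiver(ReadyGate gate) { this.gate = gate; } // same instance as the sender's

    public void run() {
        try {
            gate.awaitReady();
            long start = System.nanoTime();
            // receive messages here
            elapsedNanos = System.nanoTime() - start;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}

class GateDemo {
    // Starts the sender first on purpose: the flag keeps the early signal from being lost.
    static boolean runOnce() {
        ReadyGate gate = new ReadyGate(); // one gate, handed to both runnables
        GateReceiver receiverTask = new GateReceiver(gate);
        Thread sender = new Thread(new GateSender(gate));
        Thread receiver = new Thread(receiverTask);
        try {
            sender.start();
            sender.join();       // the sender has already signalled by now
            receiver.start();
            receiver.join(5000); // give up after 5 seconds instead of hanging
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !receiver.isAlive() && receiverTask.elapsedNanos >= 0;
    }

    public static void main(String[] args) {
        System.out.println("Receiver passed the gate: " + runOnce());
    }
}
```

Because the condition lives in the ready flag rather than in the notification itself, the order in which the two threads start no longer matters.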