I'm writing a program using the HiveMQ Client (an open-source MQTT implementation in Java) that involves two multithreaded clients. One client is designated as the publisher and the other as the subscriber (I'm aware that the same client can both publish and subscribe). I'm trying to design a test where the publisher sends 100 messages to the subscriber. The goal is to time how long it takes to send and receive all the messages.

I realized that to time how long it takes for the messages to be received, the subscribing thread has to wait until the publishing thread is ready to send. I decided to use wait() and notify(), but I can't seem to implement it correctly. I'm aware that both calls have to be made on the same object, which I tried to do, but I can't get the design right.

I've added snippets of code for the run methods of both clients. CommonThread.java isn't actually a thread and I'm not running it; I tried to use it as an in-between class to be able to call wait() and notify(), but I'm missing something.
HiveMQ:
https://github.com/hivemq/hivemq-community-edition
https://github.com/hivemq/hivemq-mqtt-client
SubMainThread.java:
    public void run() {
        // Creates the client object using the blocking API
        Mqtt5BlockingClient subscriber = Mqtt5Client.builder()
                .identifier(UUID.randomUUID().toString()) // unique identifier of the MQTT client (a randomly generated UUID)
                .serverHost("localhost") // host name or IP address of the MQTT server. Kept it localhost for testing. localhost is the default if not specified.
                .serverPort(1883) // port of the server
                .addConnectedListener(context -> ClientConnectionRetreiver.printConnected("Subscriber1")) // prints a string when the client is connected
                .addDisconnectedListener(context -> ClientConnectionRetreiver.printDisconnected("Subscriber1")) // prints a string when the client is disconnected
                .buildBlocking(); // builds the blocking client
        subscriber.connect(); // connects the client
        ClientConnectionRetreiver.getConnectionInfo(subscriber); // gets connection info

        try {
            // Creates a "publishes" instance that is used to queue incoming messages.
            // MqttGlobalPublishFilter.ALL matches all incoming Publish messages.
            Mqtt5Publishes receivingClient1 = subscriber.publishes(MqttGlobalPublishFilter.ALL);
            subscriber.subscribeWith()
                    .topicFilter(subscriberTopic)
                    .qos(MqttQos.EXACTLY_ONCE)
                    .send();
            PubSubUtility.printSubscribing("Subscriber1");
            System.out.println("Publisher ready to send: " + PubMainThread.readyToSend);
            x.threadCondWait(); // <<<<< HOW TO MAKE THIS WORK
            System.out.println("Back to the normal execution flow :P");
            startTime = System.currentTimeMillis();
            System.out.println("Timer started");
            for (int i = 1; i <= messageNum; i++) {
                // Receives the next message using the "publishes" instance, waiting up to 5 minutes.
                // .get() returns the object if available or throws a NoSuchElementException.
                Mqtt5Publish receivedMessage = receivingClient1.receive(MESSAGEWAITTIME, TimeUnit.SECONDS).get();
                PubSubUtility.convertMessage(receivedMessage); // converts a Mqtt5Publish instance to a string and prints it
            }
            endTime = System.currentTimeMillis();
            finalTime = endTime - startTime;
            System.out.println(finalTime + PubMainThread.finalTime + " milliseconds");
            finalSecTime = TimeUnit.MILLISECONDS.toSeconds(finalTime);
            System.out.println(finalSecTime + PubMainThread.finalSecTime);
        }
        catch (InterruptedException e) { // catches interruptions in the thread
            LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for a message to be received", e);
        }
        catch (NoSuchElementException e) {
            System.out.println("There are no received messages"); // handles the case where the "publishes" instance has no messages
        }
        subscriber.disconnect();
    }
PubMainThread.java:
    public void run() {
        // Creates the client object using the blocking API
        Mqtt5BlockingClient publisher = Mqtt5Client.builder()
                .identifier(UUID.randomUUID().toString()) // unique identifier of the MQTT client (a randomly generated UUID)
                .serverHost("localhost") // host name or IP address of the MQTT server. Kept it localhost for testing. localhost is the default if not specified.
                .serverPort(1883) // port of the server
                .addConnectedListener(context -> ClientConnectionRetreiver.printConnected("Publisher1")) // prints a string when the client is connected
                .addDisconnectedListener(context -> ClientConnectionRetreiver.printDisconnected("Publisher1")) // prints a string when the client is disconnected
                .buildBlocking(); // builds the blocking client
        publisher.connect(); // connects the client
        ClientConnectionRetreiver.getConnectionInfo(publisher); // gets connection info
        PubSubUtility.printPublising("Publisher1");
        readyToSend = true;
        x.threadCondNotify(); // <<<<< HOW TO MAKE THIS WORK
        // Think about making the PubClient thread sleep for a short while so it's not too far ahead of the subscriber
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= messageNum; i++) {
            publisher.publishWith()
                    .topic(publisherTopic) // publishes to the specified topic
                    .qos(MqttQos.EXACTLY_ONCE)
                    .payload(convertedMessage) // the contents of the message
                    .send();
        }
        endTime = System.currentTimeMillis();
        finalTime = endTime - startTime;
        finalSecTime = TimeUnit.MILLISECONDS.toSeconds(finalTime);
        PubSubUtility.printNumOfPublished("Publisher1", messageNum);
        publisher.disconnect();
    }
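One way to make the subscriber wait for the publisher without hand-written wait()/notify() is a `java.util.concurrent.CountDownLatch` that both threads share. The following is only a minimal sketch of that handoff, not the HiveMQ API: the class name `LatchHandoffDemo`, the `runOnce` method, and the `BlockingQueue` standing in for the broker are all assumptions made up for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LatchHandoffDemo {

    // Hypothetical stand-in for the broker: the publisher adds, the subscriber polls.
    private static final BlockingQueue<String> BROKER = new LinkedBlockingQueue<>();

    /** Runs one publisher and one subscriber thread and returns how many messages were received. */
    public static int runOnce(int messageNum) {
        BROKER.clear();
        CountDownLatch publisherReady = new CountDownLatch(1); // the single object both threads share
        int[] received = {0};

        Thread subscriber = new Thread(() -> {
            try {
                // Blocks until countDown() has been called; returns at once if it already was.
                publisherReady.await();
                long start = System.nanoTime();
                for (int i = 0; i < messageNum; i++) {
                    String message = BROKER.poll(5, TimeUnit.SECONDS);
                    if (message == null) {
                        break; // timed out waiting for the next message
                    }
                    received[0]++;
                }
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                System.out.println("Received " + received[0] + " messages in " + elapsedMs + " ms");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });

        Thread publisher = new Thread(() -> {
            publisherReady.countDown(); // signal readiness just before the first publish
            for (int i = 0; i < messageNum; i++) {
                BROKER.add("message " + i);
            }
        });

        subscriber.start();
        publisher.start();
        try {
            publisher.join();
            subscriber.join(); // join() makes the subscriber's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the demo threads", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        runOnce(100);
    }
}
```

In the real test, the latch would be created once and passed to both the publisher and subscriber objects (for example through their constructors), with `await()` replacing `x.threadCondWait()` and `countDown()` replacing `x.threadCondNotify()`. Unlike notify(), a `countDown()` that happens before the subscriber reaches `await()` is not lost.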
CommonThread.java:
    public class CommonThread {
        private static final Logger LOGGER = Logger.getLogger(CommonThread.class.getName()); // creates a logger instance

        // Wakes up one thread that is waiting on this object's monitor
        public synchronized void threadCondNotify() {
            notify();
            System.out.println("Notified other thread");
        }

        // Blocks the calling thread until the publisher has set readyToSend
        public synchronized void threadCondWait() {
            try {
                while (!PubMainThread.readyToSend) {
                    System.out.println("Waiting for another thread....");
                    wait();
                }
            }
            catch (InterruptedException e) {
                LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for another thread", e);
            }
        }
    }
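wait() and notify() only pair up when both threads call them on the same object instance, so a common cause of this kind of hang (which may or may not be what is happening here) is that each thread ends up with its own `CommonThread` object. The sketch below shows the shape that usually works: one shared gate object that owns both the flag and the monitor. The names `ReadyGateDemo`, `ReadyGate`, `signalReady`, and `awaitReady` are hypothetical and exist only for this illustration.

```java
public class ReadyGateDemo {

    /** Hypothetical replacement for CommonThread: the flag lives inside the monitor object. */
    static final class ReadyGate {
        private boolean ready = false; // only read and written while holding this object's lock

        public synchronized void signalReady() {
            ready = true;   // set the condition first...
            notifyAll();    // ...then wake every thread waiting on this same instance
        }

        public synchronized void awaitReady() throws InterruptedException {
            while (!ready) { // the loop guards against spurious wakeups
                wait();      // releases the lock while waiting, reacquires it before returning
            }
        }
    }

    /** Starts a waiting thread and a signalling thread; returns true if the waiter resumed. */
    public static boolean runOnce() {
        ReadyGate gate = new ReadyGate(); // ONE instance, handed to both threads
        boolean[] resumed = {false};

        Thread subscriber = new Thread(() -> {
            try {
                gate.awaitReady();
                resumed[0] = true; // reached only after signalReady() has run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        Thread publisher = new Thread(gate::signalReady);

        subscriber.start();
        publisher.start();
        try {
            publisher.join();
            subscriber.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the demo threads", e);
        }
        return resumed[0];
    }

    public static void main(String[] args) {
        System.out.println("Subscriber resumed: " + runOnce());
    }
}
```

Because the flag is checked under the same lock that guards the notification, it does not matter which thread runs first: if the publisher signals before the subscriber starts waiting, the subscriber sees `ready == true` and never calls wait() at all.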