
I have a class with 2 methods, each publishing and subscribing to a common MQTT broker separately.

public class operations{
    public void fetch(){
    // do something

     mqttConn.mqttSubscriberFetch(MQTTtopic);

    // do something
   }

    public void store(){
   // do something

    mqttConn.mqttSubscriberStore(MQTTtopic);

   // do something
   }

}

And the MQTT method for method fetch is as follows:

public class mqttConnectionFetch implements  MqttCallback{

    MqttClient client;

    String topic;

    public mqttConnectionFetch() {
    }


    public void mqttSubscriberFetch(String topic) {

        final String MQTT_BROKER = "tcp://localhost:1883" ;
        final String CLIENT_ID = "fetch";
        try {
            client = new MqttClient(MQTT_BROKER, CLIENT_ID);
            client.connect();
            client.setCallback(this);
            client.subscribe(topic);
            MqttMessage message = new MqttMessage();

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }



    @Override
    public void connectionLost(Throwable cause) {
        // TODO Auto-generated method stub

    }

    @Override
    public void messageArrived(String topic, MqttMessage message)
            throws Exception {

        System.out.println("the message received is "+message);

        if(message.toString().equals("Processed")) {
            MqttPublisherFetch("Processed", "operations/fetch");

        }

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO Auto-generated method stub

    }


    public void MqttPublisherFetch(String message, String topic) {
        final String MQTT_BROKER = "tcp://localhost:1883" ;
        final String CLIENT_ID = "store";
        try {
            client = new MqttClient(MQTT_BROKER, CLIENT_ID);
            client.connect();
            createMqttMessage(message,topic);
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void createMqttMessage(String message, String topic) throws MqttException {
        MqttMessage publishMessage = new MqttMessage();
        publishMessage.setPayload(message.getBytes());
        client.publish(topic, publishMessage);

    }


}

Now I am trying to make it so that when my fetch method is subscribed to a topic and the message from the broker is "Processing", it goes into a wait state, while the store method keeps working normally. When the message received is "Processed", fetch should start working again.

I tried this with plain Java wait() and start(), but I am not getting the desired output. Can someone help me demystify this?

user10182984
  • Why not simply use an observer/listener pattern, and call the appropriate method on notification of change of state? – Hovercraft Full Of Eels Aug 11 '18 at 16:52
  • First up there is nearly never a good reason to be running multiple MQTT clients in the same process, one is just fine for both publishing and subscribing – hardillb Aug 11 '18 at 16:54
  • I am not sure I understand what you want... The fetch client as written subscribes to the MQTT server and waits, waking up only when it receives a message from the broker. If it receives a "Processing" message from the broker, it could just ignore/log it and go back to wait (which is what it will do already); if it gets a "Processed" message, it can go handle it however - then go back to wait for the next one. How did you want to change that behavior? – moilejter Aug 11 '18 at 16:54
  • Also what happens if the remote client is down and you NEVER get a response? – hardillb Aug 11 '18 at 16:54
  • It seems that you are solving some problem but as it was pointed out already in a very inefficient way. Can you describe why you need to do this? Basically can you describe the higher level problem you try to solve. – Roman-Stop RU aggression in UA Aug 11 '18 at 17:08
  • @RomanKonoval I have a REST api which represents a Storage unit. It can perform store and fetch operations which are dependent on a transport operation from some other REST api. MQTT here is used as an event listener. The process is that, when the store operation sends an HTTP request to another REST api for the transport operation, it should STOP until the transport is done. That's why I want the wait. Meanwhile since store and fetch operations in this api are different, wait for store should not affect the fetch. fetch operation should work when store is listening for the MQTT event. – user10182984 Aug 11 '18 at 18:34
  • @hardillb I have not yet implemented that part. But it is a very valid point ! – user10182984 Aug 11 '18 at 18:34
  • @HovercraftFullOfEels Can you please explain this a bit ? – user10182984 Aug 11 '18 at 18:35

1 Answer


As I understand it, the store method consists of several steps. One of these steps is sending a message via MQTT and waiting for the response. From the perspective of the caller of the store method, the call is synchronous: the caller receives a response only after the whole processing (including the asynchronous send/receive via MQTT) is done.

What you want to solve here is the classic problem of one thread needing to wait until some condition is met in another thread. There are many ways to achieve this; see the number of options proposed in How to make a Java thread wait for another thread's output?.

The simplest would be to use a CountDownLatch. I'll use the fetch method, as this is the one you provided the code for.

First you would modify mqttSubscriberFetch to create an internal CountDownLatch object:

private CountDownLatch processingFinishedLatch;

public void mqttSubscriberFetch(String topic) {

    final String MQTT_BROKER = "tcp://localhost:1883" ;
    final String CLIENT_ID = "fetch";
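    try {
        // Create the latch before subscribing, so that a message arriving
        // right away cannot reach messageArrived while the latch is still null.
        // Requires: import java.util.concurrent.CountDownLatch;
        processingFinishedLatch = new CountDownLatch(1);
        client = new MqttClient(MQTT_BROKER, CLIENT_ID);
        client.connect();
        client.setCallback(this);
        client.subscribe(topic);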

    } catch (MqttException e) {
        e.printStackTrace();
    }
}

Then you need to trigger this signal in the message receiving callback:

@Override
public void messageArrived(String topic, MqttMessage message)
        throws Exception {

    System.out.println("the message received is "+message);

    if(message.toString().equals("Processed")) {
        MqttPublisherFetch("Processed", "operations/fetch");
        this.processingFinishedLatch.countDown();
    }

}

You also need to give the client of mqttConnectionFetch a way to wait for the latch to reach zero:

void waitForProcessingToFinish() throws InterruptedException {
     this.processingFinishedLatch.await();
}

The fetch method should use it like this:

public void fetch(){
// do something

mqttConn.mqttSubscriberFetch(MQTTtopic);

// do everything that is needed to initiate processing

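// blocks until "Processed" arrives; throws InterruptedException,
// so fetch must either catch it or declare it
mqttConn.waitForProcessingToFinish();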

// do something

}

The thread executing fetch will wait in waitForProcessingToFinish until the latch reaches zero, which happens when the appropriate message arrives.

This approach can easily be modified to handle the case where the message never arrives. Just use await with a timeout:
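To see the waiting behaviour without a broker, here is a minimal sketch that uses only the JDK. FakeFetchConnection and its onMessage method are hypothetical stand-ins for mqttConnectionFetch and messageArrived, and the second thread only plays the role of the MQTT callback thread; none of this is Paho API.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for mqttConnectionFetch: no broker, no Paho classes.
class FakeFetchConnection {
    private final CountDownLatch processingFinishedLatch = new CountDownLatch(1);

    // Plays the role of messageArrived(); called from another thread.
    void onMessage(String payload) {
        System.out.println("the message received is " + payload);
        if (payload.equals("Processed")) {
            processingFinishedLatch.countDown(); // releases the waiting thread
        }
    }

    void waitForProcessingToFinish() throws InterruptedException {
        processingFinishedLatch.await(); // blocks until the count reaches zero
    }
}

class LatchDemo {
    // Returns true if the calling ("fetch") thread resumed after "Processed".
    static boolean runFetch() {
        FakeFetchConnection conn = new FakeFetchConnection();
        // Simulated callback thread delivering two messages in order.
        Thread callbackThread = new Thread(() -> {
            conn.onMessage("Processing"); // ignored by the latch
            conn.onMessage("Processed");  // counts the latch down
        });
        callbackThread.start();
        try {
            conn.waitForProcessingToFinish();
            callbackThread.join();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fetch resumed: " + runFetch());
    }
}
```

A "Processing" message leaves the latch untouched, so the waiting thread stays parked until "Processed" is delivered. A CountDownLatch cannot be reset, so a new latch is needed for each wait.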

boolean waitForProcessingToFinish(long timeout,
        TimeUnit unit) throws InterruptedException {
     // returns true if the latch reached zero, false if the timeout elapsed first
     return this.processingFinishedLatch.await(timeout, unit);
}

fetch should check the returned value and probably return an error to the caller if the timeout expires.
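A small sketch of that check, again using only the JDK. TimeoutLatchDemo and awaitWithTimeout are hypothetical names for illustration; the only library call relied on is CountDownLatch.await(long, TimeUnit), which returns false when the timeout elapses before the count reaches zero.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimeoutLatchDemo {
    // Returns true if the latch reached zero within the timeout,
    // false if the timeout elapsed or the thread was interrupted.
    static boolean awaitWithTimeout(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        // Nobody ever counts this latch down, like a "Processed" message
        // that never arrives.
        CountDownLatch neverSignalled = new CountDownLatch(1);
        boolean finished = awaitWithTimeout(neverSignalled, 100, TimeUnit.MILLISECONDS);
        if (!finished) {
            // This is where fetch would report an error to its caller.
            System.out.println("timed out waiting for Processed");
        }
    }
}
```

How long to wait, and what error to return, depends on how long the transport operation normally takes, which the question does not specify.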

Note that this solution has a downside: the thread processing fetch is blocked for the whole time it waits. If it is a thread from the pool used for processing incoming HTTP requests, this may limit the number of requests your server can process concurrently. There are ways to mitigate this, such as async servlets, RxJava or Spring WebFlux. The right solution depends very much on the technology you use to implement the REST API.
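To give a rough idea of the non-blocking style, here is a sketch using the JDK's CompletableFuture. This is a substitute chosen so the example needs no extra libraries; it is not one of the frameworks named above. AsyncFetchSketch, onMessage and fetchAsync are hypothetical names, and how the returned future is wired into the HTTP response depends on your framework.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the rest of fetch is registered as a continuation,
// so no thread has to block while waiting for "Processed".
class AsyncFetchSketch {
    private final CompletableFuture<String> processed = new CompletableFuture<>();

    // Plays the role of messageArrived(); called by the callback thread.
    void onMessage(String payload) {
        if (payload.equals("Processed")) {
            processed.complete(payload); // triggers the registered continuation
        }
    }

    // Returns immediately; the continuation runs once "Processed" arrives.
    CompletableFuture<String> fetchAsync() {
        return processed.thenApply(p -> "fetch continued after " + p);
    }

    public static void main(String[] args) {
        AsyncFetchSketch sketch = new AsyncFetchSketch();
        CompletableFuture<String> result = sketch.fetchAsync();
        sketch.onMessage("Processing"); // result is still pending
        sketch.onMessage("Processed");  // result completes here
        System.out.println(result.join());
    }
}
```

With this shape, the caller decides what to do with the future, for example attaching a timeout or handing it to an asynchronous request handler.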