0

I have a scenario where mobile app calls rest API hosted by my application. With in this process, I need to send message to downstream system over MQTT and wait until I get the response for that message. And then I have reply back to mobile app.

The challenge here is, messaging over MQTT is asynchronous. So the message which I receive back will be in different thread (some listener class, listening on messageArrived()). How to get back to calling http thread?

Do we have synchronous communication supported by Paho library.? Something like I send a message, open some topic and wait on it till some message is received or timeout?

hardillb
  • 54,545
  • 11
  • 67
  • 105
  • This is the second time this type question has been asked in 2 days (https://stackoverflow.com/questions/45413507/handling-mqtt-communication-inside-http-request-in-node-red), it wouldn't happen to be for an assignment of some kind would it? – hardillb Aug 01 '17 at 08:16
  • Possible duplicate of [Handling MQTT communication inside HTTP request in Node-Red](https://stackoverflow.com/questions/45413507/handling-mqtt-communication-inside-http-request-in-node-red) – ΦXocę 웃 Пepeúpa ツ Aug 01 '17 at 16:55
  • This is not a duplicate or I would have marked it as one. One is very specific to Node-RED and this is about implementing something similar in most likely java. – hardillb Aug 01 '17 at 17:18

2 Answers2

0

In this code I have managed Asynchronous Paho Python MQTT communication synchronously by using Queue and while loop.

MTTQ broker https://mosquitto.org/

Paho client https://pypi.org/project/paho-mqtt/

    # python3.6
    
    import random
    
    from paho.mqtt import client as mqtt_client
    from queue import Queue
    import datetime
    import time
    
    
    broker = '127.0.0.1'#'broker.emqx.io'
    port = 1883
    topic_1 = "Parent/Topic1"
    topic_2 = "Parent/Topic2"
    msg1_start = "topic1: start"
    msg1_started = "topic1: started"
    msg1_finished = "topic1: finished"
    msg2_start = "topic2: start"
    msg2_started = "topic2: started"
    msg2_finished = "topic2: finished"
    # Generate a Client ID with the subscribe prefix.
    client_id = f'subscribe-{random.randint(0, 100)}'
    # username = 'emqx'
    # password = 'public'
    msgQueue = Queue()
    
    
    def connect_mqtt() -> mqtt_client:
        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                print(f"{datetime.datetime.now()} [Info] Connected to MQTT Broker!")
            else:
                print(f"{datetime.datetime.now()} [Error] Failed to connect, return code %d\n", rc)
    
        client = mqtt_client.Client(client_id)
        # client.username_pw_set(username, password)
        client.on_connect = on_connect
        client.connect(broker, port)
        return client
    
    
    def subscribe(client: mqtt_client):
        def on_message(client, userdata, msg):
            print(f"{datetime.datetime.now()} [Info] Received `{msg.payload.decode()}` from `{msg.topic}` topic")
            msgQueue.put(msg)
    
        client.subscribe(topic_1)
        client.subscribe(topic_2)
        client.on_message = on_message
    
    def publish(client,topic,msg):
    
            result = client.publish(topic, msg)
            # result: [0, 1]
            status = result[0]
            if status == 0:
                print(f"{datetime.datetime.now()} [Info] Send `{msg}` to topic `{topic}`")
            else:
                print(f"{datetime.datetime.now()} [Error] Failed to send message to topic {topic}")    
    
    
    def run():
        client = connect_mqtt()
        time.sleep(1)
        publish(client,topic_1,msg1_start)
        subscribe(client)
        #client.loop_forever()
        client.loop_start()
        while True:
            message = msgQueue.get()
            if message is None:
                continue
            print(f"{datetime.datetime.now()} [Info] received from queue",str(message.payload.decode("utf-8")))
            if message.topic == topic_1 and message.payload.decode() == msg1_finished:
                publish(client,topic_2,msg2_start)
            elif message.topic == topic_2 and message.payload.decode() == msg2_finished:
                break
    
           
        client.loop_stop() #client thread needs to be stopped when topic2 completed.
    
        
    
    if __name__ == '__main__':
        run()

Deepp
  • 93
  • 1
  • 7
-2

MQTT by it's very nature is asynchronous, as are all Pub/Sub implementations. There is no concept of a reply to a message at the protocol level, you have no way of knowing if you will EVER get a response (or you may get many) to a published message as you can't know if there is even a subscriber to the topic you publish on.

It is possible to build a system that will work this way, but you need to maintain a state machine of all in flight requests, implement a sensible timeout policy and work out what to do if you get more than one response.

You have not mentioned which of the different Paho libraries you are using, but I'm guessing Java from the method names, but without knowing what HTTP framework you are using and a host of other factors I'm not going to suggest a solution, especially as it will involve a lot of polling and synchronisation.

Is there any reason why the mobile application can't publish and subscribe to MQTT topics directly? This would remove the need for this.

hardillb
  • 54,545
  • 11
  • 67
  • 105