I'm writing a Python 3 Tornado web server that listens to an MQTT broker and, whenever a new message arrives, broadcasts it to the connected browsers over WebSockets. However, Tornado doesn't seem to like calls to its API from a thread other than the one running IOLoop.current(), and I can't figure out another solution.

I've already tried writing some code. I put the whole MQTT client (called PMCU client here) on a separate thread, which loops and listens for MQTT notifications.

def on_pmcu_data(data):
    for websocket_client in websocket_clients:
        print("Sending websocket message")
        websocket_client.write_message(data)  # It gets stuck here!
        print("Sent")

class WebSocketHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        websocket_clients.append(self)

    def on_close(self):
        websocket_clients.remove(self)

def make_app():
    return tornado.web.Application([
        (r'/ws', WebSocketHandler)
    ])

if __name__ == "__main__":
    main_loop = IOLoop().current()

    pmcu_client = PMCUClient(on_pmcu_data)
    threading.Thread(target=lambda: pmcu_client.listen("5.4.3.2")).start()

    app = make_app()
    app.listen(8080)
    main_loop.start()

However, as I said, calls to the Tornado API from outside the IOLoop.current() thread seem to block: the code above only prints Sending websocket message.

My intent is to run websocket_client.write_message(data) on the IOLoop.current() event loop. But calling IOLoop.current().spawn_callback(lambda: websocket_client.write_message(data)) doesn't seem to work once IOLoop.current() has started. How can I achieve that?

I know I have a big misunderstanding of IOLoop, of asyncio (on which it depends), and of Python 3 async in general.

loryruta

1 Answer

on_pmcu_data is called on a separate thread, but the websocket is owned by Tornado's event loop. Tornado objects such as WebSocketHandler are not thread-safe, so you can't write to a websocket directly from another thread; the write has to be scheduled on the event loop.

You'll need to ask the IOLoop to write the data to the websockets.

Solution 1:

For simple cases, if you don't want to change much in the code, you can do this:

if __name__ == "__main__":
    main_loop = IOLoop.current()

    # Called on the MQTT thread; add_callback hands on_pmcu_data over to the IOLoop thread.
    on_pmcu_data_callback = lambda data: main_loop.add_callback(on_pmcu_data, data)

    pmcu_client = PMCUClient(on_pmcu_data_callback)

    ...

This should solve your problem.
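
To see the hand-off in isolation, without Tornado or MQTT, here is a minimal sketch that uses only the standard library's asyncio. It is an analogy under stated assumptions, not the code from the question: loop.call_soon_threadsafe plays the role that add_callback plays above, and run_demo, deliver, and worker are hypothetical names made up for the illustration.

```python
import asyncio
import threading


def run_demo(messages):
    """Feed messages from a worker thread into an event loop; return what the loop saw."""
    received = []  # (message, thread name) pairs, recorded on the loop thread

    async def main():
        loop = asyncio.get_running_loop()
        done = asyncio.Event()

        def deliver(message):
            # Runs on the event loop thread, where loop-owned objects may be touched.
            received.append((message, threading.current_thread().name))

        def worker():
            # Runs on a separate thread (stands in for the MQTT listener).
            # It never calls deliver directly; it only asks the loop to do so.
            for message in messages:
                loop.call_soon_threadsafe(deliver, message)
            # Scheduled last, so it runs after every deliver call above.
            loop.call_soon_threadsafe(done.set)

        threading.Thread(target=worker, name="listener").start()
        await done.wait()

    asyncio.run(main())
    return received
```

Calling run_demo(["a", "b"]) returns the messages in order, each tagged with the name of the thread that runs the loop rather than "listener", which is the behaviour the websocket write needs.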


Solution 2:

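For more elaborate cases, you can pass main_loop to the PMCUClient class and have it call add_callback to run on_pmcu_data on the loop. Prefer add_callback over spawn_callback here: the Tornado documentation describes add_callback as the only IOLoop method that is guaranteed safe to call from other threads.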

Example:

if __name__ == "__main__":
    main_loop = IOLoop.current()

    pmcu_client = PMCUClient(on_pmcu_data, main_loop)  # also pass the main loop

    ...

Then in the PMCUClient class:

class PMCUClient:
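    def __init__(self, on_pmcu_data, main_loop):
        ...
        self.on_pmcu_data = on_pmcu_data
        self.main_loop = main_loop

    def listen(...):
        ...
        # Schedule the callback on the IOLoop thread instead of calling it here.
        self.main_loop.add_callback(self.on_pmcu_data, data)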
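
The shape of Solution 2 can be tried out without Tornado as well. The following is a rough, standard-library-only sketch: ListenerSketch is a hypothetical stand-in for PMCUClient (whose real internals are not shown in the question), collect is a made-up helper, and asyncio's call_soon_threadsafe is used in place of add_callback.

```python
import asyncio
import threading


class ListenerSketch:
    """Hypothetical stand-in for PMCUClient: keeps a callback and the loop to run it on."""

    def __init__(self, on_data, loop):
        self.on_data = on_data
        self.loop = loop

    def listen(self, messages):
        # Called on a worker thread; schedules on_data on the loop instead of calling it.
        for message in messages:
            self.loop.call_soon_threadsafe(self.on_data, message)


async def collect(messages):
    """Run ListenerSketch on a thread and gather what the loop-side callback receives."""
    loop = asyncio.get_running_loop()
    seen = []
    client = ListenerSketch(seen.append, loop)
    thread = threading.Thread(target=client.listen, args=(messages,))
    thread.start()
    # Wait for the thread without blocking the loop, then yield once
    # so any callback still queued can run.
    await loop.run_in_executor(None, thread.join)
    await asyncio.sleep(0)
    return seen
```

For example, asyncio.run(collect(["x", "y"])) gathers both messages through the loop-side callback. Passing the loop into the constructor keeps the threading detail inside the client, so callers can hand it an ordinary function.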
xyres