1

I'm trying to start multiple tasks concurrently with asyncio, but the I observe that the tasks actually don't run concurrently. There is something that I must miss. My know-how about asyncio is growing but I'm still in the learning curve. I'm using asyncio.Queue() to transfer data between the classes. Here's some code:

main.py

async def main():

    ws = WebSocket()
    await ws.startServer(ip, port) # websocket's send and receive handlers should work concurently
    await ws.websocketMsgs.otherTask # this task should also run concurrently with the two above

asyncio.run(main(), debug=False)

webSocket.py

from webSocketMsgs import WebSocketMsgs

class WebSocket:

    def __init__(self):

        self.server = None
        self.sendTask = None
        self.receiveTask = None
        
        self.websocketMsgs = WebSocketMsgs()


    async def startServer(self, ipAddress, port):

        self.server = await websockets.serve(lambda websocket, path : self.msgHandler(websocket, path),  ipAddress, port)
        await self.server.wait_closed()

        
    async def msgHandler(self, websocket, path):

        self.sendTask = asyncio.create_task(self.sendHandler(websocket, path))
        self.receiveTask = asyncio.create_task(self.receiveHandler(websocket, path))
        await asyncio.wait([self.sendTask, self.receiveTask], return_when = asyncio.ALL_COMPLETED)

    async def sendHandler(self, websocket, path):

        while True:

            itemToSend = await self.websocketMsgs.qOutgoingData.get()
            await websocket.send(itemToSend)
            self.websocketMsgs.qOutgoingData.task_done()


    async def receiveHandler(self, websocket, path):
        
        async for message in websocket:

            await self.websocketMsgs.processIncomingMsgs(message)

webSocketMsgs.py

class WebSocketMsgs:
    
    def __init__(self):

        # An asyncio.Queue() that is populated in an other class
        self.qIncomingSensorData = self.someclass.qIncomingSensorData 

        self.qOutgoingData       = asyncio.Queue()

        self.otherTask = asyncio.create_task(self.consumeSensorData())


    async def processIncomingMsgs(self, msg):
        
        # Do stuff with messages coming from the websocket

        
    async def consumeSensorData(self):

        while True:

            msg = await self.qIncomingSensorData.get() # Data to be processed and sent out via websocket

            # Some logic
                
            self.qOutgoingData.put_nowait(formattedMsg)

I observe the following:

  • the receiveHandler reacts as soon as I send a message over the websocket
  • the otherTask is running and populates the queue with an element every 1 second
  • the sendHandler stays behind and sends 20 messages at once every 20 seconds if no message is received over the websocket. If I send dummy messages over the websocket it "unlocks" the sendHandler task and sends the amount of messages accumulated during x seconds.

Can anyone explain me why I observe such behavior and suggest a way to arrange my code to have everything running concurrently ?

LZR_13
  • 45
  • 2
  • 6
  • Who populates `qIncomingSensorData`, and how? – user4815162342 Jul 15 '20 at 13:15
  • Hi Hrvoje :) it's the receivedMsgCallback from by previous post [link] (https://stackoverflow.com/questions/62810399/pythons-asyncio-event-across-different-classes), that also now uses `call_soon_threadsafe()`. I use `self.qIncomingSensorData.put_nowait(msg)` in the callback. Thank you so much for your support – LZR_13 Jul 15 '20 at 14:37
  • Hi! If you use `call_soon_threadsafe`, then it's really unclear why it would get stuck. – user4815162342 Jul 15 '20 at 14:54

1 Answers1

0

Just use call_soon_threadsafe(self.qIncomingSensorData.put_nowait, msg) in the callback that runs in a different thread.

LZR_13
  • 45
  • 2
  • 6