I'm trying to start multiple tasks concurrently with asyncio, but I observe that the tasks don't actually run concurrently. I must be missing something. My knowledge of asyncio is growing, but I'm still on the learning curve. I'm using asyncio.Queue() to transfer data between the classes. Here's some code:
main.py

    import asyncio
    from webSocket import WebSocket

    async def main():
        ws = WebSocket()
        await ws.startServer(ip, port)  # websocket's send and receive handlers should work concurrently
        await ws.websocketMsgs.otherTask  # this task should also run concurrently with the two above

    asyncio.run(main(), debug=False)

webSocket.py

    import asyncio
    import websockets
    from webSocketMsgs import WebSocketMsgs

    class WebSocket:
        def __init__(self):
            self.server = None
            self.sendTask = None
            self.receiveTask = None
            self.websocketMsgs = WebSocketMsgs()

        async def startServer(self, ipAddress, port):
            self.server = await websockets.serve(lambda websocket, path: self.msgHandler(websocket, path), ipAddress, port)
            await self.server.wait_closed()

        async def msgHandler(self, websocket, path):
            # One send task and one receive task per connection
            self.sendTask = asyncio.create_task(self.sendHandler(websocket, path))
            self.receiveTask = asyncio.create_task(self.receiveHandler(websocket, path))
            await asyncio.wait([self.sendTask, self.receiveTask], return_when=asyncio.ALL_COMPLETED)

        async def sendHandler(self, websocket, path):
            while True:
                itemToSend = await self.websocketMsgs.qOutgoingData.get()
                await websocket.send(itemToSend)
                self.websocketMsgs.qOutgoingData.task_done()

        async def receiveHandler(self, websocket, path):
            async for message in websocket:
                await self.websocketMsgs.processIncomingMsgs(message)

webSocketMsgs.py

    import asyncio

    class WebSocketMsgs:
        def __init__(self):
            # An asyncio.Queue() that is populated in another class
            self.qIncomingSensorData = self.someclass.qIncomingSensorData
            self.qOutgoingData = asyncio.Queue()
            self.otherTask = asyncio.create_task(self.consumeSensorData())

        async def processIncomingMsgs(self, msg):
            # Do stuff with messages coming from the websocket
            pass

        async def consumeSensorData(self):
            while True:
                msg = await self.qIncomingSensorData.get()  # Data to be processed and sent out via websocket
                # Some logic
                self.qOutgoingData.put_nowait(formattedMsg)

I observe the following:
- the receiveHandler reacts as soon as I send a message over the websocket
- the otherTask is running and populates the queue with one element every second
- the sendHandler lags behind and sends 20 messages at once every 20 seconds if no message is received over the websocket. If I send dummy messages over the websocket, it "unlocks" the sendHandler task, which then sends all the messages accumulated during the preceding x seconds.
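For comparison, this is the behavior I expect, reduced to a minimal sketch without websockets. The names produce, send and demo are hypothetical stand-ins that are not part of my project, and the list sent replaces websocket.send. The sketch assumes that the producer and the consumer both run on the same event loop:

```python
import asyncio


async def produce(queue, count, interval):
    # Hypothetical stand-in for consumeSensorData: queue one item per interval.
    for i in range(count):
        await asyncio.sleep(interval)
        queue.put_nowait(f"msg-{i}")


async def send(queue, sent):
    # Hypothetical stand-in for sendHandler: record each item and the time
    # at which it was taken from the queue (instead of websocket.send).
    loop = asyncio.get_running_loop()
    while True:
        item = await queue.get()
        sent.append((item, loop.time()))
        queue.task_done()


async def demo(count=3, interval=0.05):
    queue = asyncio.Queue()
    sent = []
    sender = asyncio.create_task(send(queue, sent))
    await produce(queue, count, interval)
    await queue.join()  # wait until every queued item has been handled
    sender.cancel()     # the sender loops forever, so stop it explicitly
    try:
        await sender
    except asyncio.CancelledError:
        pass
    return sent


if __name__ == "__main__":
    for item, timestamp in asyncio.run(demo()):
        print(item, round(timestamp, 3))
```

In this reduced form the messages are forwarded one at a time, right after they are queued, rather than in batches. That is what I would like to see in the real code.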
Can anyone explain why I observe this behavior and suggest how to arrange my code so that everything runs concurrently?