I have two async functions that both need to run constantly and one of them just hogs all of the CPU.
The first function handles receiving websocket messages from a client.
async def handle_message(self, ws):
"""Handles a message from the websocket."""
logger.info('awaiting message')
while True:
msg = await ws.receive()
logger.debug('received message: %s', msg)
jmsg = json.loads(msg['text'])
logger.info('received message: {}'.format(jmsg))
param = jmsg['parameter']
val = jmsg['data']['value']
logger.info('setting parameter {} to {}'.format(param, val))
self.camera.change_parameter(param, val)
The second function grabs images from a camera and sends them to the frontend client. This is the one that one that won't give the other guy any time.
async def send_image(self, ws):
"""Sends an image to the websocket."""
for im in self.camera:
await asyncio.sleep(1000)
h, w = im.shape[:2]
resized = cv2.resize(im, (w // 4, h // 4))
await ws.send_bytes(image_to_bytes(resized))
I'm executing these coroutines using asyncio.gather()
. The decorator is from FastAPI and Backend()
is my class that contains the two async coroutines.
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
"""Handle a WebSocket connection."""
backend = Backend()
logger.info('Started backend.')
await websocket.accept()
try:
aws = [backend.send_image(websocket), backend.handle_message(websocket)]
done, pending = await asyncio.gather(*aws)
except WebSocketDisconnect:
await websocket.close()
Both of these coroutines will operate seperately, but if I try to run them together send_image()
never gives any time to handle_message
and so none of the messages are ever received (or at least that's what I think is going on).
I thought this is what asyncio
was trying to solve, but I'm probably using it wrong. I thought about using multiprocessing, but I'm pretty sure FastAPI expects awaitables here. I also read about using the return variables from gather()
, but I didn't really understand. Something about canceling the pending
tasks and adding them back to the event loop.
Can anyone show me the correct (and preferably modern pythonic) way to make these async coroutines run concurrently?