I have a CPU-bound, long-running algorithm; let's call it task. Let's say it looks like this:
    def task(parameters):
        result = 0
        for _ in range(10):
            for _ in range(10):
                for _ in range(10):
                    result += do_things()
        return result

    @app.get('/')
    def results(parameters: BodyModel):
        return task(parameters)
If I put that in a def path operation function, everything works fine because FastAPI runs it in a separate thread. I can access multiple paths at the same time; concurrency does its job by pushing my CPU-bound task to a separate thread. But now I want to switch to WebSockets to communicate intermediate results. For that to work, I have to mark the whole thing as asynchronous and pass the WebSocket into my task. So it looks like this:
    async def task(parameters):
        result = 0
        for _ in range(10):
            for _ in range(10):
                for _ in range(10):
                    intermediate_result = do_things()
                    await parameters.websocket.send_text(intermediate_result)
                    result += intermediate_result
        return result

    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        while True:
            parameters = await websocket.receive_text()
            parameters.websocket = websocket
            result = await task(parameters)
            await websocket.send_text(result)
It works like a charm for sending the intermediate results. But now my algorithm blocks FastAPI, as it is not truly asynchronous by itself. Once I post a message to '/ws', FastAPI is blocked and does not respond to any other requests until my task is finished.
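The blocking described above can be reproduced without FastAPI. The following is a minimal sketch using only asyncio; `do_things`, `heartbeat`, and `measure` are hypothetical names, and `time.sleep` stands in for real CPU work (an assumption: real CPU-bound code also holds the GIL, so threads would help less there than this demo suggests).

```python
import asyncio
import time


def do_things() -> int:
    # Hypothetical stand-in for one chunk of blocking work (sleeps instead of computing).
    time.sleep(0.05)
    return 1


async def heartbeat(ticks: list) -> None:
    # Records a tick every 10 ms, but only while the event loop is free to run it.
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


async def measure(run_in_thread: bool) -> int:
    """Return how many heartbeat ticks happened while the work was running."""
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    before = len(ticks)
    for _ in range(4):
        if run_in_thread:
            # The work runs in a worker thread; the loop keeps serving the heartbeat.
            await asyncio.to_thread(do_things)
        else:
            # The work runs on the event loop thread; nothing else can run meanwhile.
            do_things()
    during = len(ticks) - before
    beat.cancel()
    return during


if __name__ == "__main__":
    print("ticks while blocking the loop:", asyncio.run(measure(run_in_thread=False)))
    print("ticks while using a thread:  ", asyncio.run(measure(run_in_thread=True)))
```

The heartbeat plays the role of "any other request": it gets no turns while the blocking call runs directly inside the coroutine.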
So I need some advice on how to either
- a) send WebSocket messages from within a synchronous CPU-bound task (I didn't find a synchronous send_text alternative) so I can use def, or
- b) make my CPU-bound task truly asynchronous so that it no longer blocks anything when I use async def.
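For reference, option a) could be approached with asyncio.run_coroutine_threadsafe, which lets a plain function running in a worker thread schedule a coroutine on the event loop. This is only a sketch under assumptions: `FakeWebSocket` is a hypothetical stand-in with an async send_text method, and `step * step` replaces do_things().

```python
import asyncio


class FakeWebSocket:
    """Hypothetical stand-in exposing an async send_text, as a WebSocket object would."""

    def __init__(self) -> None:
        self.sent: list = []

    async def send_text(self, text: str) -> None:
        self.sent.append(text)


def task(websocket, loop) -> int:
    # Plain synchronous function; it is meant to run in a worker thread.
    result = 0
    for step in range(3):
        intermediate = step * step  # stand-in for do_things()
        # Schedule the coroutine on the event loop from this thread ...
        future = asyncio.run_coroutine_threadsafe(
            websocket.send_text(str(intermediate)), loop
        )
        # ... and block this worker thread (not the loop) until it has been sent.
        future.result()
        result += intermediate
    return result


async def handler(websocket) -> int:
    loop = asyncio.get_running_loop()
    # The event loop stays free while the task runs in the default thread pool.
    result = await loop.run_in_executor(None, task, websocket, loop)
    await websocket.send_text(str(result))
    return result


if __name__ == "__main__":
    ws = FakeWebSocket()
    print(asyncio.run(handler(ws)), ws.sent)
```

Waiting on future.result() keeps the messages in order and surfaces send errors in the worker; dropping that call would make the sends fire-and-forget.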
I tried using the ProcessPoolExecutor as described here, but it's not possible to pickle a coroutine, and as far as I understand I have to make my task a coroutine (using async) to use websocket.send_text() within it.
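One way to avoid needing a coroutine inside the executor, sketched here under assumptions, is to keep task a plain function that reports intermediate values through a callback, while the coroutine that sends them stays on the event loop. The sketch uses the default thread pool, not a process pool (a process pool would need a picklable channel, which is not covered here); `report`, `run_and_stream`, and `DONE` are hypothetical names.

```python
import asyncio

DONE = object()  # sentinel marking the end of the intermediate results


def task(report) -> int:
    # Plain function: no async, no WebSocket, only a callback for progress.
    result = 0
    for step in range(3):
        intermediate = step + 1  # stand-in for do_things()
        report(intermediate)
        result += intermediate
    return result


async def run_and_stream(send) -> int:
    """Run task in a thread and forward each intermediate value via `send`."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def report(value) -> None:
        # asyncio.Queue is not thread-safe, so the put is handed to the loop thread.
        loop.call_soon_threadsafe(queue.put_nowait, value)

    def worker() -> int:
        try:
            return task(report)
        finally:
            report(DONE)  # always unblock the consumer, even if task raises

    future = loop.run_in_executor(None, worker)
    while (item := await queue.get()) is not DONE:
        await send(str(item))  # e.g. websocket.send_text in a real endpoint
    return await future


if __name__ == "__main__":
    async def main() -> None:
        async def send(text: str) -> None:
            print("intermediate:", text)

        print("final:", await run_and_stream(send))

    asyncio.run(main())
```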
Also, I thought about just storing my intermediate results somewhere, making an HTTP POST to start my task, and then having another WebSocket connection to read and send the intermediate results. But then I could just as well start a background task and implement a regular HTTP polling mechanism. I don't want either, mainly because I plan to use Google Cloud Run, which limits the CPU when all connections are closed. I also think it's better practice to teach my task how to communicate via WebSocket directly.
I hope my question is clear. It's my first larger-scale project with FastAPI and asynchronicity, and I haven't really used asyncio before, so I might have just missed something. Thanks for your suggestions.