I am working with a FastAPI application which needs to perform heavy computations to calculate its answers. I've moved the heavy computations to a remote machine with a GPU and use Celery to perform the remote procedure call. The current solution is the following:
- The user requests my FastAPI endpoint method
- Inside this method there is a call to the Celery RPC, and then a `task.get()` wait with a timeout for the result
- When the method gets the result back from the RPC, it sends the response to the user
My FastAPI app has 32 workers on a 16-core machine, but as I've figured out, all procedures are executed sequentially, which probably means that Celery's `.get()` method blocks the other FastAPI workers.
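To check my understanding of why this serializes requests, here is a self-contained toy (unrelated to Celery itself): the event loop runs all coroutines on one thread, so a synchronous blocking call inside an `async def` (here `time.sleep`, standing in for the blocking `.get()`) freezes everything, while an awaited sleep does not:

```python
import asyncio
import time

async def blocking_handler(i):
    time.sleep(0.2)           # blocks the whole event loop, like .get()
    return i

async def yielding_handler(i):
    await asyncio.sleep(0.2)  # yields control back to the loop
    return i

async def timed(handler):
    # Run five "requests" concurrently and measure the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(handler(i) for i in range(5)))
    return time.perf_counter() - start

blocking_time = asyncio.run(timed(blocking_handler))  # ~1.0 s: sequential
yielding_time = asyncio.run(timed(yielding_handler))  # ~0.2 s: concurrent
print(f"blocking: {blocking_time:.2f}s, yielding: {yielding_time:.2f}s")
```

This matches the symptom I'm seeing: one slow request holds up all the others behind it.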
My code is the following:
```python
# celery_worker.py
from celery import Celery

app = Celery(
    'celery_worker',
    broker=f'amqp://{rabbitmq_username}:{rabbitmq_password}@{rabbitmq_host}:{rabbitmq_port}/',
    backend='rpc://',
)

@app.task
def remote_procedure(a, b):
    # some heavy computations
    return res
```
```python
# main.py
import celery
import uvicorn
from fastapi import FastAPI

from project.celery_worker import remote_procedure

RESPONSE_TIMEOUT_SECONDS = 1.5

app = FastAPI()

@app.get('/search/')
async def search(a, b):
    try:
        res = remote_procedure.apply_async(
            (a, b), expires=RESPONSE_TIMEOUT_SECONDS + 0.1, retry=False
        ).get(timeout=RESPONSE_TIMEOUT_SECONDS)
    except celery.exceptions.TimeoutError:
        print('timeout')
        return {"res": "timeout"}
    return {"res": res}

def main():
    """Main function"""
    uvicorn.run(app, host='127.0.0.1', port=CONFIG['port'])

if __name__ == '__main__':
    main()
```
The problem is that when I send 100 requests, I expect that some of them will be computed and some of them will return with a timeout, but ALL of them must return in less than 2 seconds. In fact, all of them are computed sequentially, so for some requests I'm waiting about 15 seconds for a response, which is unacceptable.
I suspect the problem is the blocking `.get()` call, but I'm not sure. Is there an alternative that lets me wait for the RPC result inside the current request handler without blocking FastAPI's concurrency as a whole?
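One direction I've been considering, but haven't verified against Celery itself, is offloading the blocking wait to a thread with `asyncio.to_thread` (Python 3.9+) so the event loop stays free. Below is a self-contained toy of the idea, where `fake_get` stands in for Celery's blocking `AsyncResult.get(timeout=...)`:

```python
import asyncio
import time

def fake_get(timeout):
    # Stand-in for Celery's blocking AsyncResult.get(timeout=...).
    time.sleep(0.2)
    return "computed"

async def search():
    # In the real endpoint this would be something like:
    #   task = remote_procedure.apply_async((a, b), ...)
    #   res = await asyncio.to_thread(task.get, timeout=RESPONSE_TIMEOUT_SECONDS)
    return await asyncio.to_thread(fake_get, timeout=1.5)

async def run_clients():
    # Five concurrent "requests" should overlap instead of queueing,
    # since each blocking wait runs in the default thread pool.
    start = time.perf_counter()
    results = await asyncio.gather(*(search() for _ in range(5)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(run_clients())
print(results, f"{elapsed:.2f}s")
```

Would this approach be safe with Celery's RPC result backend, or is there a more idiomatic way to wait for the result asynchronously?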