This answer is not about improving CPU time (as you mentioned in the comments section), but rather explains what would happen if you defined an endpoint with normal `def` or `async def`, as well as provides solutions when you run blocking operations inside an endpoint.
You are asking how to stop the processing of a request after a while, in order to process further requests. It does not really make sense to start processing a request and then, 60 seconds later, stop it as if it never happened (wasting server resources all that time and keeping other requests waiting). You should instead leave the handling of requests to the FastAPI framework itself. When you define an endpoint with `async def`, it is run on the main thread (in the event loop); that is, the server processes such requests sequentially, as long as there is no `await` call inside the endpoint (just like in your case). The keyword `await` passes function control back to the event loop. In other words, it suspends the execution of the surrounding coroutine and tells the event loop to let something else run, until the `await`ed task completes (and has returned the result data). The `await` keyword only works within an `async` function.
Since you perform a heavy CPU-bound operation inside your `async def` endpoint (by calling your `some_func()` function), and you never give up control for other requests to run in the event loop (e.g., by `await`ing for some coroutine), the server will be blocked and wait for that request to be fully processed and completed before moving on to the next one(s). Have a look at this answer for more details.
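To illustrate the problem, here is a minimal, hypothetical sketch of such a blocking endpoint (the body of `some_func` is just a placeholder for your own CPU-heavy code):

```python
from fastapi import FastAPI

app = FastAPI()

def some_func(text):
    # placeholder for a computationally heavy, CPU-bound operation
    total = sum(i * i for i in range(10_000_000))
    return f'{text} ({total})'

@app.get('/blocking')
async def blocking_endpoint():
    # this call runs directly on the event loop thread; no await is
    # ever reached, so all other requests wait until it returns
    return {'response': some_func('Hello world')}
```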
Solutions
One solution would be to define your endpoint with normal `def` instead of `async def`. In brief, when you declare an endpoint with normal `def` instead of `async def` in FastAPI, it is run in an external threadpool that is then `await`ed, instead of being called directly (as that would block the server); hence, FastAPI would still work asynchronously.
Another solution, as described in this answer, is to keep the `async def` definition and run the CPU-bound operation in a separate thread and `await` it, using Starlette's `run_in_threadpool()`, thus ensuring that the main thread (event loop), where coroutines are run, does not get blocked. As described by @tiangolo here, "`run_in_threadpool` is an awaitable function, the first parameter is a normal function, the next parameters are passed to that function directly. It supports sequence arguments and keyword arguments". Example:
```python
from fastapi.concurrency import run_in_threadpool

res = await run_in_threadpool(cpu_bound_task, text='Hello world')
```
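Within an endpoint, that could look like the following sketch (here, `cpu_bound_task` is a stand-in for your own synchronous function):

```python
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool

app = FastAPI()

def cpu_bound_task(text):
    # your synchronous, CPU-heavy code would go here
    return text

@app.get('/')
async def main():
    # run the task in Starlette's threadpool and await the result,
    # so the event loop remains free to serve other requests
    res = await run_in_threadpool(cpu_bound_task, text='Hello world')
    return {'response': res}
```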
Since this is about a CPU-bound operation, it would be preferable to run it in a separate process, using `ProcessPoolExecutor`, as described in the link provided above. In this case, this could be integrated with `asyncio`, in order to `await` the process to finish its work and return the result(s). Note that, as described in the link above, it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc.; essentially, your code must be under `if __name__ == '__main__'`. Example:
```python
import concurrent.futures
from functools import partial
import asyncio

# this snippet must run inside an async function (e.g., an async def endpoint)
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, partial(cpu_bound_task, text='Hello world'))
```
About Request Timeout
With regard to the recent update on your question about the client having a fixed 60s request timeout: if you are not behind a proxy such as Nginx that would allow you to set the request timeout, and/or you are not using gunicorn, which would also allow you to adjust the request timeout, you could use a middleware, as suggested here, to set a timeout for all incoming requests. The suggested middleware (an example is given below) uses the `asyncio.wait_for()` function, which waits for an awaitable function/coroutine to complete with a timeout. If a timeout occurs, it cancels the task and raises `asyncio.TimeoutError`.
Regarding your comment below:

> My requirement is not unblocking next request...
Again, please read carefully the first part of this answer to understand that if you define your endpoint with `async def` and do not `await` for some coroutine inside, but instead perform some CPU-bound task (as you already do), it will block the server until it is completed (and even the approach below won't work as expected). That's like saying that you would like FastAPI to process one request at a time; in that case, there would be no reason to use an ASGI framework such as FastAPI, which takes advantage of the `async`/`await` syntax (i.e., processing requests asynchronously) in order to provide fast performance. Hence, you either need to drop the `async` definition from your endpoint (as mentioned earlier above), or, preferably, run your synchronous CPU-bound task using `ProcessPoolExecutor`, as described earlier.
Also, your comment in `some_func()`:

> Some computationally heavy function whose execution time depends on input text size

indicates that, instead of (or along with) setting a request timeout, you could check the length of the input text (using a dependency function, for instance) and raise an `HTTPException` in case the text's length exceeds some pre-defined value, which is known beforehand to require more than 60s to complete the processing. In that way, your system won't waste resources trying to perform a task which you already know will not be completed.
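A minimal sketch of that idea, assuming the text arrives as a query parameter and that `MAX_TEXT_LENGTH` is a hypothetical threshold you would tune to your own workload:

```python
from fastapi import FastAPI, HTTPException, Depends

app = FastAPI()

MAX_TEXT_LENGTH = 10_000  # hypothetical threshold; tune to your workload

def check_text_length(text: str) -> str:
    # reject inputs known beforehand to take longer than the timeout
    if len(text) > MAX_TEXT_LENGTH:
        raise HTTPException(status_code=413, detail='Input text is too long to process in time')
    return text

@app.get('/process')
async def process(text: str = Depends(check_text_length)):
    # at this point, text is guaranteed to be within the allowed length
    return {'text_length': len(text)}
```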
Working Example
```python
import time
import uvicorn
import asyncio
import concurrent.futures
from functools import partial
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.status import HTTP_504_GATEWAY_TIMEOUT

REQUEST_TIMEOUT = 2  # adjust timeout as desired

app = FastAPI()


@app.middleware('http')
async def timeout_middleware(request: Request, call_next):
    try:
        return await asyncio.wait_for(call_next(request), timeout=REQUEST_TIMEOUT)
    except asyncio.TimeoutError:
        return JSONResponse({'detail': 'Request exceeded the time limit for processing'},
                            status_code=HTTP_504_GATEWAY_TIMEOUT)


def cpu_bound_task(text):
    time.sleep(5)  # simulates a long-running, CPU-bound operation
    return text


@app.get('/')
async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, partial(cpu_bound_task, text='Hello world'))
    return {'response': res}


if __name__ == '__main__':
    uvicorn.run(app)
```
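With `REQUEST_TIMEOUT` set to 2 seconds and `cpu_bound_task` sleeping for 5, a request to `/` should return the 504 JSON response after roughly two seconds, rather than waiting for the task to finish.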