4

Use case

The client micro service, which calls /do_something, has a timeout of 60 seconds in the request/post() call. This timeout is fixed and can't be changed. So if /do_something takes 10 mins, /do_something is wasting CPU resources since the client micro service is NOT waiting after 60 seconds for the response from /do_something, which wastes CPU for 10 mins and this increases the cost. We have limited budget.

The current code looks like this:

import time
from uvicorn import Server, Config
from random import randrange
from fastapi import FastAPI

app = FastAPI()

def some_func(text):
    """
    Some computationally heavy function
    whose execution time depends on input text size
    """
    randinteger = randrange(1,120)
    time.sleep(randinteger)# simulate processing of text
    return text


@app.get("/do_something")
async def do_something():
    response = some_func(text="hello world")
    return {"response": response}

# Running
if __name__ == '__main__':
    server = Server(Config(app=app, host='0.0.0.0', port=3001))
    server.run()

Desired Solution

  1. Here /do_something should stop the processing of the current request to endpoint after 60 seconds and wait for next request to process.

  2. If execution of the end point is force stopped after 60 seconds we should be able to log it with custom message.

  3. This should not kill the service and work with multithreading/multiprocessing.

I tried this. But when timeout happends the server is getting killed. Any solution to fix this?

import logging
import time
import timeout_decorator
from uvicorn import Server, Config
from random import randrange
from fastapi import FastAPI

app = FastAPI()


@timeout_decorator.timeout(seconds=2, timeout_exception=StopIteration, use_signals=False)
def some_func(text):
    """
    Some computationally heavy function
    whose execution time depends on input text size
    """
    randinteger = randrange(1,30)
    time.sleep(randinteger)# simulate processing of text
    return text


@app.get("/do_something")
async def do_something():
    try:
        response = some_func(text="hello world")
    except StopIteration:
        logging.warning(f'Stopped /do_something > endpoint due to timeout!')
    else:
        logging.info(f'(  Completed < /do_something > endpoint')

    return {"response": response}


# Running 
if __name__ == '__main__':
    server = Server(Config(app=app, host='0.0.0.0', port=3001))
    server.run()
Chris
  • 18,724
  • 6
  • 46
  • 80
GeorgeOfTheRF
  • 8,244
  • 23
  • 57
  • 80
  • Does this answer your question? [FastAPI runs api-calls in serial instead of parallel fashion](https://stackoverflow.com/questions/71516140/fastapi-runs-api-calls-in-serial-instead-of-parallel-fashion) – Chris Sep 14 '22 at 14:02
  • 1
    @Chris No. I am not trying to improve cpu time. My requirement is different. I want to timeout the endpoint after x mins. – GeorgeOfTheRF Sep 16 '22 at 09:50
  • @GeorgeOfTheRF Did you find a solution? `timeout_middleware` doesn't work in my case – PATAPOsha Apr 14 '23 at 11:51

1 Answers1

4

This answer is not about improving CPU time—as you mentioned in the comments section—but rather explains what would happen, if you defined an endpoint with normal def or async def, as well as provides solutions when you run blocking operations inside an endpoint.

You are asking how to stop the processing of a request after a while, in order to process further requests. It does not really make that sense to start processing a request, and then (60 seconds later) stop it as if it never happened (wasting server resources all that time and having other requests waiting). You should instead let the handling of requests to FastAPI framework itself. When you define an endpoint with async def, it is run on the main thread (in the event loop), i.e., the server processes the requests sequentially, as long as there is no await call inside the endpoint (just like in your case). The keyword await passes function control back to the event loop. In other words, it suspends the execution of the surrounding coroutine, and tells the event loop to let something else run, until the awaited task completes (and has returned the result data). The await keyword only works within an async function.

Since you perform a heavy CPU-bound operation inside your async def endpoint (by calling your some_func() function), and you never give up control for other requests to run in the event loop (e.g., by awaiting for some coroutine), the server will be blocked and wait for that request to be fully processed and complete, before moving on to the next one(s)—have a look at this answer for more details.

Solutions

One solution would be to define your endpoint with normal def instead of async def. In brief, when you declare an endpoint with normal def instead of async def in FastAPI, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server); hence, FastAPI would still work asynchronously.

Another solution, as described in this answer, is to keep the async def definition and run the CPU-bound operation in a separate thread and await it, using Starlette's run_in_threadpool(), thus ensuring that the main thread (event loop), where coroutines are run, does not get blocked. As described by @tiangolo here, "run_in_threadpool is an awaitable function, the first parameter is a normal function, the next parameters are passed to that function directly. It supports sequence arguments and keyword arguments". Example:

from fastapi.concurrency import run_in_threadpool

res = await run_in_threadpool(cpu_bound_task, text='Hello world')

Since this is about a CPU-bound operation, it would be preferable to run it in a separate process, using ProcessPoolExecutor, as described in the link provided above. In this case, this could be integrated with asyncio, in order to await the process to finish its work and return the result(s). Note that, as described in the link above, it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc—essentially, your code must be under if __name__ == '__main__'. Example:

import concurrent.futures
from functools import partial
import asyncio

loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, partial(cpu_bound_task, text='Hello world'))

About Request Timeout

With regards to the recent update on your question about the client having a fixed 60s request timeout; if you are not behind a proxy such as Nginx that would allow you to set the request timeout, and/or you are not using gunicorn, which would also allow you to adjust the request timeout, you could use a middleware, as suggested here, to set a timeout for all incoming requests. The suggested middleware (example is given below) uses asyncio's .wait_for() function, which waits for an awaitable function/coroutine to complete with a timeout. If a timeout occurs, it cancels the task and raises asyncio.TimeoutError.

Regarding your comment below:

My requirement is not unblocking next request...

Again, please read carefully the first part of this answer to understand that if you define your endpoint with async def and not await for some coroutine inside, but instead perform some CPU-bound task (as you already do), it will block the server until is completed (and even the approach below wont' work as expected). That's like saying that you would like FastAPI to process one request at a time; in that case, there is no reason to use an ASGI framework such as FastAPI, which takes advantage of the async/await syntax (i.e., processing requests asynchronously), in order to provide fast performance. Hence, you either need to drop the async definition from your endpoint (as mentioned earlier above), or, preferably, run your synchronous CPU-bound task using ProcessPoolExecutor, as described earlier.

Also, your comment in some_func():

Some computationally heavy function whose execution time depends on input text size

indicates that instead of (or along with) setting a request timeout, you could check the length of input text (using a dependency fucntion, for instance) and raise an HTTPException in case the text's length exceeds some pre-defined value, which is known beforehand to require more than 60s to complete the processing. In that way, your system won't waste resources trying to perform a task, which you already know will not be completed.

Working Example

import time
import uvicorn
import asyncio
import concurrent.futures
from functools import partial
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.status import HTTP_504_GATEWAY_TIMEOUT
from fastapi.concurrency import run_in_threadpool

REQUEST_TIMEOUT = 2  # adjust timeout as desired
app = FastAPI()

@app.middleware('http')
async def timeout_middleware(request: Request, call_next):
    try:
        return await asyncio.wait_for(call_next(request), timeout=REQUEST_TIMEOUT)
    except asyncio.TimeoutError:
        return JSONResponse({'detail': f'Request exceeded the time limit for processing'},
                            status_code=HTTP_504_GATEWAY_TIMEOUT)

def cpu_bound_task(text):
    time.sleep(5)
    return text

@app.get('/')
async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, partial(cpu_bound_task, text='Hello world'))

    return {'response': res}
    
if __name__ == '__main__':
    uvicorn.run(app)
Chris
  • 18,724
  • 6
  • 46
  • 80
  • 3
    Thanks. But my usecase/requirement is different than what you describe. My requirement is not unblocking next request but reduce cost by stopping processing of end point when client already timed out. I have edited the 1st paragraph of original to explain it better. – GeorgeOfTheRF Sep 16 '22 at 12:42
  • Please have a look above. – Chris Sep 23 '22 at 08:21
  • This meddleware works when CPU is enough, but when CPU is trotling my "frozen" requests still take up to 20 minutes. It can't get to `timeout_middleware`. In my case, freeze is happening somehere in `middleware.starlette.send` I use uvicorn workers in Kubernetes (1 pod - 1 uvicorn worker). And have some workers randomly freezeing. – PATAPOsha Apr 14 '23 at 11:49