
I want to run a simple background task in FastAPI that involves some computation before dumping the result into the database. However, the computation blocks the server from receiving any more requests.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()
db = Database()

async def task(data):
    otherdata = await db.fetch("some sql")
    newdata = somelongcomputation(data, otherdata)  # this blocks other requests
    await db.execute("some sql",newdata)
   


@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
    background_tasks.add_task(task, data)
    return {}

What is the best way to solve this issue?

Gary Ong
  • If the computation is heavy and does not involve IO, it is better to use multiprocessing. – alex_noname May 19 '21 at 12:03
  • I am using the FastAPI Docker image for deployment; it uses all CPU cores for the server by default. I don't want to use another service like Celery, as the product is still in the prototyping phase and has no users. – Gary Ong May 19 '21 at 13:31
  • @GaryOng Please have a look at [this related answer](https://stackoverflow.com/a/71517830/17865804) as well. – Chris Jan 15 '23 at 15:36

4 Answers


Your task is defined as async, which means FastAPI (or rather Starlette) will run it in the asyncio event loop. And because somelongcomputation is synchronous (i.e. not waiting on some IO, but doing computation), it will block the event loop for as long as it is running.
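
For illustration, here is a minimal self-contained sketch of that effect (blocking_task is made up for this demo, not part of the question): a synchronous call inside an async task stalls every other coroutine on the loop.

import asyncio
import time

async def blocking_task():
    time.sleep(5)  # synchronous sleep: holds the event loop for 5 seconds

async def main():
    start = time.monotonic()
    asyncio.create_task(blocking_task())
    await asyncio.sleep(0.1)  # should resume after ~0.1 s...
    # ...but blocking_task holds the loop, so this prints ~5.0 s
    print(f"resumed after {time.monotonic() - start:.1f} s")

asyncio.run(main())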

I see a few ways of solving this:

  • Use more workers (e.g. uvicorn main:app --workers 4). This will allow up to 4 instances of somelongcomputation to run in parallel.

  • Rewrite your task to not be async (i.e. define it as def task(data): ...). Starlette will then run it in a separate thread, as in the sketch below.
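
    A minimal sketch of that variant; db.fetch_sync and db.execute_sync stand in for hypothetical synchronous counterparts of the question's async helpers, since a plain def cannot await:

    def task(data):  # plain def: Starlette runs this in a worker thread
        otherdata = db.fetch_sync("some sql")  # hypothetical sync query helper
        newdata = somelongcomputation(data, otherdata)  # no longer blocks the event loop
        db.execute_sync("some sql", newdata)  # hypothetical sync helper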

  • Use fastapi.concurrency.run_in_threadpool, which will also run it in a separate thread. Like so:

    from fastapi.concurrency import run_in_threadpool
    async def task(data):
        otherdata = await db.fetch("some sql")
        newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
        await db.execute("some sql", newdata)
    
    • Or use asyncio's run_in_executor directly (which run_in_threadpool uses under the hood):
      import asyncio
      async def task(data):
          otherdata = await db.fetch("some sql")
          loop = asyncio.get_running_loop()
          newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
          await db.execute("some sql", newdata)
      
      You could even pass in a concurrent.futures.ProcessPoolExecutor as the first argument to run_in_executor to run it in a separate process.
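
      For example, a sketch with a process pool; the function and its arguments are passed directly instead of via a lambda, because anything sent to another process must be picklable and lambdas are not (somelongcomputation must also be defined at module top level):

      import asyncio
      from concurrent.futures import ProcessPoolExecutor

      process_pool = ProcessPoolExecutor()  # create once and reuse, not per request

      async def task(data):
          otherdata = await db.fetch("some sql")
          loop = asyncio.get_running_loop()
          # pass the function and its arguments directly; no lambda involved
          newdata = await loop.run_in_executor(process_pool, somelongcomputation, data, otherdata)
          await db.execute("some sql", newdata)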
  • Spawn a separate thread / process yourself. E.g. using concurrent.futures.
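
    A rough sketch of this do-it-yourself route (fire-and-forget: you own the pool's lifecycle and must handle errors yourself; Data, db, and somelongcomputation are the names from the question):

    from concurrent.futures import ThreadPoolExecutor

    pool = ThreadPoolExecutor()  # module-level pool, shared by all requests

    @app.post("/profile")
    async def profile(data: Data):
        otherdata = await db.fetch("some sql")
        # submit returns a Future immediately; the computation runs in a
        # worker thread without blocking the event loop (swap in a
        # ProcessPoolExecutor for true parallelism on CPU-bound work)
        pool.submit(somelongcomputation, data, otherdata)
        return {}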

  • Use something more heavy-handed like Celery. (Also mentioned in the FastAPI docs here).

mihi
  • I am facing the same problem here and I wonder why not just use `asyncio.create_task(task(data))`? I am doing some tests and it seems to be the solution. – Misael Alarcon Dec 16 '21 at 22:26
  • You mean instead of using `BackgroundTasks`? Are you sure that works? Because `asyncio.create_task` will run the task (and therefore `somelongcomputation`) in the event loop, which will then be blocked, just like in the question. The reason that `run_in_threadpool` works is that it runs the computation in the underlying threadpool directly, sidestepping the event loop. – mihi Dec 18 '21 at 16:31
  • If not using `async` spawns another thread, isn't this better than using `async`? – Crashalot Jan 25 '22 at 08:42
  • @Crashalot depends on the situation. Have a look at some of the answers here: https://stackoverflow.com/questions/27435284/multiprocessing-vs-multithreading-vs-asyncio-in-python-3, and maybe here: https://discuss.python.org/t/what-are-the-advantages-of-asyncio-over-threads/2112/6. – mihi Jan 27 '22 at 16:22
  • Where would one need to pass `concurrent.futures.ProcessPoolExecutor` in exactly? In `newdata = await loop.run_in_executor(ProcessPoolExecutor(), lambda: somelongcomputation(data, otherdata))`? – bky Feb 04 '22 at 10:00
  • @ben Yep, have a look at the documentation for some examples (https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor). – mihi Feb 04 '22 at 21:48
  • I don't think threads are of any help here. Since Python doesn't utilize true parallelism, spawning a thread for heavy CPU computation will still hold up the entire program. Use processes instead. – Alexander Farkas Jul 19 '23 at 10:34
  • I agree with Alexander, threads do not help for CPU-bound operations in Python, but given the high number of votes here it makes me wonder if I am missing something or not... @mihi - would love to hear your thoughts here. – Shay Tsadok Jul 23 '23 at 07:27
  • @ShayTsadok Threads won't help with overall throughput, that is true. But using them does prevent blocking of other requests, as context switching between the threads still happens. They won't run in parallel, but rather interleaved. – mihi Jul 26 '23 at 19:09

Read this issue.

Also, in the example below, my_model.function_b could be any blocking function or process.

TL;DR

from starlette.concurrency import run_in_threadpool

@app.get("/long_answer")
async def long_answer():
    # my_model.function_b is any blocking callable; run_in_threadpool
    # executes it in a worker thread and awaits the result
    rst = await run_in_threadpool(my_model.function_b, arg_1, arg_2)
    return rst
Zhivar Sourati

If your task is CPU-bound, you could use multiprocessing; there is a way to do that with background tasks in FastAPI: https://stackoverflow.com/a/63171013

Although you should consider using something like Celery if there are a lot of CPU-heavy tasks.
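
A rough sketch of the multiprocessing route with a background task (db, Data, and somelongcomputation are the names from the question; assumes the objects passed between processes are picklable):

import asyncio
from concurrent.futures import ProcessPoolExecutor

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()
pool = ProcessPoolExecutor()  # worker processes for CPU-bound work

async def task(data):
    otherdata = await db.fetch("some sql")
    loop = asyncio.get_running_loop()
    # the CPU-bound part runs in a worker process, so the event loop stays free
    newdata = await loop.run_in_executor(pool, somelongcomputation, data, otherdata)
    await db.execute("some sql", newdata)

@app.post("/profile")
async def profile(data: Data, background_tasks: BackgroundTasks):
    background_tasks.add_task(task, data)
    return {}

@app.on_event("shutdown")
def shutdown():
    pool.shutdown()  # release the worker processes when the app stops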

Adam

Here is an example of a periodic background task in FastAPI:

from fastapi import FastAPI
import asyncio

app = FastAPI()
x = [1]  # a global variable x

@app.get("/")
def hello():
    return {"message": "hello", "x": x}

async def periodic():
    while True:
        # code to run periodically starts here
        x[0] += 1
        print(f"x is now {x}")
        # code to run periodically ends here
        # sleep for 3 seconds after running the above code
        await asyncio.sleep(3)

@app.on_event("startup")
async def schedule_periodic():
    loop = asyncio.get_event_loop()
    loop.create_task(periodic())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app)
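
Note that this stays responsive only because periodic awaits asyncio.sleep, which yields control back to the event loop between iterations. If the loop body did heavy synchronous computation instead, it would block the server just like in the question, and one of the thread/process approaches above would still be needed.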
Tanjin Alam