According to this tutorial, you can create BackgroundTasks in the route's handler function as follows:

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

but in my case, I have a huge number of routes, so I want to centralize the background task in a custom APIRoute (this is a simplified example):

# core/users.py
def process(email):
    #processing
    #create a background task here

# router.py
class MyRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            response: Response = await original_route_handler(request)
            return response

        return custom_route_handler

# app.py
from core import users
@app.post("/send-notification/{email}")
async def send_notification(email: str):
    #processing
tahayk

4 Answers

You can do that, but you have to attach the background tasks object to the response. See the official Starlette documentation on Background Tasks.

# core/users.py
from fastapi import BackgroundTasks

def send_email(email):
    print("sending email to {} as background task!".format(email))

def process(email):
    background_tasks = BackgroundTasks()
    background_tasks.add_task(send_email, email)
    return background_tasks


# app.py
from core import users
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.post("/send-notification/{email}")
async def send_notification(email: str):
    background_tasks = users.process(email)
    return JSONResponse("Email sent!", background=background_tasks)
Mahyar Zarifkar

Thank you everyone for the ideas, but in my use case the background task has to be created in a custom APIRoute, after the response is created.

I ended up with something like:

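# core/users.py
from starlette.background import BackgroundTask
from starlette.responses import Response

def process(response: Response):
    background_task: BackgroundTask = BackgroundTask(async_process, MY_ARG1)
    if response.background is None:
        response.background = background_task
    else:
        # Assumes response.background is a BackgroundTasks instance, which has add_task()
        response.background.add_task(background_task.func, *background_task.args, **background_task.kwargs)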

# router.py
class MyRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            response: Response = await original_route_handler(request)
            users.process(response=response)
            return response

        return custom_route_handler

# app.py
app = APIRouter(route_class=MyRoute)
@app.post("/send-notification/{email}")
async def send_notification(email: str):
    #processing

PS: These code snippets are a simplified version of what I used in creating a "trace-module" for the OpenReplay project.

rsmith54
tahayk
  • Hi @tahayk, nice solution, good job! However, there is one thing I did not fully understand. Why do you need the if/else in the `process` function? Thanks in advance. – Ignacio Vergara Kausel Jun 27 '22 at 09:08

BackgroundTasks is just a helper; you can run the function in a thread or on the event loop yourself:

import asyncio
import threading

def run_in_background(func, *args):
    if asyncio.iscoroutinefunction(func):
        # Must be called while an event loop is running (e.g. inside an async handler)
        loop = asyncio.get_event_loop()
        loop.create_task(func(*args))
    else:
        # Starlette uses anyio library here under the hood
        t = threading.Thread(target=func, args=args)
        t.start()
kosciej16

Background tasks come from Starlette and are attached to a response.

As the FastAPI docs say:

"It's still possible to use BackgroundTask alone in FastAPI, but you have to create the object in your code and return a Starlette Response including it."

You may also want to look at running your function in a thread pool, as suggested by tiangolo (the creator of FastAPI):

from fastapi.concurrency import run_in_threadpool

@api.get('/handler')
async def handler():
    ...
    # Slow async function
    await my_async_function()
    ...
    # Slow running sync function
    await run_in_threadpool(sync_function)
Bastien B