According to this tutorial, you can create BackgroundTasks
in the route's handler function as follows:
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}
In my case, however, I have a huge number of routes, so I want to centralize the background task in a custom APIRoute (this is a simplified example):
# core/users.py
def process(email):
    # processing
    # create a background task here
    ...
# router.py
from typing import Callable
from fastapi import Request, Response
from fastapi.routing import APIRoute

class MyRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()
        async def custom_route_handler(request: Request) -> Response:
            response: Response = await original_route_handler(request)
            return response
        return custom_route_handler
# app.py
from core import users
@app.post("/send-notification/{email}")
async def send_notification(email: str):
    # processing