
I'm building a server in Python with FastAPI, and I want a function unrelated to my API to run in the background every 5 minutes (for example, polling an external API and printing something depending on the response).

I've tried to make a thread that runs the function start_worker, but it doesn't print anything.

Does anyone know how to do this?

def start_worker():
    print('[main]: starting worker...')
    my_worker = worker.Worker()
    my_worker.working_loop() # this function prints "hello" every 5 seconds

if __name__ == '__main__':
    print('[main]: starting...')
    uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
    _worker_thread = Thread(target=start_worker, daemon=False)
    _worker_thread.start()
Chris
Gabriel Knies
  • Try moving the thread stuff to before the run(). It's possible the run() doesn't return until the server dies. – Frank Yellin Jan 27 '22 at 01:43
  • doesn't work as well, it doesn't even print('[main]: starting...'), but the api is working – Gabriel Knies Jan 27 '22 at 02:06
  • The solution I found was to create an endpoint for background work. The endpoint is hit with a CRON job. The upside of using an endpoint is that you can have the code running with async/await functions including a database. I used the "background.add_task" function to launch the background job and return an ok immediately to the CRON request. – rpontual Jan 27 '22 at 03:29

2 Answers


Option 1

Start your thread before calling uvicorn.run, because uvicorn.run blocks the calling thread until the server shuts down, so any code placed after it does not run while the server is up.

import time
import threading
from fastapi import FastAPI
import uvicorn

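app = FastAPI()


class BackgroundTasks(threading.Thread):
    def run(self, *args, **kwargs):
        # Runs in its own thread, so the blocking sleep does not stall the server
        while True:
            print('Hello')
            time.sleep(5)


if __name__ == '__main__':
    # daemon=True lets the process exit when the server stops
    t = BackgroundTasks(daemon=True)
    t.start()
    # Blocks until the server shuts down, so the thread must be started first
    uvicorn.run(app, host="0.0.0.0", port=8000)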

You could also start your thread using FastAPI's startup event, as long as it is ok to run before the application starts.

@app.on_event("startup")
async def startup_event():
    t = BackgroundTasks()
    t.start()

Option 2

You could instead use the standard library's sched event scheduler for the background task, with the event rescheduling itself each time it runs, as below:

import sched, time
from threading import Thread
from fastapi import FastAPI
import uvicorn

app = FastAPI()
s = sched.scheduler(time.time, time.sleep)

def print_event(sc): 
    print("Hello")
    sc.enter(5, 1, print_event, (sc,))

def start_scheduler():
    s.enter(5, 1, print_event, (s,))
    s.run()

@app.on_event("startup")
async def startup_event():
    thread = Thread(target = start_scheduler)
    thread.start()

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)

Option 3

If your task is an async def function (see this answer for more details on def vs async def endpoints/background tasks in FastAPI), then you could add the task to the current event loop, using the asyncio.create_task() function. The create_task() function takes a coroutine object (i.e., the object returned by calling an async def function) and returns a Task object (which can be used to await the task, if needed, or cancel it, etc.). The call schedules the task on the event loop of the current thread, where it runs in the "background" concurrently with all other tasks in that loop, switching between them at await points.

create_task() requires a running event loop, and one is already running when the uvicorn server is started, either programmatically (using, for instance, uvicorn.run(app)) or in the terminal (using, for instance, uvicorn app:app). Instead of using asyncio.create_task(), one could also use asyncio.get_running_loop() to get the current event loop, and then call loop.create_task().
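As a standalone illustration of the Task object described above (outside FastAPI, with asyncio.run() supplying the event loop), here is a minimal sketch. The ticker coroutine and the short intervals are assumptions chosen for demonstration only:

```python
import asyncio


async def ticker(interval, ticks):
    # Hypothetical stand-in for a periodic job: record a tick, then sleep
    while True:
        ticks.append(1)
        await asyncio.sleep(interval)


async def main():
    ticks = []
    # Calling ticker(...) creates a coroutine object; create_task() schedules it
    task = asyncio.create_task(ticker(0.01, ticks))
    # Equivalent: asyncio.get_running_loop().create_task(ticker(0.01, ticks))
    await asyncio.sleep(0.05)  # the task keeps ticking while this coroutine waits
    task.cancel()
    try:
        await task  # awaiting a cancelled task raises CancelledError
    except asyncio.CancelledError:
        pass
    return len(ticks), task.cancelled()


count, cancelled = asyncio.run(main())
```

Keeping the returned Task in a variable is what makes it possible to cancel or await it later.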

The example below uses the recently documented way for adding lifespan events (using a context manager), i.e., code that should be executed before the application starts up, as well as when the application is shutting down (see the documentation for more details). One could also still use the startup and shutdown events, as demonstrated in the previous options; however, those event handlers might be removed from future FastAPI/Starlette versions.

from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio


async def print_task(s): 
    while True:
        print('Hello')
        await asyncio.sleep(s)

        
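@asynccontextmanager
async def lifespan(app: FastAPI):
    # Keep a reference to the task so it is not garbage-collected mid-run
    task = asyncio.create_task(print_task(5))
    yield
    # On shutdown, cancel the background task
    task.cancel()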


app = FastAPI(lifespan=lifespan)
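Note that an unhandled exception inside print_task would end the task, and nothing restarts it. A minimal sketch of a loop that logs failures and keeps going, assuming a hypothetical run_periodically helper and a deliberately flaky job (neither is part of FastAPI):

```python
import asyncio
import logging


async def run_periodically(interval, job):
    """Hypothetical helper: await `job()` every `interval` seconds, logging failures."""
    while True:
        try:
            await job()
        except Exception:
            # Log and continue; asyncio.CancelledError derives from BaseException
            # (Python 3.8+), so task.cancel() still stops the loop
            logging.exception("periodic job failed")
        await asyncio.sleep(interval)


async def demo():
    calls = []

    async def flaky():
        # Fails on the first call only, to show that the loop survives
        calls.append(1)
        if len(calls) == 1:
            raise RuntimeError("simulated failure")

    task = asyncio.create_task(run_periodically(0.01, flaky))
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(calls)
```

In the lifespan function, one would pass such a helper to asyncio.create_task() in place of print_task(5), with an interval of 300 seconds for the question's 5-minute schedule.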
Chris
  • How resilient is the subprocess? Is it possible for the BackgroundTasks service to be killed, and if so, does FastAPI's event handler restart it automatically? – muad-dweeb Aug 30 '22 at 21:29

@Chris's solution works correctly!

However, if you want to remove the separate start_scheduler function, you can pass the scheduler to print_event directly as the sc keyword argument, using the thread's kwargs parameter, as below:

import sched
import time
from threading import Thread

import uvicorn
from fastapi import FastAPI


app = FastAPI()
s = sched.scheduler(time.time, time.sleep)


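def print_event(sc=None, first=True):
    print("Hello")
    # Reschedule the next run; only the first call starts the scheduler loop,
    # since calling sc.run() from inside a scheduled event would nest without bound
    sc.enter(5, 1, print_event, kwargs=dict(sc=sc, first=False))
    if first:
        sc.run()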


@app.on_event("startup")
async def startup_event():
    thread = Thread(target=print_event, kwargs=dict(sc=s))
    thread.start()


if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)
Aida.Mirabadi