3

I'm using FastAPI and trying to implement an endpoint that starts a job. Once a job has started, the endpoint should be "locked" until that job has finished. So far it's implemented like this:

myapp.lock = threading.Lock()

@myapp.get("/jobs")
def start_job(some_args):
    if myapp.lock.acquire(False):
        th = threading.Thread(target=job,kwargs=some_args)
        th.start()
        return "Job started"
    else:
        raise HTTPException(status_code=400,detail="Job already running.")

So, when the job gets started, a thread will be created using the method job:

def job(some_args):
    try:
        pass  # doing some stuff, creating objects and writing to files
    finally:
        myapp.lock.release()

So far so good: the endpoint works, starts a job, and stays locked as long as the job is running. My problem is that the thread is still alive even though the job has "finished" and released the lock. I expected the thread to terminate on its own after execution. Maybe myapp is keeping it alive? How can I stop it?

hellothere
  • You cannot use blocking sync methods in an asynchronous application. I would suggest storing the status of the task, for example, in a global object (unless you are using multiple workers). And check it instead of hanging in a lock. Perhaps an example from here will be useful to you. https://stackoverflow.com/questions/63169865/how-to-do-multiprocessing-in-fastapi/ – alex_noname Apr 21 '21 at 10:40
  • Thanks @alex_noname, I figured out a solution. The easiest way for me was using BackgroundTasks from FastAPI. – hellothere Apr 22 '21 at 11:51
  • @alex_noname as this is a synchronous view, FastAPI will automatically start a separate thread for it, so this is not an issue in this situation (the view is not blocking the async loop itself) – GwynBleidD Apr 22 '21 at 14:50
  • @hellothere consider posting your solution as an answer to this question, as it may help anyone who encounters the same problem. – GwynBleidD Apr 22 '21 at 14:51

2 Answers

0

I came up with the following solution:

myapp.lock = False

@myapp.get("/jobs")
async def start_job(some_args, background_tasks: BackgroundTasks):
    if not myapp.lock:
        # Set the flag before scheduling, so a second request cannot pass
        # the check before the background task has started.
        myapp.lock = True
        background_tasks.add_task(job, some_args)
        return "Job started"
    else:
        raise HTTPException(status_code=400,detail="Job already running.")

def job(some_args):
    try:
        pass  # doing some stuff, creating objects and writing to files
    finally:
        myapp.lock = False
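
If the endpoint is a plain `def` (which FastAPI runs in a thread pool), two requests could read a boolean flag at the same moment. A minimal sketch of claiming the job slot atomically with a non-blocking `threading.Lock` is shown here. `JobGate`, `gate`, `try_claim` and `is_busy` are hypothetical names, and a plain thread stands in for `background_tasks.add_task` so the snippet runs without FastAPI:

```python
import threading


class JobGate:
    """Single-process guard that lets only one job run at a time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()

    def try_claim(self) -> bool:
        # Non-blocking acquire: returns False at once if a job holds the gate.
        return self._lock.acquire(blocking=False)

    def release(self) -> None:
        # A threading.Lock may be released by a thread other than the one
        # that acquired it, so the job can release what the endpoint claimed.
        self._lock.release()

    def is_busy(self) -> bool:
        return self._lock.locked()


gate = JobGate()


def job(some_args: dict) -> None:
    try:
        pass  # do the actual work here
    finally:
        gate.release()


def start_job(some_args: dict) -> str:
    # Claim first, then schedule. In the real endpoint the failed claim
    # would raise HTTPException(status_code=400, ...) instead.
    if not gate.try_claim():
        return "Job already running."
    # Assumption: with FastAPI this line would be
    # background_tasks.add_task(job, some_args).
    threading.Thread(target=job, args=(some_args,)).start()
    return "Job started"
```

Like the boolean flag, this only guards a single process; it does not coordinate several workers.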
hellothere
  • Note: this will only work as long as your ASGI server spawns only one worker (fork). As soon as it starts several, they won't have access to each other's variables. – Kound Aug 04 '22 at 15:06
0

I faced a similar issue: I wanted to block requests that would change the state of a specific object (a simulation, in my case) while another operation on it is in progress. I use PostgreSQL and created the following:

  1. Table
    create table simulation_lock (simulation_id INTEGER PRIMARY KEY, is_locked BOOLEAN DEFAULT False);
  2. Lock context:
from starlette.exceptions import HTTPException

from fmr.database import SessionContext


class SimulationLock:

    def __init__(self, simulation_id: int):
        self._simulation_id = simulation_id
        self._was_acquired = False

    def __enter__(self):
        with SessionContext() as session:
            query = f"update simulation_lock set is_locked = true where " \
                    f"simulation_id = {self._simulation_id} " \
                    f"and is_locked = false returning is_locked;"
            result = session.execute(query).first()
            session.commit()

            if result is not None and result[0] is True:
                self._was_acquired = True
            else:
                raise HTTPException(
                    status_code=500, detail="Already some operations on the "
                                            "simulation are performed.")

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._was_acquired:
            with SessionContext() as session:
                query = f"update simulation_lock set is_locked = false " \
                        f"where simulation_id = {self._simulation_id};"
                session.execute(query)
                session.commit()

  3. Endpoint:
@scenario_router.post(
    "/run"
)
async def run(
        run_scenario_request: RunScenarioRequest,
):
    with SimulationLock(run_scenario_request.scenario_id):
        run_scenario(run_scenario_request)
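
The core of the lock above is a single conditional UPDATE: only the caller whose statement actually flips `is_locked` gets the lock. The following is a rough sketch of that idea, not the code used in this answer. It swaps PostgreSQL for the standard-library `sqlite3` module so it runs on its own, checks `rowcount` instead of `RETURNING`, and binds `simulation_id` as a parameter rather than formatting it into the query string. `try_lock` and `unlock` are hypothetical helper names:

```python
import sqlite3


def try_lock(conn: sqlite3.Connection, simulation_id: int) -> bool:
    """Flip is_locked from 0 to 1; True only if this call did the flip."""
    cur = conn.execute(
        "UPDATE simulation_lock SET is_locked = 1 "
        "WHERE simulation_id = ? AND is_locked = 0",
        (simulation_id,),
    )
    conn.commit()
    # rowcount is 0 when the row is already locked or does not exist.
    return cur.rowcount == 1


def unlock(conn: sqlite3.Connection, simulation_id: int) -> None:
    conn.execute(
        "UPDATE simulation_lock SET is_locked = 0 WHERE simulation_id = ?",
        (simulation_id,),
    )
    conn.commit()


# In-memory database with one lockable simulation, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE simulation_lock ("
    "simulation_id INTEGER PRIMARY KEY, is_locked BOOLEAN DEFAULT 0)"
)
conn.execute("INSERT INTO simulation_lock (simulation_id) VALUES (1)")
conn.commit()
```

Because the check and the update happen in one statement, the database decides which of two competing callers wins, which is why this approach also works with several workers.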
K4liber