22

I have deployed a FastAPI endpoint:

from fastapi import FastAPI, UploadFile
from typing import List

app = FastAPI()

@app.post('/work/test')
async def testing(files: List[UploadFile]):
    for i in files:
        # ...
        # do a lot of operations on each file

        # after that I am just writing the processed data into a MySQL database
        # cur.execute(...)
        # cur.commit()
        # ...

    # just returning "OK" to confirm the data is written into MySQL
    return {"response": "OK"}

I can send requests to the API endpoint, and it is working perfectly fine for me.

Now, the biggest challenge for me is knowing how much time each iteration is taking, because on the UI side (those who are accessing my API endpoint) I want to help them show a progress bar (TIME TAKEN) for each iteration/file being processed.

Is there any possible way for me to achieve this? If so, please help me figure out how I can proceed further.

Thank you.

user_12
  • do you have access to the UI code? – Andriy Ivaneyko Nov 27 '20 at 08:49
  • No actually, I don't have access to the UI code. I just want to provide some indication of each file being processed behind the scenes which they can access. Just some basic indication. Nothing too complicated. Currently, they can only get the final response after processing all files, so there is no way to provide any indication. – user_12 Nov 27 '20 at 10:51
  • is it fine to provide a separate URL to get the status of the processing? – Andriy Ivaneyko Nov 27 '20 at 11:07
  • Yes, it's totally fine. I tried using websockets and all, but I was not able to figure it out. – user_12 Nov 27 '20 at 13:01
  • no need for websockets, you can work that out with other approaches, do the answers below answer your question (if not, I can give it a look)? – Andriy Ivaneyko Nov 27 '20 at 16:04
  • I tried one of the approaches given below but couldn't make it work for my code. I am looking for other approaches. Please feel free to post your approach, it would help a lot. – user_12 Nov 27 '20 at 16:52
  • how many threads and processes are in use by the application? – Andriy Ivaneyko Nov 27 '20 at 18:00
  • you can give it a shot :) – Andriy Ivaneyko Nov 27 '20 at 18:49
  • If I use the `joblib` library to run my loop on multiple processors, will the below approach work for me? – user_12 Nov 27 '20 at 18:51
  • no, you will need a database which holds the jobs, and the job status endpoint shall query the database to view the status – Andriy Ivaneyko Nov 27 '20 at 18:52
  • okay, I'll try the approach below and will let you know if it works out for me. Thank you – user_12 Nov 27 '20 at 18:54
  • sure, async web servers are pretty powerful, so I bet a single web server thread could handle plenty of items; if that's for a prototype then it's more than enough :) Have a nice weekend man! – Andriy Ivaneyko Nov 27 '20 at 19:27

2 Answers

15

Approaches

Polling

The most common approach to tracking the progress of a task is polling:

  1. After receiving a request to start a task on the backend:
    1. Create a task object in storage (e.g. in-memory, Redis, etc.). The task object must contain the following data: task ID, status (pending, completed), result, and others.
    2. Run the task in the background (coroutines, threading, multiprocessing, a task queue like Celery, arq, aio-pika, dramatiq, etc.).
    3. Respond immediately with 202 (Accepted), returning the previously created task ID.
  2. Update the task status:
    1. This can be done from within the task itself, if it knows about the task store and has access to it: periodically, the task updates information about itself.
    2. Or use a task monitor (observer, producer-consumer pattern), which will watch the status of the task and its result, and also update the information in the storage.
  3. On the client side (front-end), start a polling cycle for the task status against an endpoint /task/{ID}/status, which takes information from the task storage (a minimal client-side sketch follows this list).
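
For illustration, here is a minimal client-side polling sketch. It assumes the `requests` library and the `/new_task/{param}` and `/task/{uid}/status` endpoints from the demo further below; the URL, port, and field names are taken from that demo, not from the question.

import time
import requests  # third-party HTTP client, assumed to be installed

BASE_URL = "http://127.0.0.1:8000"  # assumed uvicorn default host/port

# 1. Start the background task and remember its ID (202 Accepted).
job = requests.post(f"{BASE_URL}/new_task/10").json()
uid = job["uid"]

# 2. Poll the status endpoint until the task reports completion.
while True:
    status = requests.get(f"{BASE_URL}/task/{uid}/status").json()
    print(f"progress: {status['progress']}, status: {status['status']}")
    if status["status"] == "complete":
        break
    time.sleep(1)  # polling interval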

Streaming response

Streaming is a less convenient way of getting the status of request processing periodically: the server gradually pushes chunks of the response without closing the connection. It has a number of significant disadvantages; for example, if the connection is broken, you can lose information. A streaming API is a different approach from a REST API.
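
As a rough sketch, the client could consume such a stream line by line with the `requests` library; the URL matches the streaming demo at the end of this answer, and the file name is just a placeholder:

import requests  # third-party HTTP client, assumed to be installed

# Upload a file and read the streamed progress lines as they arrive.
files = [("files", open("example.csv", "rb"))]  # placeholder file
with requests.post("http://127.0.0.1:8000/work/stream/test",
                   files=files, stream=True) as response:
    for line in response.iter_lines():
        if line:
            print(line.decode())  # e.g. "example.csv processed"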

Websockets

You can also use websockets for real-time notifications and bidirectional communication.
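
A minimal sketch of what a progress endpoint over websockets could look like in FastAPI; the path, the `jobs` store, and the polling interval here are assumptions for illustration, not part of the question or the demos below:

import asyncio
from fastapi import FastAPI, WebSocket

app = FastAPI()
jobs = {}  # hypothetical job store: {uid: {"status": ..., "progress": ...}}


@app.websocket("/ws/task/{uid}")
async def task_progress(websocket: WebSocket, uid: str):
    await websocket.accept()
    # Push the current job state to the client until the job completes.
    while True:
        job = jobs.get(uid, {"status": "unknown"})
        await websocket.send_json(job)
        if job.get("status") in ("complete", "unknown"):
            break
        await asyncio.sleep(1)
    await websocket.close()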

Links:

  • Examples of the polling approach for the progress bar and a more detailed description for Django + Celery can be found at these links:

https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html

https://buildwithdjango.com/blog/post/celery-progress-bars/

  • I have provided simplified examples of running background tasks in FastAPI using multiprocessing here:

https://stackoverflow.com/a/63171013/13782669

Old answer:

You could run a task in the background, return its ID, and provide a /status endpoint that the front end would periodically call. In the status response, you could return what state your task is in now (for example, pending, with the number of the currently processed file). I provided a few simple examples here.

Demo

Polling

Demo of the approach using asyncio tasks (single worker solution):

import asyncio
from http import HTTPStatus
from typing import Dict, Optional
from uuid import UUID, uuid4

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0
    result: Optional[int] = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}  # Dict as job storage


async def long_task(queue: asyncio.Queue, param: int):
    for i in range(1, param):  # do work and return our progress
        await asyncio.sleep(1)
        await queue.put(i)
    await queue.put(None)


async def start_new_task(uid: UUID, param: int) -> None:

    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))

    while progress := await queue.get():  # monitor task progress
        jobs[uid].progress = progress

    jobs[uid].status = "complete"


@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task


@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Adapted example for the loop from the question

The background processing function is defined with a plain def, so FastAPI runs it in the thread pool.

import time
from http import HTTPStatus

from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    processed_files: List[str] = Field(default_factory=list)


app = FastAPI()
jobs: Dict[UUID, Job] = {}


def process_files(task_id: UUID, files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        jobs[task_id].processed_files.append(i.filename)
    jobs[task_id].status = "completed"


@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task


@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Streaming

import asyncio

from fastapi.responses import StreamingResponse

# (continues the example above: reuses app, File, HTTPStatus, List, UploadFile)


async def process_files_gen(files: List[UploadFile]):
    for i in files:
        await asyncio.sleep(5)  # pretend long task without blocking the event loop
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        yield f"{i.filename} processed\n"
    yield "OK\n"


@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work_stream(files: List[UploadFile] = File(...)):
    return StreamingResponse(process_files_gen(files))
alex_noname
  • How to extend the Old Answer Demo Polling Example to multiple workers and servers? – Baenka Mar 03 '22 at 08:14
  • Solution to my comment above: one way to deploy the app using uvicorn with multiple workers is to create the task_id as a string combination of uuid4 and pid. – Baenka Mar 03 '22 at 08:50
  • Instead of a worker-local dictionary you can use shared storage like a database or an in-memory store (see the sketch below). Related topic https://stackoverflow.com/questions/65686318/sharing-python-objects-across-multiple-workers/65699375#65699375 – alex_noname Mar 03 '22 at 08:52
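
To illustrate the comment above, a rough sketch of swapping the worker-local `jobs` dict for a shared Redis store; the key layout and the use of the `redis` package are assumptions, not part of the answer:

import json
from typing import Optional

import redis  # third-party client; assumes a Redis server is reachable

r = redis.Redis(host="localhost", port=6379, db=0)


def save_job(uid: str, job: dict) -> None:
    # Serialize the job so every uvicorn worker sees the same state.
    r.set(f"job:{uid}", json.dumps(job))


def load_job(uid: str) -> Optional[dict]:
    raw = r.get(f"job:{uid}")
    return json.loads(raw) if raw else None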
9

Below is a solution which uses unique identifiers and a globally available dictionary which holds information about the jobs:

NOTE: The code below is safe to use as long as you use dynamic key values (in the sample, a UUID is used) and keep the application within a single process.

  1. To start the app create a file main.py
  2. Run uvicorn main:app --reload
  3. Create a job entry by accessing http://127.0.0.1:8000/
  4. Repeat step 3 to create multiple jobs
  5. Go to the http://127.0.0.1:8000/status page to see all job statuses.
  6. Go to http://127.0.0.1:8000/status/{identifier} to see the progression of the job by its job ID.

Code of app:

from fastapi import FastAPI, UploadFile
import uuid
from typing import List


import asyncio


context = {'jobs': {}}

app = FastAPI()



async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file_number, file in enumerate(iter_over):  # enumerate yields (index, item)
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    context['jobs'][job_key]['status'] = 'done'


@app.post('/work/test')
async def testing(files: List[UploadFile]):
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}

@app.get('/status')
def status():
    return {
        'all': list(context['jobs'].values()),
    }

@app.get('/status/{identifier}')
async def status_by_id(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }

Andriy Ivaneyko
  • This seems to be working for me on a single worker. Thank you! If you have some free time, can you point out how I can parallelize the for loop inside the do_work function? I want to use something like joblib or something similar to parallelize the for loop. I have asked the question [here](https://stackoverflow.com/questions/65132243/how-to-parallelize-the-for-loop-inside-a-async-function-and-track-for-loop-execu) – user_12 Dec 03 '20 at 18:51
  • 1
    you can create an `async def` which handles the input from every iteration and store the future objects in a list, then call asyncio.gather: https://docs.python.org/3/library/asyncio-task.html#asyncio.gather, outside of the for loop. You can try on your own, I can provide a sample once I have time for this ;) – Andriy Ivaneyko Dec 04 '20 at 10:04
  • 1
    Thank you so much. I am not very familiar with asyncio operations, but I will try, and when you have some time, please try to tackle the question here: https://stackoverflow.com/questions/65132243/how-to-parallelize-the-for-loop-inside-a-async-function-and-track-for-loop-execu – user_12 Dec 04 '20 at 10:21
  • If you have some time, please try to help me out. I have been trying to find a way but failed. Are you familiar with a way by which I can parallelize the for loop while also being able to track the iterations? Here is the bounty question link: https://stackoverflow.com/questions/65132243/how-to-parallelize-the-for-loop-inside-a-async-function-and-track-for-loop-execu – user_12 Dec 06 '20 at 22:09
  • 1
    @user_12 sure, i will take a look on it :) – Andriy Ivaneyko Dec 07 '20 at 10:57
  • The pending_jobs variable is undefined and the status never reaches 'done'. Please guide. – Tamil Selvan C Jul 25 '22 at 09:12