
I use FastAPI and the fastapi_utils package. My API receives users' texts within 3 s windows; the texts are then sent to the model, which calculates their lengths (just a simple demo). So I use fastapi_utils to schedule the background task. Finally, I read the result from the dict. But I found that the program blocks in the while loop: feed_data_into_model never puts a value into shared_dict, so the while loop never ends.


import asyncio
import uuid
import logging
from typing import Union, List
import threading
from fastapi import FastAPI, Request, Body
from fastapi_utils.tasks import repeat_every
import uvicorn
logger = logging.getLogger(__name__)
app = FastAPI()
queue = asyncio.Queue(maxsize=64)

shared_dict = {} # model result saved here!

lock = threading.Lock()

def handle_dict(key, value = None, action = "put"):
    lock.acquire()
    try:
        if action == "put":
            shared_dict[key] = value
        elif action == "delete":
            del shared_dict[key]
        elif action == "get":
            value = shared_dict[key]
        elif action == "exist":
            value = key in shared_dict
        else:
            pass
    finally:
        # Always called, even if exception is raised in try block
        lock.release()
    return value

def model_work(x:Union[str,List[str]]):
    if isinstance(x,str):
        result = [len(x)]
    else:
        result = [len(_) for _ in x]
    return result

@app.on_event("startup")
@repeat_every(seconds=4, logger=logger, wait_first=True)
async def feed_data_into_model():
    if queue.qsize() != 0:
        data = []
        ids = []
        while queue.qsize() != 0:
            task = await queue.get()
            task_id = task[0]
            ids.append(task_id)
            text = task[1]
            data.append(text)
        result = model_work(data)
        # print("model result:",result)
        for index,task_id in enumerate(ids):
            value = result[index]
            handle_dict(task_id,value,action = "put")

async def get_response(task_id):
    not_exist_flag = True
    while not_exist_flag:
        not_exist_flag = handle_dict(task_id, None, action= "exist") is False # BUG: it doesn't work
    value = handle_dict(task_id, None, action= "get")
    handle_dict(task_id, None, action= "delete")
    return value

@app.get("/{text}")
async def demo(text:str):
    task_id = str(uuid.uuid4())
    state = "pending"
    item= [task_id,text,state,""]
    await queue.put(item)
    # !: await query_from_answer_dict
    value = await get_response(task_id)
    return value

if __name__ == "__main__":
    # !: single process run every 4s, if queue not empty then pop them out to model
    # !: and model will save result in thread-safe dict, key is task-id
    uvicorn.run("api:app", host="0.0.0.0", port=5555)

After the service runs, access the web API with some text, and you will find the request blocks even after 3 seconds. I guess fastapi_utils doesn't open a new thread for the background task, so the main thread is blocked in the while loop since the dict is always empty.


2 Answers


The problem at the moment is the use of blocking code in an asyncio loop. If you insert a short delay it will work:

    while not_exist_flag:
        not_exist_flag = handle_dict(task_id, None, action="exist") is False
        await asyncio.sleep(0.1)

The reason is that you need to let the scheduler go elsewhere and actually do the processing! Asyncio is not a free pass to write blocking code, sadly. But adding a delay is a very non-optimal solution.*

A better solution would be to have your get_response function await the task directly: currently everything is in one thread, so there is no advantage to handing processing over to a separate queue. Alternatively, use multiprocessing and submit the task whilst keeping a local reference to it; then you can await the future directly and avoid polling.

By the time you've done this you've nearly reinvented celery. The fastapi project generator includes celery by default: if you really need to hand these tasks off to another process, you might want to look at doing that.

In general, try to avoid polling in asyncio. You want to await everything.
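For instance, "awaiting everything" could look something like the sketch below: each request registers an asyncio.Future under its task id, and the periodic batch task resolves those futures instead of writing to a shared dict. The names (pending, submit, drain_and_run_model) are my own illustrative stand-ins, not part of the code above.

```python
import asyncio

# Hypothetical sketch: futures replace shared_dict, so each request can
# await its own result instead of polling a dict.
pending: dict = {}  # task_id -> asyncio.Future

async def submit(task_id, text, queue):
    fut = asyncio.get_running_loop().create_future()
    pending[task_id] = fut
    await queue.put((task_id, text))
    return await fut  # suspends here until the batch task resolves it

async def drain_and_run_model(queue):
    # stands in for feed_data_into_model: drain the queue, "run the model",
    # and resolve the matching future for each task
    while not queue.empty():
        task_id, text = queue.get_nowait()
        pending.pop(task_id).set_result(len(text))
```

With this shape get_response disappears entirely; the endpoint just returns `await submit(...)`.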

*It's non-optimal because:

  • polling is happening at the highest level, so it's already slower than doing it in C
  • polling here involves calling a whole function which acquires a lock, so we pay the context-switch cost (from the function call), the lock cost, and the blocking of anything else trying to use the lock
  • your polling interval directly affects the time available for other code to run

Note that your polling loop could have been written:

while not handle_dict(task_id, None, action="exist"):
    pass

Which shows up the busy loop more clearly.

  • thank you; after editing, I added a sleep in the model (since the model costs time for every batch). I found this API works in a streaming style, so it doesn't save any time! Is it because fastapi_utils still works in the same thread rather than opening a new one? – swordHeart Nov 02 '21 at 07:56
  • Indeed: asyncio only works in one thread in general (and fastapi, and all the tools it's built on, are mostly asyncio). If you do have tasks you want to background, you need to create the thread or process yourself, with something like multiprocessing, or run a dedicated task server like celery. – 2e0byo Nov 02 '21 at 12:14
  • hi, I've got it now and implemented it with a new thread. But I find that using while and sleep in get_response, plus a dict to fetch the result, is ugly. Could you please tell me how to wait for and get the changed result variable in an elegant way? Thx! I will put my current solution in the answer below, but I need help removing the while in get_response, since the result is produced in a background thread. I need a notification telling me to access the value when it's finished – swordHeart Nov 03 '21 at 08:25
  • @swordHeart this is really a new question. if you post with your code below, someone will answer it. If the question hasn't been answered by time I get back I'll write an answer :) – 2e0byo Nov 03 '21 at 08:47
  • hi @2e0byo , I post it as new question [here](https://stackoverflow.com/questions/69823990/python-how-to-check-whether-the-variable-state-is-changed-which-is-shared-and-ed) , very glad to hear your shared knowledge. thx! – swordHeart Nov 03 '21 at 11:27
  • I changed to using asyncio.Event to communicate across threads, so I no longer need a while loop in my code. – swordHeart Nov 04 '21 at 05:53

The server code (the while/sleep in get_response still needs removing because it's ugly):


import asyncio
import uuid
from typing import Union, List
import threading
from queue import Queue
from fastapi import FastAPI, Request, Body, APIRouter
from fastapi_utils.tasks import repeat_every
import uvicorn
import time
import logging
import datetime
logger = logging.getLogger(__name__)

app = APIRouter()

def feed_data_into_model(queue, shared_dict, lock):
    if queue.qsize() != 0:
        data = []
        ids = []
        while queue.qsize() != 0:
            task = queue.get()
            task_id = task[0]
            ids.append(task_id)
            text = task[1]
            data.append(text)
        result = model_work(data)
        # print("model result:",result)
        for index,task_id in enumerate(ids):
            value = result[index]
            handle_dict(task_id,value,action = "put",lock=lock, shared_dict = shared_dict)

class TestThreading(object):
    def __init__(self, interval, queue,shared_dict,lock):
        self.interval = interval

        thread = threading.Thread(target=self.run, args=(queue,shared_dict,lock))
        thread.daemon = True
        thread.start()

    def run(self,queue,shared_dict,lock):
        while True:
            # More statements comes here
            # print(datetime.datetime.now().__str__() + ' : Start task in the background')
            feed_data_into_model(queue,shared_dict,lock)
            time.sleep(self.interval)

if __name__ != "__main__":
    # uvicorn imports and reloads this file, so __name__ is not "__main__" there;
    # init the shared state here, otherwise we end up with 2 background threads
    # (one of them idle), which doesn't run correctly and is hard to debug
    global queue, shared_dict, lock 
    queue = Queue(maxsize=64) #
    shared_dict = {} # model result saved here!
    lock = threading.Lock()
    tr = TestThreading(3, queue,shared_dict,lock)

def handle_dict(key, value = None, action = "put", lock = None, shared_dict = None):
    lock.acquire()
    try:
        if action == "put":
            shared_dict[key] = value
        elif action == "delete":
            del shared_dict[key]
        elif action == "get":
            value = shared_dict[key]
        elif action == "exist":
            value = key in shared_dict
        else:
            pass
    finally:
        # Always called, even if exception is raised in try block
        lock.release()
    return value

def model_work(x:Union[str,List[str]]):
    time.sleep(3)
    if isinstance(x,str):
        result = [len(x)]
    else:
        result = [len(_) for _ in x]
    return result

async def get_response(task_id, lock, shared_dict):
    not_exist_flag = True
    while not_exist_flag:
        not_exist_flag = handle_dict(task_id, None, action= "exist",lock=lock, shared_dict = shared_dict) is False 
        await asyncio.sleep(0.02)
    value = handle_dict(task_id, None, action= "get", lock=lock, shared_dict = shared_dict)
    handle_dict(task_id, None, action= "delete",lock=lock, shared_dict = shared_dict)
    return value

@app.get("/{text}")
async def demo(text:str):
    global queue, shared_dict, lock 
    task_id = str(uuid.uuid4())
    logger.info(task_id)
    state = "pending"
    item= [task_id,text,state,""]
    queue.put(item)
    # TODO: replace the while/sleep wait in get_response; polling for the answer is ugly
    value = await get_response(task_id, lock, shared_dict)
    return value

if __name__ == "__main__":
    # what I want to do:
    #  single process run every 3s, if queue not empty then pop them out to model
    #  and model will save result in thread-safe dict, key is task-id
    
    uvicorn.run("api:app", host="0.0.0.0", port=5555)
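One way to drop the while/sleep, as mentioned in the comments on the accepted answer: give each task an asyncio.Event and have the worker thread signal it via loop.call_soon_threadsafe, since asyncio primitives are not thread-safe on their own. A minimal sketch under those assumptions (the events/results dicts and worker_finished are illustrative names, not part of the code above):

```python
import asyncio
import threading

events: dict = {}   # task_id -> asyncio.Event
results: dict = {}  # task_id -> model result

async def get_response(task_id):
    # register an event and suspend until the worker thread signals it
    ev = asyncio.Event()
    events[task_id] = ev
    await ev.wait()
    events.pop(task_id)
    return results.pop(task_id)

def worker_finished(loop, task_id, value):
    # called from the background thread after model_work; the event must be
    # set on the loop's own thread, hence call_soon_threadsafe
    results[task_id] = value
    loop.call_soon_threadsafe(events[task_id].set)
```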

The client test code:

for n in {1..5}; do curl http://localhost:5555/a & done