I use FastAPI together with the fastapi_utils package. My API is supposed to receive users' texts within 3 s; the texts are then sent to a model that calculates their length (just a simple demo). So I use fastapi_utils to run the model step as a scheduled background task, and each request finally reads its result from a shared dict. But I found that the program blocks in the while loop: feed_data_into_model never puts a value into shared_dict, so the while loop never ends. Here is the complete script (api.py):
```python
import asyncio
import logging
import threading
import uuid
from typing import List, Union

import uvicorn
from fastapi import Body, FastAPI, Request
from fastapi_utils.tasks import repeat_every

logger = logging.getLogger(__name__)

app = FastAPI()
queue = asyncio.Queue(maxsize=64)
shared_dict = {}  # model results are saved here, keyed by task id
lock = threading.Lock()


def handle_dict(key, value=None, action="put"):
    lock.acquire()
    try:
        if action == "put":
            shared_dict[key] = value
        elif action == "delete":
            del shared_dict[key]
        elif action == "get":
            value = shared_dict[key]
        elif action == "exist":
            value = key in shared_dict
        else:
            pass
    finally:
        # always called, even if an exception is raised in the try block
        lock.release()
    return value


def model_work(x: Union[str, List[str]]):
    # stand-in for the real model: just return the length of each text
    if isinstance(x, str):
        result = [len(x)]
    else:
        result = [len(_) for _ in x]
    return result


@app.on_event("startup")
@repeat_every(seconds=4, logger=logger, wait_first=True)
async def feed_data_into_model():
    if queue.qsize() != 0:
        data = []
        ids = []
        while queue.qsize() != 0:
            task = await queue.get()
            task_id = task[0]
            ids.append(task_id)
            text = task[1]
            data.append(text)
        result = model_work(data)
        # print("model result:", result)
        for index, task_id in enumerate(ids):
            value = result[index]
            handle_dict(task_id, value, action="put")


async def get_response(task_id):
    not_exist_flag = True
    while not_exist_flag:
        # BUG: this never becomes False, so the loop never exits
        not_exist_flag = handle_dict(task_id, None, action="exist") is False
    value = handle_dict(task_id, None, action="get")
    handle_dict(task_id, None, action="delete")
    return value


@app.get("/{text}")
async def demo(text: str):
    task_id = str(uuid.uuid4())
    state = "pending"
    item = [task_id, text, state, ""]
    await queue.put(item)
    # wait for the answer to show up in the shared dict
    value = await get_response(task_id)
    return value


if __name__ == "__main__":
    # single process; every 4 s the scheduled task drains the queue, runs the model,
    # and saves results in the lock-protected dict keyed by task id
    uvicorn.run("api:app", host="0.0.0.0", port=5555)
```
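For reference, this is roughly how I call the endpoint once the service is up (a throwaway test script; it uses the requests package, which is not part of the app above):

```python
import requests

# every text path segment becomes one task; the expected response is the text's length
resp = requests.get("http://localhost:5555/hello", timeout=10)
print(resp.json())  # expected: 5, but in practice this raises a ReadTimeout
```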
After the service is running and you access the web API with some text like this, you will find that the request hangs, even after 3 seconds. My guess is that fastapi_utils does not run the background task in a separate thread, so the event loop's main thread stays stuck in the while loop inside get_response while shared_dict stays empty.
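If that guess is right, the problem should be reproducible without FastAPI at all: a coroutine that spins in a plain while loop without awaiting anything never yields control back to the event loop, so no other task scheduled on the same loop can run. A minimal sketch of what I mean (plain asyncio; the names here are mine, not from the app above):

```python
import asyncio

async def producer(shared):
    # stands in for the repeat_every job: it is supposed to fill the dict a bit later
    await asyncio.sleep(1)
    shared["result"] = 42

async def consumer(shared):
    # stands in for get_response: a busy-wait that never awaits anything
    while "result" not in shared:
        pass
    return shared["result"]

async def main():
    shared = {}
    # consumer() monopolizes the event loop, so producer() never gets a chance to run
    await asyncio.gather(producer(shared), consumer(shared))

# asyncio.run(main())  # never returns, just like the /{text} endpoint
```

This hangs in the same way, which makes me think the busy-wait in get_response is what starves feed_data_into_model, not the model or the queue.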