
My API receives users' texts, batches them every 900 ms, and sends the batch to a model that computes each text's length (just a simple demo). I already have a working implementation, but it is ugly. I open a new background scheduler thread; the API receives the query in the main thread and puts it in a queue shared by the main thread and the new thread; the new thread periodically drains all texts from the queue and sends them to the model; after the model has processed them, the results are stored in a shared dict. In the main thread, the get_response method uses a while loop to poll the shared dict for the result. My question is: how can I get rid of the while loop in get_response? I want an elegant method. Thanks!

This is the server code. I need to remove the while/sleep polling in get_response because it's ugly:

import asyncio
import uuid
from typing import Union, List
import threading
from queue import Queue
from fastapi import FastAPI, Request, Body, APIRouter
from fastapi_utils.tasks import repeat_every
import uvicorn
import time
import logging
import datetime
logger = logging.getLogger(__name__)

app = FastAPI()  # must be an ASGI app for uvicorn, not a bare APIRouter
def feed_data_into_model(queue, shared_dict, lock):
    if queue.qsize() != 0:
        data = []
        ids = []
        while queue.qsize() != 0:
            task = queue.get()
            task_id = task[0]
            ids.append(task_id)
            text = task[1]
            data.append(text)
        result = model_work(data)
        # print("model result:", result)
        for index, task_id in enumerate(ids):
            value = result[index]
            handle_dict(task_id, value, action="put", lock=lock, shared_dict=shared_dict)

class TestThreading(object):
    def __init__(self, interval, queue,shared_dict,lock):
        self.interval = interval

        thread = threading.Thread(target=self.run, args=(queue,shared_dict,lock))
        thread.daemon = True
        thread.start()

    def run(self,queue,shared_dict,lock):
        while True:
            # More statements comes here
            # print(datetime.datetime.now().__str__() + ' : Start task in the background')
            feed_data_into_model(queue,shared_dict,lock)
            time.sleep(self.interval)

if __name__ != "__main__":
    # uvicorn imports and re-runs this module, so __name__ is not "__main__" there;
    # init the shared state here, otherwise we would end up with a second, idle
    # background thread, which does nothing but is confusing to debug
    global queue, shared_dict, lock
    queue = Queue(maxsize=64)
    shared_dict = {}  # model results are saved here!
    lock = threading.Lock()
    tr = TestThreading(0.9, queue, shared_dict, lock)

def handle_dict(key, value = None, action = "put", lock = None, shared_dict = None):
    lock.acquire()
    try:
        if action == "put":
            shared_dict[key] = value
        elif action == "delete":
            del shared_dict[key]
        elif action == "get":
            value = shared_dict[key]
        elif action == "exist":
            value = key in shared_dict
        else:
            pass
    finally:
        # Always called, even if exception is raised in try block
        lock.release()
    return value

def model_work(x: Union[str, List[str]]):
    time.sleep(3)  # simulate a slow model
    if isinstance(x, str):
        result = [len(x)]
    else:
        result = [len(t) for t in x]
    return result

async def get_response(task_id, lock, shared_dict):
    not_exist_flag = True
    while not_exist_flag:
        not_exist_flag = handle_dict(task_id, None, action= "exist",lock=lock, shared_dict = shared_dict) is False 
        await asyncio.sleep(0.02)
    value = handle_dict(task_id, None, action= "get", lock=lock, shared_dict = shared_dict)
    handle_dict(task_id, None, action= "delete",lock=lock, shared_dict = shared_dict)
    return value

@app.get("/{text}")
async def demo(text: str):
    global queue, shared_dict, lock
    task_id = str(uuid.uuid4())
    logger.info(task_id)
    state = "pending"
    item = [task_id, text, state, ""]
    queue.put(item)
    # TODO: await query_from_answer_dict; needs changing, since polling for the answer is ugly
    value = await get_response(task_id, lock, shared_dict)
    return value

if __name__ == "__main__":
    # what I want to do:
    #  single process run every 900ms, if queue is not empty then pop them out to model
    #  and model will save result in thread-safe dict, key is task-id
    
    uvicorn.run("api:app", host="0.0.0.0", port=5555)

Client code:

for n in {1..5}; do curl http://localhost:5555/a & done
swordHeart

2 Answers


The usual way to run a blocking task in asyncio code is to use asyncio's builtin run_in_executor to handle it for you. You can either set up an executor yourself, or let asyncio use the default one:

import asyncio
from time import sleep


def proc(t):
    print("in thread")
    sleep(t)
    return f"Slept for {t} seconds"


async def submit_task(t):
    print("submitting:", t)
    res = await loop.run_in_executor(None, proc, t)
    print("got:", res)


async def other_task():
    for _ in range(4):
        print("poll!")
        await asyncio.sleep(1)


loop = asyncio.new_event_loop()
loop.create_task(other_task())
loop.run_until_complete(submit_task(3))

Note that if loop is not defined globally, you can get it inside a coroutine with asyncio.get_running_loop(). I've deliberately used a simple example without fastapi/uvicorn to illustrate the point, but the idea is the same: fastapi (and friends) just runs in the event loop, which is why you define coroutines for the endpoints.
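For instance, inside a fastapi-style endpoint the same pattern looks roughly like this (a sketch; blocking_model is a hypothetical stand-in for the real model call):

```python
import asyncio


def blocking_model(text: str) -> int:
    # stand-in for the real, blocking model call (hypothetical)
    return len(text)


async def endpoint(text: str) -> int:
    # inside a coroutine we can always grab the running loop
    loop = asyncio.get_running_loop()
    # the event loop stays responsive while the blocking call runs in a worker thread
    return await loop.run_in_executor(None, blocking_model, text)


print(asyncio.run(endpoint("hello")))  # 5
```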

The advantage of this is that we can simply await the response directly, without awaiting an event and then using some other means (a shared dict with a mutex, a pipe, a queue, whatever) to get the result out. This keeps the code clean and readable, and is likely also a good deal quicker. If, for some reason, we want to make sure it runs in processes rather than threads, we can make our own executor:

from concurrent.futures import ProcessPoolExecutor

e = ProcessPoolExecutor()
...
res = await loop.run_in_executor(e, proc, t)

See the docs for more information.
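On Python 3.9+ there is also asyncio.to_thread, which wraps run_in_executor with the default thread pool and is slightly more convenient:

```python
import asyncio
import time


def blocking(t: float) -> float:
    time.sleep(t)  # simulate blocking work
    return t * 2


async def main() -> float:
    # runs blocking() in a worker thread without touching the loop object
    return await asyncio.to_thread(blocking, 0.1)


print(asyncio.run(main()))  # 0.2
```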

Another option would be to use a multiprocessing.Pool to run the task and then apply_async. But you can't await multiprocessing futures directly. There is a library, aiomultiprocess, that makes the two play together, but I have no experience with it and can't see a reason to prefer it over the builtin executor for this case (running a single background task per invocation of the coro).

Lastly, note that the main reason to avoid a polling while loop is not that it's ugly (although it is), but that it's nowhere near as performant as almost any other solution.
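Applied to the question's batching setup specifically, the same "await it directly" idea works if the background thread resolves a per-task future instead of writing into a shared dict. A sketch, with hypothetical names and the 900 ms interval shortened to 0.1 s for the demo:

```python
import asyncio
import queue
import threading
import time

tasks: queue.Queue = queue.Queue()


def model_work(batch):
    # stand-in for the real model: length of each text
    return [len(text) for text in batch]


def batch_worker(loop: asyncio.AbstractEventLoop) -> None:
    # background thread: periodically drain the queue and resolve each future
    while True:
        pending = []
        try:
            while True:
                pending.append(tasks.get_nowait())
        except queue.Empty:
            pass
        if pending:
            results = model_work([text for _, text in pending])
            for (fut, _), res in zip(pending, results):
                # futures must be resolved from the loop's thread
                loop.call_soon_threadsafe(fut.set_result, res)
        time.sleep(0.1)  # the scheduling cadence, shortened for the demo


async def demo(text: str) -> int:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    tasks.put((fut, text))
    return await fut  # suspended here until the worker resolves it; no polling


async def main():
    loop = asyncio.get_running_loop()
    threading.Thread(target=batch_worker, args=(loop,), daemon=True).start()
    return await asyncio.gather(*(demo(t) for t in ["a", "bb", "ccc"]))


print(asyncio.run(main()))  # [1, 2, 3]
```

The batching thread still runs on its own schedule; the only change is that each request awaits its own future rather than polling a shared dict.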

2e0byo

I think I already got the answer: use an asyncio.Event to communicate across threads, via its set, clear, and wait methods together with asyncio.get_event_loop().
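For reference, a minimal sketch of that idea (names hypothetical): the worker thread stores the result, then signals the waiting coroutine. Note that asyncio.Event is not thread-safe, so the thread must set it via call_soon_threadsafe:

```python
import asyncio
import threading

results = {}


def worker(loop, event, task_id, text):
    # background thread: simulate the model, then signal the coroutine
    results[task_id] = len(text)
    loop.call_soon_threadsafe(event.set)  # Event must be set from the loop's thread


async def get_response(task_id: str, text: str) -> int:
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    threading.Thread(target=worker, args=(loop, event, task_id, text), daemon=True).start()
    await event.wait()  # suspended until the worker signals; no polling loop
    return results.pop(task_id)


print(asyncio.run(get_response("t1", "hello")))  # 5
```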

swordHeart