I am trying to set up a FastAPI app that does the following:
- Accepts messages as POST requests and puts them in a queue;
- A background job periodically pulls messages (up to a certain batch size) from the queue, processes them in a batch, and stores the results in a dictionary;
- The app retrieves results from the dictionary and sends them back "as soon as" they are done.
To do so, I've set up a background job with APScheduler that communicates with the app via a queue, trying to build a simplified version of this post: https://levelup.gitconnected.com/fastapi-how-to-process-incoming-requests-in-batches-b384a1406ec. Here is the code of my app:
import queue
import uuid
from asyncio import sleep

import uvicorn
from pydantic import BaseModel
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler

app = FastAPI()
app.input_queue = queue.Queue()
app.output_dict = {}
app.queue_limit = 2

def upper_messages():
    # pull up to queue_limit messages from the queue and process them as a batch
    for i in range(app.queue_limit):
        try:
            obj = app.input_queue.get_nowait()
            app.output_dict[obj['request_id']] = obj['text'].upper()
        except queue.Empty:
            pass

app.scheduler = AsyncIOScheduler()
app.scheduler.add_job(upper_messages, 'interval', seconds=5)
app.scheduler.start()

async def get_result(request_id):
    # poll the output dictionary until the result for this request appears
    while True:
        if request_id in app.output_dict:
            result = app.output_dict[request_id]
            del app.output_dict[request_id]
            return result
        await sleep(0.001)

class Payload(BaseModel):
    text: str

@app.post('/upper')
async def upper(payload: Payload):
    request_id = str(uuid.uuid4())
    app.input_queue.put({'text': payload.text, 'request_id': request_id})
    return await get_result(request_id)

if __name__ == "__main__":
    uvicorn.run(app)
However, it's not really running asynchronously; if I invoke the following test script:
from time import time

import requests

texts = [
    'text1',
    'text2',
    'text3',
    'text4'
]

time_start = time()
for text in texts:
    result = requests.post('http://127.0.0.1:8000/upper', json={'text': text})
    print(result.text, time() - time_start)
the messages do get processed, but the whole run takes 15-20 seconds, the output being something like:
"TEXT1" 2.961090087890625
"TEXT2" 7.96642279624939
"TEXT3" 12.962305784225464
"TEXT4" 17.96261429786682
I was instead expecting the whole run to take 5-10 seconds: the first two messages should be processed within (less than) 5 seconds, and the other two more or less exactly 5 seconds later. It seems instead that the second message is not put into the queue until the first one has been processed, i.e. the same as if I were just using a single thread.
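(Note that requests.post blocks until each response arrives, so the loop in the test script above also serializes on the client side; to separate client-side from server-side serialization, one could fire all the requests concurrently, e.g. with a thread pool. A minimal sketch, untested against the app above:)

from concurrent.futures import ThreadPoolExecutor
from time import time

import requests

texts = ['text1', 'text2', 'text3', 'text4']

def post(text):
    # each call blocks in its own thread until the app returns the result
    return requests.post('http://127.0.0.1:8000/upper', json={'text': text}).text

time_start = time()
with ThreadPoolExecutor(max_workers=len(texts)) as pool:
    for result in pool.map(post, texts):
        print(result, time() - time_start)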
Questions:
- Does anyone know how to modify the code above so that all incoming messages are put into the queue immediately upon receipt?
- [bonus question 1]: The above holds true if I run the script (say, debug_app.py) from the command line via uvicorn debug_app:app. But if I run it with python3 debug_app.py, no message is returned at all. Messages are received (pressing CTRL+C results in Waiting for connections to close. (CTRL+C to force quit)) but they are never processed.
- [bonus question 2]: Another thing I don't understand is why, if I remove the line await sleep(0.001) inside the definition of get_result, the behaviour gets even worse: no matter what I do, the app freezes and I cannot terminate it (neither CTRL+C nor kill works); I have to send a SIGKILL (kill -9) to stop it.
Background
If you are wondering why I am doing this: as in the blog post linked above, the purpose is efficient deep learning inference. The model I have takes (roughly) the same time to process one request or a dozen at the same time, so batching can dramatically increase throughput. I first tried setting up a FastAPI frontend + RabbitMQ + Flask backend pipeline, and it worked, but the complicated setup (and/or my inability to work with it) made the overhead heavier than the time it took to run the model itself, nullifying the gain... so I'm first trying to get a minimalistic version to work. The upper_messages method in this toy example will become either a direct invocation of the model (if this computationally heavier step does not block incoming connections too much) or an async call to another process that actually does the computation - I'll see about that later...
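For concreteness, the direct-invocation variant might look something like the sketch below (hypothetical: predict_batch stands in for the actual model call, which I haven't written yet; it reuses the app object defined above):

def run_model_batch():
    # drain up to queue_limit pending requests into one batch
    batch = []
    for _ in range(app.queue_limit):
        try:
            batch.append(app.input_queue.get_nowait())
        except queue.Empty:
            break
    if not batch:
        return
    # predict_batch is a placeholder for the real model call,
    # which takes roughly the same time for one input or a dozen
    outputs = predict_batch([obj['text'] for obj in batch])
    for obj, out in zip(batch, outputs):
        app.output_dict[obj['request_id']] = out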