
I am trying to set up a FastAPI app that does the following:

  • Accept messages via POST requests and put them in a queue;
  • A background job periodically pulls messages (up to a certain batch size) from the queue, processes them as a batch, and stores the results in a dictionary;
  • The app retrieves results from the dictionary and sends them back "as soon as" they are done.

To do so, I've set up a background job with APScheduler that communicates with the app via a queue, trying to build a simplified version of this post: https://levelup.gitconnected.com/fastapi-how-to-process-incoming-requests-in-batches-b384a1406ec. Here is the code of my app:

import queue
import uuid
from asyncio import sleep

import uvicorn
from pydantic import BaseModel
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler

app = FastAPI()
app.input_queue = queue.Queue()  # incoming messages waiting to be processed
app.output_dict = {}             # request_id -> processed result
app.queue_limit = 2              # maximum number of messages processed per batch


def upper_messages():
    # pull up to queue_limit messages from the queue and process them as one batch
    for i in range(app.queue_limit):
        try:
            obj = app.input_queue.get_nowait()
            app.output_dict[obj['request_id']] = obj['text'].upper()
        except queue.Empty:
            pass


# run the batch job every 5 seconds on the asyncio event loop
app.scheduler = AsyncIOScheduler()
app.scheduler.add_job(upper_messages, 'interval', seconds=5)
app.scheduler.start()


async def get_result(request_id):
    # poll the output dictionary until the result for this request shows up
    while True:
        if request_id in app.output_dict:
            result = app.output_dict[request_id]
            del app.output_dict[request_id]
            return result
        await sleep(0.001)


class Payload(BaseModel):
    text: str


@app.post('/upper')
async def upper(payload: Payload):
    # assign an id to the request, enqueue it and wait for the batch job to process it
    request_id = str(uuid.uuid4())
    app.input_queue.put({'text': payload.text, 'request_id': request_id})
    return await get_result(request_id)


if __name__ == "__main__":
    uvicorn.run(app)

However, it's not really running asynchronously; if I invoke the following test script:

from time import time
import requests

texts = [
    'text1',
    'text2',
    'text3',
    'text4'
]

time_start = time()
for text in texts:
    result = requests.post('http://127.0.0.1:8000/upper', json={'text': text})
    print(result.text, time() - time_start)

the messages do get processed, but the whole processing takes 15-20 seconds, the output being something like:

"TEXT1" 2.961090087890625
"TEXT2" 7.96642279624939
"TEXT3" 12.962305784225464
"TEXT4" 17.96261429786682

I was instead expecting the whole processing to take 5-10 seconds: after less than 5 seconds the first two messages should be processed, and the other two more or less exactly 5 seconds later. Instead, it seems that the second message is not put into the queue until the first one has been processed, i.e. the same as if I were just using a single thread.

Questions:

  • Does anyone know how to modify the code above so that all incoming messages are put into the queue immediately upon being received?
  • [bonus question 1]: The above holds true if I run the script (say, debug_app.py) from the command line via uvicorn debug_app:app. But if I run it with python3 debug_app.py, no message is returned at all. Messages are received (pressing CTRL+C results in Waiting for connections to close. (CTRL+C to force quit)) but never processed.
  • [bonus question 2]: Another thing I don't understand is why, if I remove the line await sleep(0.001) inside the definition of get_result, the behaviour gets even worse: the app freezes, I cannot terminate it (neither CTRL+C nor kill works), and I have to send a SIGKILL (kill -9) to stop it.

Background: If you are wondering why I am doing this, like in the blog post linked above, the purpose is to do efficient deep learning inference. The model I have takes (roughly) the same time to process one request or a dozen of them at once, so batching can dramatically increase throughput. I first tried setting up a FastAPI frontend + RabbitMQ + Flask backend pipeline, and it worked, but the overhead of the complicated setup (and/or my inability to work with it) outweighed the time it took to run the model itself, nullifying the gain... so I'm first trying to get a minimalistic version to work. The upper_messages function in this toy example will become either a direct invocation of the model (if this computationally heavier step does not block incoming connections too much) or an async call to another process that actually does the computations; I'll see about that later...
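For illustration only, here is a rough sketch of what a batched version of upper_messages could look like once the uppercasing is replaced by model inference, registered with the scheduler in place of upper_messages. The model object and its predict_batch method are hypothetical placeholders for whatever inference API the real model exposes, not something defined in the code above:

def batch_inference():
    # drain up to queue_limit pending requests from the input queue
    batch = []
    for _ in range(app.queue_limit):
        try:
            batch.append(app.input_queue.get_nowait())
        except queue.Empty:
            break
    if not batch:
        return
    # hypothetical batched model call: takes a list of texts, returns a list of predictions
    predictions = model.predict_batch([obj['text'] for obj in batch])
    for obj, prediction in zip(batch, predictions):
        app.output_dict[obj['request_id']] = prediction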

Marco Spinaci
  • Thanks for the links, your other answer provided a lot of insights! I am indeed a noob in async code, but from your answer I understood that I was not doing things wrongly - my code falls in the "as long as there is no await call to I/O-bound operations inside such routes" part of your answer. The reason why I wasn't getting batches was indeed much more stupid (and came to my mind reading your sentence "please remember to do that from a tab that is isolated from the browser's main session") as I wrote in my own answer below... – Marco Spinaci Apr 15 '22 at 20:51
  • (also, sorry for going slightly off-topic, but I don't know C# and I don't fully understand the first link you posted; it seems to me that the people there are discussing "threads" meaning "working in parallel on different cores", but due to the GIL in Python, that would actually mean processes, not threads, right? Does C# allow for real parallelism even for multiple threads in the same process? In Python, due to the GIL, having multiple "threads" would only be relevant in things such as dealing with multiple incoming connections with lots of idle time, which indeed is the situation in this question) – Marco Spinaci Apr 15 '22 at 20:54
  • Please have a look at [this answer](https://stackoverflow.com/a/71517830/17865804), which will help you understand the concept of `async`/`await` and the difference between using `def` and `async def` in FastAPI. – Chris Apr 15 '22 at 21:17

1 Answer


...after looking into it more carefully, it turns out the application was indeed working as I wanted it to; my error was in the way I tested it...

Indeed, when sending a POST request to the uvicorn server, the client is left waiting for the answer to come back, which is the intended behaviour. Of course, this also means that the next request is not sent until the first answer has been collected. So the server is not batching requests because there is nothing to batch!

To test this correctly, I slightly altered the test.py script to:

from time import time
import requests
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('prefix')
args = parser.parse_args()

texts = [
    'text1',
    'text2',
    'text3',
    'text4'
]
texts = [args.prefix + '_' + t for t in texts]

time_start = time()
for text in texts:
    result = requests.post('http://127.0.0.1:8000/upper', json={'text': text})
    print(result.text, time() - time_start)

and ran it in multiple processes via:

python3 test.py user1 & python3 test.py user2 & python3 test.py user3

The output is now as expected, with pairs of messages (from different users!) being processed in batches (the exact order is a bit randomized, although each user, of course, gets its answers in the order of the requests it made):

"USER1_TEXT1" 4.340522766113281
"USER3_TEXT1" 4.340718030929565
"USER2_TEXT1" 9.334393978118896
"USER1_TEXT2" 9.340892553329468
"USER3_TEXT2" 14.33926010131836
"USER2_TEXT2" 14.334421396255493
"USER1_TEXT3" 19.339791774749756
"USER3_TEXT3" 19.33999013900757
"USER1_TEXT4" 24.33989715576172
"USER2_TEXT3" 24.334784030914307
"USER3_TEXT4" 29.338693857192993
"USER2_TEXT4" 29.333901166915894

I'm leaving the question open (and not accepting my own answer) because I still don't have an answer to the "bonus questions" above (about the application freezing).

Marco Spinaci