
I need to return a response from my FastAPI path operation, but before doing so I want to send a slow request, and I don't need to wait for the result of that request, just log errors if there are any. Can I do this with plain Python and FastAPI? I would rather not add Celery to the project.

Here is what I have so far, but it runs synchronously:

import asyncio
import requests


async def slow_request(data):
    url = 'https://external.service'
    response = requests.post(
        url=url,
        json=data,
        headers={'Auth-Header': settings.API_TOKEN}
    )
    if response.status_code != 200:
        logger.error('response: %s', response.status_code)
        logger.error('data: %s', data)


@router.post('/order/')
async def handle_order(order: Order):
    json_data = {
        'order': order
    }
    
    task = asyncio.create_task(
        slow_request(json_data)
    )
    await task

    return {'body': {'message': 'success'}}
Vlad T.
  • If you want a fire-and-forget task, just remove the `await task` statement. `create_task` already scheduled the task. – alex_noname Jan 25 '21 at 18:06
  • @alex_noname It works, thank you! I was very close to the solution =) If you add the answer I could accept it. – Vlad T. Jan 25 '21 at 18:14
  • You can also use [background task](https://fastapi.tiangolo.com/tutorial/background-tasks/?h=+back#using-backgroundtasks). – HTF Jan 25 '21 at 18:22
  • The `slow_request` function doesn't await anything, which means that despite it being `async def`, it will block the entire event loop once it starts running. You should switch from requests to aiohttp, which is designed to yield to the event loop whenever it waits for data to arrive (see the sketch after these comments). – user4815162342 Jan 25 '21 at 18:29
  • Related answers to this question can be found [here](https://stackoverflow.com/a/70873984/17865804), as well as [here](https://stackoverflow.com/a/71517830/17865804) and [here](https://stackoverflow.com/a/74508996/17865804). Additionally, please have a look at [this answer](https://stackoverflow.com/a/73736138/17865804) on how to properly make HTTP requests inside of a FastAPI application. – Chris Jun 16 '23 at 18:43
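
To illustrate the aiohttp suggestion above, here is a minimal sketch (not from the original thread) of a non-blocking `slow_request`, assuming the same `settings` and `logger` objects as in the question:

import aiohttp


async def slow_request(data):
    url = 'https://external.service'
    # aiohttp yields to the event loop while waiting on the network,
    # so other requests keep being served in the meantime
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            json=data,
            headers={'Auth-Header': settings.API_TOKEN},  # `settings` as in the question
        ) as response:
            if response.status != 200:
                logger.error('response: %s, data: %s', response.status, data)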

2 Answers


OK, since nobody else wants to post an answer, here are the solutions:

Solution #1

We can just remove the `await task` line, as alex_noname suggested. It works because `create_task` schedules the task and we no longer wait for its completion.

@router.post('/order/')
async def handle_order(order: Order):
    json_data = {
        'order': order
    }
    
    task = asyncio.create_task(
        slow_request(json_data)
    )

    return {'body': {'message': 'success'}}

Solution #2

I ended up with `BackgroundTasks`, as HTF suggested, since I'm already using FastAPI anyway, and this solution seems neater to me.

from fastapi import BackgroundTasks


@router.post('/order/')
async def handle_order(order: Order, background_tasks: BackgroundTasks):
    json_data = {
        'order': order
    }
    
    background_tasks.add_task(slow_request, json_data)

    return {'body': {'message': 'success'}}

This even works without `async` before `def slow_request(data)`, because Starlette (which FastAPI builds on) runs plain `def` background tasks in a thread pool.
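
For completeness, a minimal sketch of that synchronous variant (reusing the `settings` and `logger` objects assumed in the question); since it is a plain `def`, the blocking `requests` call runs in a worker thread and does not stall the event loop:

import requests


def slow_request(data):
    # plain `def`: BackgroundTasks runs this in a thread pool worker
    response = requests.post(
        'https://external.service',
        json=data,
        headers={'Auth-Header': settings.API_TOKEN},  # `settings` as in the question
        timeout=10,
    )
    if response.status_code != 200:
        logger.error('response: %s, data: %s', response.status_code, data)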

Vlad T.
  • Solution #1 is not async at all and _will_ block the whole event loop. It shouldn't be part of an answer because, even if it happened to work for you, it will not help future visitors of the site. – user4815162342 Jan 26 '21 at 12:04
  • @user4815162342 Can you prove your point? Is it true just because YOU said this? If so, then I cannot accept your argument. – Vlad T. Jan 27 '21 at 13:59
  • As I mentioned in a comment to the question, the problem is that `slow_request` doesn't await anything. Python async functions are based on cooperative multitasking, and an async function without `await` simply doesn't cooperate and will, if it blocks, block the whole event loop. I didn't mean to come off as aggressive or overbearing, just pointing out a technical issue which I already pointed out previously, but which still made it to the answer. It's your answer, so it's up to you to edit it or not, but I wanted to warn future visitors who come across it. – user4815162342 Jan 27 '21 at 14:05
  • @user4815162342 Well, these general principles do not contradict the solution. You might set up an experiment and see that it does not block the event loop. You could also investigate the asyncio lib and see that it can concurrently run even a blocking function. – Vlad T. Jan 27 '21 at 16:24
  • The answer doesn't show any of that, though. Solution #1 is misleading as currently phrased because it makes it look like you can run a blocking function in the background just by adding `async` in front of the `def` and calling `asyncio.create_task()` on it. That is unfortunately not the case. That kind of code is also misleading because it might _look_ like it's working correctly if you test it with just one task, but it actually fails in a scenario with multiple tasks, which you might not notice until much later. – user4815162342 Jan 27 '21 at 16:30
  • @user4815162342 Would you mind providing an example that explicitly shows how it fails? – Vlad T. Jan 27 '21 at 17:00
  • You already provided an example in the question: the async function uses a blocking call to `requests`, and it is even named `slow_request` to stress that it's slow. During the execution of this function, the event loop will be completely unresponsive. The correct way is to use aiohttp, or pass `requests.post` to `run_in_executor` (sketched below). With that change, Solution #1 would be correct. – user4815162342 Jan 28 '21 at 11:03
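
A minimal sketch of that `run_in_executor` variant (again assuming the question's `settings` and `logger`); the blocking `requests.post` call is pushed onto the default thread-pool executor, so the event loop stays responsive and Solution #1's fire-and-forget `create_task` then behaves as intended:

import asyncio
import functools

import requests


async def slow_request(data):
    loop = asyncio.get_running_loop()
    # run the blocking HTTP call in the default ThreadPoolExecutor;
    # run_in_executor takes no keyword arguments, hence functools.partial
    response = await loop.run_in_executor(
        None,
        functools.partial(
            requests.post,
            'https://external.service',
            json=data,
            headers={'Auth-Header': settings.API_TOKEN},  # `settings` as in the question
            timeout=10,
        ),
    )
    if response.status_code != 200:
        logger.error('response: %s, data: %s', response.status_code, data)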

The problem is really two-part:

  • the requests library is synchronous, so requests.post(...) will block the event loop until completed
  • you don't need the result of the web request to respond to the client, but your current handler cannot respond to the client until the request has completed (even if it were async)

Consider separating the request logic into another process, so it can run at its own pace.

The key is that you can put work into a queue of some kind to be completed eventually, without needing its result in order to respond to the client.

You could use an async HTTP request library with some collection of callbacks, use multiprocessing to spawn a new process (or processes), or do something more exotic such as an independent program (perhaps communicating over a pipe or sockets).

Maybe something of this form will work for you

import base64
import json
import multiprocessing

import requests

URL_EXTERNAL_SERVICE = "https://example.com"
TIMEOUT_REQUESTS = (2, 10)  # always set a timeout for requests
SHARED_QUEUE = multiprocessing.Queue()  # may leak as unbounded

async def slow_request(data):
    SHARED_QUEUE.put(data)
    # now returns on successful queue put, rather than request completion

def requesting_loop(logger, Q, url, token):
    while True:  # expects to be a daemon
        data = Q.get()  # block until retrieval (a non-daemon can use a sentinel here)
        response = requests.post(
            url,
            json=data,
            headers={'Auth-Header': token},
            timeout=TIMEOUT_REQUESTS,
        )
        # raise_for_status() --> try/except --> log + continue
        if response.status_code != 200:
            logger.error('call to {} failed (code={}) with data: {}'.format(
                url, response.status_code,
                "base64:" + base64.b64encode(json.dumps(data).encode()).decode()
            ))

def startup():  # run me when starting
    # do whatever is needed for logger
    # create a pool instead if you may need to process a lot of requests
    p = multiprocessing.Process(
        target=requesting_loop,
        kwargs={"logger": logger, "Q": SHARED_QUEUE, "url": URL_EXTERNAL_SERVICE, "token": settings.API_TOKEN},
        daemon=True
    )
    p.start()
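
As a hypothetical wiring example (not part of the original answer), `startup()` could be registered as a FastAPI startup handler so the worker process is spawned when the application boots:

from fastapi import FastAPI

app = FastAPI()


@app.on_event("startup")
def spawn_request_worker():
    # start the daemon worker process before serving any requests
    startup()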
ti7