I want to execute an async function every time the Flask route is executed. Why is the abar function never executed?

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=loop)
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    loop.run_forever()

I also tried running the blocking event loop in a separate thread, but the abar function is still never called.

import asyncio
from threading import Thread
from flask import Flask

async def abar(a):
    print(a)

app = Flask(__name__)

def start_worker(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()

worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)
    return "OK"

if __name__ == "__main__":
    worker.start()
    app.run(debug=False, use_reloader=False)
davidism
user24502
  • `app.run` and `loop.run_forever` are both blocking. You’re probably better off using a thread. If you _need_ to use asyncio, you should look into one of the Flask-like frameworks built on top of it. – dirn Dec 16 '17 at 06:08
  • @dirn Thank you very much. I tried moving one of the blocking calls into a separate thread. See my edited question! – user24502 Dec 16 '17 at 11:20

5 Answers

51

You can incorporate some async functionality into Flask apps without having to completely convert them to asyncio.

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    loop.run_until_complete(abar("abar"))
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

This blocks the Flask response until the async function returns, but it still allows you to do some clever things. I've used this pattern to perform many external requests concurrently using aiohttp; once they are complete, I'm back in traditional Flask for data processing and template rendering.

import aiohttp
import asyncio
import async_timeout
from flask import Flask

loop = asyncio.get_event_loop()
app = Flask(__name__)

async def fetch(url):
    async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()

def fight(responses):
    return "Why can't we all just get along?"

@app.route("/")
def index():
    # perform multiple async requests concurrently
    responses = loop.run_until_complete(asyncio.gather(
        fetch("https://google.com/"),
        fetch("https://bing.com/"),
        fetch("https://duckduckgo.com"),
        fetch("http://www.dogpile.com"),
    ))

    # do something with the results
    return fight(responses)

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
Travis Terry
  • As it is typical to run Flask in production with an async worker such as gevent, meinheld or eventlet, I think it is important to note that this solution would block the gevent/meinheld/eventlet event loop, which in turn would negate some of the advantages of using them. – pgjones Jan 09 '19 at 19:34
  • What happens when the aiohttp WSGI worker is used? https://aiohttp-wsgi.readthedocs.io/en/stable/index.html. Will the event loop in the worker be blocked even then? – Arvind Sridharan Apr 04 '19 at 12:59
  • Your example gives me `RuntimeError: There is no current event loop in thread 'Thread-1'.`. Repro: 1) I saved your snippet to soexamp.py; 2) ran `python soexamp.py`; 3) then ran `curl localhost:5000/`. My flask.__version__ is '1.0.2' and aiohttp.__version__ is '3.5.4'. – Anton Daneyko Apr 12 '19 at 15:27
  • This is **not** thread safe; you can't simply use `loop.run_until_complete()` from arbitrary threads. An asyncio loop is *thread specific*. Any real-life WSGI deployment will be using threads. Instead of calling `asyncio.get_event_loop()` you'd have to create a new event loop *per thread*. That's... overkill, however. – Martijn Pieters Oct 29 '19 at 18:32
  • @MartijnPieters What is the problem with creating a new event loop per thread? And can you please explain what is meant by not thread safe? – ravi malhotra Mar 15 '20 at 12:12
  • @ravimalhotra: not thread safe means that things can break because multiple threads are altering the same data structures unless you take threading into account. The asyncio event loop implementation is not thread safe apart from a few [explicitly documented functions](https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading). The code here *doesn't* create a new event loop per thread, nor does it pass coroutines to the single thread correctly. Note that I also posted an answer to this question that addresses these issues better. – Martijn Pieters Mar 15 '20 at 16:06
  • Thank you @travis terry – Sujoy Jun 26 '21 at 14:00
  • Why do we need `use_reloader`? – alper Aug 16 '21 at 03:11
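
Following up on the thread-safety concern raised in the comments, here is a minimal sketch of one possible variation (not part of the original answer): instead of sharing a module-level loop, each request calls `asyncio.run()`, which creates a fresh event loop, runs the coroutine to completion, and closes the loop again. It assumes Python 3.7+ and that the calling thread has no running loop. The `fetch` coroutine only sleeps as a stand-in for an aiohttp request, and `handle_request` and `simulate_requests` are hypothetical names that imitate a Flask view being called from several worker threads.

```python
import asyncio
import threading


async def fetch(name, delay):
    # Stand-in for a real HTTP request (assumption: no network access here).
    await asyncio.sleep(delay)
    return f"{name}: done"


async def fetch_all():
    # gather() is awaited inside a coroutine so it binds to the running loop.
    return await asyncio.gather(
        fetch("google", 0.05),
        fetch("bing", 0.05),
        fetch("duckduckgo", 0.05),
    )


def handle_request():
    # Hypothetical view body: asyncio.run() creates a new loop for this call,
    # so no loop object is shared between threads.
    return asyncio.run(fetch_all())


def simulate_requests(n):
    # Call handle_request() from n threads, the way a threaded WSGI server would.
    results = [None] * n

    def worker(i):
        results[i] = handle_request()

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(simulate_requests(3))
```

The trade-off is the cost of creating and tearing down a loop on every request, and the view still blocks its worker until all coroutines finish.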
36

A simpler solution to your problem (in my biased view) is to switch from Flask to Quart. If you do, your snippet simplifies to:

import asyncio
from quart import Quart

async def abar(a):
    print(a)

app = Quart(__name__)

@app.route("/")
async def notify():
    await abar("abar")
    return "OK"

if __name__ == "__main__":
    app.run(debug=False)

As noted in the other answers, Flask's app.run() is blocking and does not interact with an asyncio loop. Quart, on the other hand, is the Flask API built on asyncio, so it should work the way you expect.

Also as an update, Flask-Aiohttp is no longer maintained.

pgjones
  • I have a couple of libraries which have synchronous/blocking functions. What would happen when I switch to Quart? When I call functions in those libraries, it would block the event loop, right? – Arvind Sridharan Apr 04 '19 at 13:04
  • Yes, they will block. You can wrap the calls to these functions with `loop.run_in_executor` and await that (by default it runs the functions in another thread). Alternatively, you can switch to an asyncio-based alternative library. – pgjones Apr 06 '19 at 08:21
  • I am sorry for the downvote, but answers telling you to switch the whole framework when you want to be able to trigger a background task are not really helpful – Eric Burel Jul 16 '20 at 15:29
  • Quart is a nice suggestion, but your answer doesn't actually address the question properly, because you `await` the call which the OP wants to happen asynchronously, independent of the server response. – deed02392 Nov 13 '20 at 11:37
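
The executor approach mentioned in the comments could look roughly like the sketch below. It uses only the standard library; `blocking_call` is a hypothetical stand-in for a synchronous library function, and `handler` stands in for an async view. Passing `None` as the executor selects the loop's default thread pool.

```python
import asyncio
import time


def blocking_call(seconds):
    # Hypothetical synchronous function that would otherwise stall the loop.
    time.sleep(seconds)
    return f"slept {seconds}s"


async def handler():
    loop = asyncio.get_running_loop()
    # Each call runs in a worker thread of the default executor, so the
    # event loop stays free to serve other coroutines in the meantime.
    return await asyncio.gather(
        loop.run_in_executor(None, blocking_call, 0.2),
        loop.run_in_executor(None, blocking_call, 0.2),
    )


if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(handler()))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Because the two calls run in separate threads, the total time should be close to one sleep rather than two, although the exact figure depends on the machine.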
24

Your mistake is to try to run the asyncio event loop after calling app.run(). The latter doesn't return; it instead runs the Flask development server.

In fact, that's how most WSGI setups will work; either the main thread is going to be busy dispatching requests, or the Flask server is imported as a module in a WSGI server, and you can't start an event loop here either.

You'll instead have to run your asyncio event loop in a separate thread, then run your coroutines in that separate thread via asyncio.run_coroutine_threadsafe(). See the Coroutines and Multithreading section in the documentation for what this entails.

Here is an implementation of a module that will run such an event loop thread, and gives you the utilities to schedule coroutines to be run in that loop:

import asyncio
import itertools
import threading

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        self.started = threading.Event()
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.call_later(0, self.started.set)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            try:
                shutdown_executor = loop.shutdown_default_executor()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_executor)
            asyncio.set_event_loop(None)
            loop.close()

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread

    if _loop_thread is None:
        with _lock:
            if _loop_thread is None:
                _loop_thread = EventLoopThread()
                _loop_thread.start()
                # give the thread up to a second to produce a loop
                _loop_thread.started.wait(1)

    return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    """Run the coroutine in the event loop running in a separate thread

    Returns a Future, call Future.result() to get the output

    """
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

You can use the run_coroutine() function defined here to schedule asyncio routines. Use the returned Future instance to control the coroutine:

  • Get the result with Future.result(). You can give this a timeout; if no result is produced within the timeout, a TimeoutError is raised. The coroutine is not cancelled automatically, so call Future.cancel() if you want to stop it.
  • You can query the state of the coroutine with the .cancelled(), .running() and .done() methods.
  • You can add callbacks to the future, which will be called when the coroutine has completed, or is cancelled or raised an exception (take into account that this is probably going to be called from the event loop thread, not the thread that you called run_coroutine() in).
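
The three uses listed above can be sketched as follows. To keep the example self-contained, it replaces the module above with a minimal stand-in (one loop running in a daemon thread); `quick` and `slow` are hypothetical coroutines introduced only for illustration.

```python
import asyncio
import concurrent.futures
import threading

# Minimal stand-in for the module above: one event loop in a daemon thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


def run_coroutine(coro):
    return asyncio.run_coroutine_threadsafe(coro, loop)


async def quick():
    await asyncio.sleep(0.01)
    return 42


async def slow():
    await asyncio.sleep(60)


# 1. Block the calling thread until the result is available.
fast_result = run_coroutine(quick()).result(timeout=5)

# 2. On timeout, result() raises; the coroutine keeps running until cancelled.
slow_future = run_coroutine(slow())
try:
    slow_future.result(timeout=0.1)
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True
    slow_future.cancel()  # explicitly request cancellation of the coroutine

# 3. Callback: usually invoked from the event loop thread, so keep it short
#    and thread-safe (here it only sets a threading.Event).
callback_done = threading.Event()
cb_future = run_coroutine(quick())
cb_future.add_done_callback(lambda fut: callback_done.set())
callback_done.wait(timeout=5)

print(fast_result, timed_out, slow_future.cancelled(), callback_done.is_set())
```

`concurrent.futures.TimeoutError` is caught here because it works across Python versions; on recent versions it is the same class as the built-in `TimeoutError`.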

For your specific example, where abar() doesn't return any result, you can just ignore the returned future, like this:

@app.route("/")
def notify():
    run_coroutine(abar("abar"))
    return "OK"

Note that before Python 3.8 you can't use an event loop running on a separate thread to create subprocesses! See my answer to "Python3 Flask asyncio subprocess in route hangs" for a backport of the Python 3.8 ThreadedChildWatcher class as a work-around for this.

Martijn Pieters
  • Assume we are making recursive async calls inside the `abar()` function. If `abar()` calls another `async` function, e.g. `async def abar_1`, should we make the call as `run_coroutine(abar_1())` or `await abar_1()`? And would it be the same if `abar_1()` calls another async function, and so on? I have a library which has `await func()` definitions; as I understand it, I have to convert them all into the `run_coroutine(func())` format in order for them to work along with your code. Could there be a wrapper() function for them? – alper Jul 12 '21 at 02:00
  • @alper you are not talking about recursion here, just normal asynchronous calls. Normally you would just `await` on other coroutines or create a task object to run the other coroutine concurrently. See [*Coroutines and Tasks*](https://docs.python.org/3/library/asyncio-task.html). The code in my answer is only there to integrate asyncio with Flask; once inside the event loop, *use asynchronous programming techniques*. – Martijn Pieters Jul 12 '21 at 06:49
6

For the same reason, you won't see this print:

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    print('Hey!')
    loop.run_forever()

loop.run_forever() is never called since, as @dirn already noted, app.run is also blocking.

Running a global blocking event loop is the only way you can run asyncio coroutines and tasks, but it's not compatible with running a blocking Flask app (or with any other blocking call in general).

If you want to use an asynchronous web framework, you should choose one created to be asynchronous. For example, probably the most popular one right now is aiohttp:

from aiohttp import web


async def hello(request):
    return web.Response(text="Hello, world")


if __name__ == "__main__":
    app = web.Application()
    app.router.add_get('/', hello)
    web.run_app(app)  # this runs asyncio event loop inside

Update:

Regarding your attempt to run the event loop in a background thread: I didn't investigate much, but the problem seems to be related to thread safety, since many asyncio objects are not thread-safe. If you change your code this way, it'll work:

def _create_task():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)

@app.route("/")
def notify():
    worker_loop.call_soon_threadsafe(_create_task)
    return "OK"

But again, this is a very bad idea. It's not only very inconvenient, but I also suspect it doesn't make much sense: if you're going to use a thread to run asyncio, why not just use threads in Flask instead of asyncio? You'll have the Flask you want, plus parallelization.

If I still haven't convinced you, at least take a look at the Flask-aiohttp project. Its API is close to Flask's, and I think it's still better than what you're trying to do.
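
The `call_soon_threadsafe()` version from the update can be tried without Flask using the sketch below. A likely explanation for the difference (my reading of the asyncio documentation, not something the answer states) is that a plain `ensure_future()` call from a foreign thread queues the task without waking the sleeping loop, whereas `call_soon_threadsafe()` also wakes it. Here `notify` is an ordinary function standing in for the Flask view, and the `ran` event is a hypothetical helper used only to observe that the coroutine executed.

```python
import asyncio
import threading

# Event loop running forever in a background daemon thread.
worker_loop = asyncio.new_event_loop()
threading.Thread(target=worker_loop.run_forever, daemon=True).start()

ran = threading.Event()  # hypothetical helper to observe the coroutine


async def abar(a):
    print(a)
    ran.set()


def notify():
    # Stand-in for the Flask view; it runs in a thread other than the loop's.
    # The task is created inside the loop thread, and the loop is woken up.
    worker_loop.call_soon_threadsafe(
        lambda: asyncio.ensure_future(abar("abar"), loop=worker_loop)
    )
    return "OK"


if __name__ == "__main__":
    print(notify())
    print("coroutine ran:", ran.wait(timeout=5))
```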

Mikhail Gerasimov
  • Thank you very much for your explanation. That makes sense. Also, it's a nice small aiohttp example. Unfortunately I am bound to flask/flask-ask for an Alexa skill. I have modified my original question and moved one blocking call into a separate thread. But still no luck – user24502 Dec 16 '17 at 11:27
  • Running an asyncio loop with Flask is an excellent idea and not problematic at all, provided you take a bit of care. Threading and asynchronous coroutines have very different pros and cons; when doing a lot of blocking I/O, asyncio is preferable over threads. – Martijn Pieters Jul 12 '21 at 06:53
4

The main issue, as already explained in the other answers by @Martijn Pieters and @Mikhail Gerasimov, is that app.run is blocking, so the line loop.run_forever() is never called. You would need to manually set up and maintain an event loop on a separate thread.

Fortunately, with Flask 2.0, you don't need to create, run, and manage your own event loop anymore. You can define your route as async def and directly await on coroutines from your route functions.

https://flask.palletsprojects.com/en/2.0.x/async-await/

Using async and await

New in version 2.0.

Routes, error handlers, before request, after request, and teardown functions can all be coroutine functions if Flask is installed with the async extra (pip install flask[async]). It requires Python 3.7+ where contextvars.ContextVar is available. This allows views to be defined with async def and use await.

Flask will take care of creating the event loop on each request. All you have to do is define your coroutines and await on them to finish:

https://flask.palletsprojects.com/en/2.0.x/async-await/#performance

Performance

Async functions require an event loop to run. Flask, as a WSGI application, uses one worker to handle one request/response cycle. When a request comes into an async view, Flask will start an event loop in a thread, run the view function there, then return the result.

Each request still ties up one worker, even for async views. The upside is that you can run async code within a view, for example to make multiple concurrent database queries, HTTP requests to an external API, etc. However, the number of requests your application can handle at one time will remain the same.

Tweaking the original example from the question:

import asyncio
from flask import Flask, jsonify

async def send_notif(x: int):
    print(f"Called coro with {x}")
    await asyncio.sleep(1)
    return {"x": x}

app = Flask(__name__)

@app.route("/")
async def notify():
    # these are coroutine objects; gather() wraps them in tasks
    coros = [send_notif(x) for x in range(5)]
    results = await asyncio.gather(*coros)

    response = list(results)
    return jsonify(response)

# The recommended way now is to use `flask run`.
# See: https://flask.palletsprojects.com/en/2.0.x/cli/
# if __name__ == "__main__":
#     app.run(debug=False, use_reloader=False)

$ time curl -s -XGET 'http://localhost:5000'
[{"x":0},{"x":1},{"x":2},{"x":3},{"x":4}]


real    0m1.016s
user    0m0.005s
sys     0m0.006s

Most common recipes using asyncio can be applied the same way. The one thing to take note of is, as of Flask 2.0.1, we cannot use asyncio.create_task to spawn background tasks:

https://flask.palletsprojects.com/en/2.0.x/async-await/#background-tasks

Async functions will run in an event loop until they complete, at which stage the event loop will stop. This means any additional spawned tasks that haven’t completed when the async function completes will be cancelled. Therefore you cannot spawn background tasks, for example via asyncio.create_task.

If you wish to use background tasks it is best to use a task queue to trigger background work, rather than spawn tasks in a view function.

Other than the limitation with create_task, it should work for use-cases where you want to make async database queries or multiple calls to external APIs.
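
The background-task limitation quoted above can be illustrated with plain asyncio. This is only an analogy: `asyncio.run()` is used here as a stand-in for Flask's per-request event loop handling, which is implemented differently, and `view`, `background` and `events` are hypothetical names. The point is that a task still pending when the view coroutine returns is cancelled when the loop shuts down.

```python
import asyncio

events = []


async def background():
    try:
        await asyncio.sleep(10)
        events.append("background finished")
    except asyncio.CancelledError:
        # Reached when the loop shuts down before the sleep completes.
        events.append("background cancelled")
        raise


async def view():
    task = asyncio.create_task(background())  # fire and forget
    await asyncio.sleep(0)  # yield once so the task actually starts
    return "OK"


# asyncio.run() cancels any tasks still pending once view() has returned.
result = asyncio.run(view())
print(result, events)
```

This is why the quoted documentation recommends a task queue for work that should outlive the request.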

Gino Mempin