10

Imagine an asynchronous aiohttp web application that is supported by a Postgresql database connected via asyncpg and does no other I/O. How can I have a middle-layer hosting the application logic, that is not async? (I know I can simply make everything async -- but imagine my app to have massive application logic, only bound by database I/O, and I cannot touch everything of it).

Pseudo code:

async def handler(request):
    # call into layers over layers of application code, that simply emits SQL
    ...

def application_logic():
    ...
    # This doesn't work, obviously, as await is a syntax
    # error inside synchronous code.
    data = await asyncpg_conn.execute("SQL")
    ...
    # What I want is this:
    data = asyncpg_facade.execute("SQL")
    ...

How can a synchronous façade over asyncpg be built, that allows the application logic to make database calls? The recipes floating around like using async.run() or asyncio.run_coroutine_threadsafe() etc. do not work in this case, as we're coming from an already asynchronous context. I'd assume this cannot be impossible, as there already is an event loop that could in principle run the asyncpg coroutine.

Bonus question: what is the design rationale of making await inside sync a syntax error? Wouldn't it be pretty useful to allow await from any context that originated from a coroutine, so we'd have simple means to decompose an application in functional building blocks?

EDIT Extra bonus: beyond Paul's very good answer, that stays inside the "safe zone", I'd be interested in solutions that avoid blocking the main thread (leading to something more gevent-ish). See also my comment on Paul's answer ...

fpbhb
  • 1,469
  • 10
  • 22
  • 1
    Are you looking for an answer that doesn't stay inside the "safe zone"? You apparently already know of a couple of possible solutions, yet you asked this question anyway. I am left wondering what you want. Do you want a program that can perform a task switch without using the async/await syntax? That's not exactly what you asked, but that's what "not blocking the event loop" implies. Even if you could do that, you would have a situation where ANY function can potentially perform a task switch. Doesn't that have the drawbacks of multithreading without the benefits? – Paul Cornelius Nov 01 '21 at 21:05
  • @PaulCornelius I was looking for any answer when I asked, and since have also looked around myself and found the things I mentioned. Your answer was very helpful, since it wasn’t clear to me from the start that I’d need a separate loop in a thread. Now I try to understand what all the options are. – fpbhb Nov 02 '21 at 07:26
  • Difficult to see how putting an async facade on synchronous code will do anything other than block. If the migration path is to asyncio (a great decision for IO bound calls IMHO) then running the large synchronous code synchronously in one async function queueing tasks and awaiting responses handled by an async pg queue seems workable. I tried something like this https://stackoverflow.com/a/69175259/6242321 although you might use Semaphore(1) where I used (4). – jwal Nov 03 '21 at 09:19

1 Answers1

6

You need to create a secondary thread where you run your async code. You initialize the secondary thread with its own event loop, which runs forever. Execute each async function by calling run_coroutine_threadsafe(), and calling result() on the returned object. That's an instance of concurrent.futures.Future, and its result() method doesn't return until the coroutine's result is ready from the secondary thread.

Your main thread is then, in effect, calling each async function as if it were a sync function. The main thread doesn't proceed until each function call is finished. BTW it doesn't matter if your sync function is actually running in an event loop context or not.

The calls to result() will, of course, block the main thread's event loop. That can't be avoided if you want to get the effect of running an async function from sync code.

Needless to say, this is an ugly thing to do and it's suggestive of the wrong program structure. But you're trying to convert a legacy program, and it may help with that.

import asyncio
import threading
from datetime import datetime

def main():
    def thr(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=thr, args=(loop, ), daemon=True)
    t.start()

    print("Hello", datetime.now())
    t1 = asyncio.run_coroutine_threadsafe(f1(1.0), loop).result()
    t2 = asyncio.run_coroutine_threadsafe(f1(2.0), loop).result()
    print(t1, t2)
 

if __name__ == "__main__":
    main()

>>> Hello 2021-10-26 20:37:00.454577
>>> Hello 1.0 2021-10-26 20:37:01.464127
>>> Hello 2.0 2021-10-26 20:37:03.468691
>>> 1.0 2.0
Paul Cornelius
  • 9,245
  • 1
  • 15
  • 24
  • Thanks, this is a very good answer! Blocking the main thread is a severe restriction for those trying to gradually upgrade an existing codebase, though. It would be interesting to know what people think of the hacks floating around, that avoid blocking the main thread's event loop, e.g. https://github.com/oremanj/greenback, https://github.com/erdewit/nest_asyncio or zzzeek's solution for SQLAlchemy. I will put a bounty on this, hoping people contribute on this – fpbhb Oct 29 '21 at 13:19