
I'm trying to load data from a local Postgres database as quickly as possible, and it appears that the most performant Python package is asyncpg. My code is synchronous, and I repeatedly need to load chunks of data. I don't want the async keyword to propagate to every function I've written, so I'm trying to wrap the async code in a synchronous function.

The code below works, but is incredibly ugly:

import asyncio
import asyncpg

def connect_to_postgres(user, password, database, host):
    # Wrap the coroutine so it can be driven to completion synchronously.
    async def wrapped():
        return await asyncpg.connect(user=user, password=password,
                                     database=database, host=host)
    loop = asyncio.get_event_loop()
    db_connection = loop.run_until_complete(wrapped())
    return db_connection

db_connection = connect_to_postgres(keys['user'], keys['password'],
                                    'db', '127.0.0.1')

def fetch_from_postgres(query, db_connection):
    async def wrapped():
        return await db_connection.fetch(query)
    loop = asyncio.get_event_loop()
    values = loop.run_until_complete(wrapped())
    return values

fetch_from_postgres("SELECT * from db LIMIT 5", db_connection)

In Julia I would do something like

f() = @async 5
g() = fetch(f())
g()

But in Python it seems I have to write the rather clunky:

import asyncio

async def f():
    return 5

def g():
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(f())

Just wondering if there's a better way?

Edit: the latter Python example can, of course, be written using a generic helper:

import asyncio

def fetch(x):
    # x is a coroutine (or other awaitable); block until it completes.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(x)

However, I still need to create an async wrapper function, unless I'm missing something.
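A minimal sketch of that helper, assuming one private loop created up front with asyncio.new_event_loop() and reused for every call. Because run_until_complete() accepts any awaitable, the coroutine object returned by calling an async function can be passed in directly, so no inner async def is needed:

```python
import asyncio

# One private event loop, created once and reused by every synchronous call.
_loop = asyncio.new_event_loop()

def fetch(awaitable):
    """Drive `awaitable` to completion on the shared loop and return its result."""
    return _loop.run_until_complete(awaitable)

async def f():
    return 5

# f() already returns a coroutine object, so it can be passed straight in
# without defining an extra `async def wrapped()`.
print(fetch(f()))  # prints 5
```

With asyncpg the same idea would take the library's coroutines directly, e.g. conn = fetch(asyncpg.connect(...)) and then rows = fetch(conn.fetch(query)), since both asyncpg.connect and Connection.fetch are coroutine functions.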

Edit 2: I do care about performance, but I wish to use a synchronous programming approach. asyncpg is 3x faster than psycopg2 because its core implementation is in Cython rather than Python; this is explained in more detail at https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/. Hence my desire to wrap this asynchronous code.

Edit 3: another way of putting this question: what's the best way to avoid the "what color is your function" problem in Python?
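One possible answer, sketched as a hypothetical helper that borrows the background-thread technique described in the answer below: a decorator (synchronous is a made-up name, not an asyncio or asyncpg API) that runs the decorated async def on a single shared loop thread, so callers see an ordinary blocking function:

```python
import asyncio
import functools
import threading

# Hypothetical shared loop, running forever in a daemon thread.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()

def synchronous(async_fn):
    """Hypothetical decorator: make an async function callable as a blocking one."""
    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # Schedule the coroutine on the background loop, then block this
        # thread until it finishes and hand back its return value.
        future = asyncio.run_coroutine_threadsafe(async_fn(*args, **kwargs), _loop)
        return future.result()
    return wrapper

@synchronous
async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b

print(add(2, 3))  # prints 5
```

One caveat with this design: a decorated function must not be called from code already running on _loop, because result() would block the very loop that has to finish the coroutine, which deadlocks. The decorated function also can no longer be awaited from async code.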

tbenst
  • See [`asyncio.run`](https://docs.python.org/3/library/asyncio-task.html#asyncio.run). – Ajax1234 Dec 05 '21 at 04:23
  • How did you come to the conclusion that `asyncpg` is the most performant? Mixing async and sync code is going to be a mess, why not make all your code async if you want to use the package or just use a sync package? – Iain Shelvington Dec 05 '21 at 04:28
  • @IainShelvington according to https://github.com/MagicStack/asyncpg#performance, asyncpg is 3x faster than (synchronous) psycopg2. I would indeed prefer a sync code package – tbenst Dec 05 '21 at 21:11
  • @Ajax1234 I believe that approach will have more overhead, as it will create a new event loop & destroy it on each call https://github.com/python/cpython/blob/19050711f5a68e50b942b3b7f1f4cf398f27efff/Lib/asyncio/runners.py#L39 – tbenst Dec 05 '21 at 22:12

1 Answer

This is not difficult to do if you set up your program structure at the beginning. Create a second thread in which your async code will run, and start an event loop there. When your main thread, which remains entirely synchronous, wants the result of an async call (a coroutine), use the function asyncio.run_coroutine_threadsafe. It returns a concurrent.futures.Future object; you obtain the coroutine's return value by calling that object's result() method, which blocks until the result is available.

It's almost as if you were calling the async function as an ordinary subroutine. The overhead is minimal because only one secondary thread is created, once. Here is a simple example:

import asyncio
import threading
from datetime import datetime

async def demo(t):
    await asyncio.sleep(t)
    print(f"Demo function {t} {datetime.now()}")
    return t

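def main():
    def thr(loop):
        # Runs in the background thread: make `loop` current there and
        # process scheduled coroutines until the program exits.
        asyncio.set_event_loop(loop)
        loop.run_forever()

    loop = asyncio.new_event_loop()
    # daemon=True lets the interpreter exit without stopping the loop explicitly.
    t = threading.Thread(target=thr, args=(loop, ), daemon=True)
    t.start()

    print("Main", datetime.now())
    # Each call schedules a coroutine on the background loop, then blocks
    # on result() until that coroutine has finished.
    t1 = asyncio.run_coroutine_threadsafe(demo(1.0), loop).result()
    t2 = asyncio.run_coroutine_threadsafe(demo(2.0), loop).result()
    print(t1, t2)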

if __name__ == "__main__":
    main()

# >>> Main 2021-12-06 19:14:14.135206
# >>> Demo function 1.0 2021-12-06 19:14:15.146803
# >>> Demo function 2.0 2021-12-06 19:14:17.155898
# >>> 1.0 2.0

Your main program experiences a 1-second delay on the first invocation of demo(), and a 2-second delay on the second invocation. That's because each result() call blocks the main thread until its coroutine finishes, so the second coroutine is not even submitted until the first has completed, and the two delays add up instead of overlapping. But that's exactly what you implied you wanted when you said you wanted a synchronous program that uses a third-party async package.
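If the delays should overlap instead, a minimal sketch is to submit every coroutine before waiting on any of the returned futures. This reuses demo() in simplified form (without the print) and passes loop.run_forever directly as the thread target, a simplification of the thr() helper above:

```python
import asyncio
import threading
import time

async def demo(t):
    await asyncio.sleep(t)
    return t

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

start = time.perf_counter()
# Submit both coroutines first: run_coroutine_threadsafe returns immediately.
f1 = asyncio.run_coroutine_threadsafe(demo(1.0), loop)
f2 = asyncio.run_coroutine_threadsafe(demo(2.0), loop)
# Only now block; the two sleeps overlap on the background loop, so the
# total wait is about 2 seconds instead of 3.
results = (f1.result(), f2.result())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.1f}s")

# Stop the background loop once no more work will be submitted.
loop.call_soon_threadsafe(loop.stop)
```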

Here is a similar answer to a slightly different question:

How can I have a synchronous facade over asyncpg APIs with Python asyncio?
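Along the lines of that linked question, the thread-plus-loop setup can be packaged as a small synchronous facade so the rest of the program only ever calls one method. BackgroundLoop, run() and close() are hypothetical names made up for this sketch, and fake_query() stands in for an asyncpg call so the example runs without a database:

```python
import asyncio
import threading

class BackgroundLoop:
    """Hypothetical helper: one event loop in a daemon thread, fed by sync code."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._serve, daemon=True)
        self._thread.start()

    def _serve(self):
        # Background thread: make the loop current here and run it until stopped.
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def run(self, coro, timeout=None):
        # Schedule the coroutine on the background loop and block the calling
        # thread until it finishes (or `timeout` seconds pass).
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self):
        # Ask the loop to stop from its own thread, wait for that thread to
        # exit, then release the loop's resources.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

async def fake_query(n):
    # Stand-in for an awaitable such as an asyncpg query.
    await asyncio.sleep(0.01)
    return list(range(n))

bg = BackgroundLoop()
rows = bg.run(fake_query(3))
bg.close()
print(rows)  # prints [0, 1, 2]
```

Applied to the question, one would create a single BackgroundLoop at startup and route every asyncpg coroutine through it, e.g. conn = bg.run(asyncpg.connect(...)) and rows = bg.run(conn.fetch(query)). An asyncpg connection is tied to the loop it was created on, so all calls on that connection should go through the same BackgroundLoop, and bg.run(conn.close()) should run before bg.close().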

Paul Cornelius