I have the following simplified code:

async def asynchronous_function(*args, **kwds):
    statement = await prepare(query)
    async with conn.transaction():
        async for record in statement.cursor():
            ??? yield record ???

...

class Foo:

    def __iter__(self):
        records = ??? asynchronous_function ???
        yield from records

...

x = Foo()
for record in x:
    ...

I don't know how to fill in the ??? above. I want to yield the record data, but it's really not obvious how to wrap asyncio code.

Brian Bruggeman
  • It's usually a bad idea to mix asynchronous code and blocking code, would it be ok to replace `for record in x` with `async for record in x`? – Vincent Mar 13 '19 at 00:49
  • The problem is that once I have async, I have to push it all the way up the stack - I don't want to rewrite all of my stack to conform to async's style. Or said differently, I have this code working without async, but I want to try out the async code to see if it is any more performant. All of the examples I see are really toy examples... – Brian Bruggeman Mar 13 '19 at 01:07
  • Well, asyncio generally doesn't provide performance. It does provide cooperative multitasking though, but you usually have to use the async/await paradigm for every blocking call in order to see the benefits. And in this context, `for record in x:` is indeed a blocking call. – Vincent Mar 13 '19 at 10:49
  • Asyncio is more about scalability than performance. Talking to 50 peers is perfectly possible using threads; talking to 500 or 5000 will be a problem because you will either have to spawn a huge number of OS threads (and debug contention issues between them, especially combined with the GIL), or use thread pools and spend unproductive time waiting for a free slot in the pool. Asyncio allows you to handle many connections at once without requiring an OS thread per connection, while retaining readable code with coroutines. See my answer for an example of using asyncio in a non-asyncio program. – user4815162342 Mar 14 '19 at 15:23
  • Maybe to piggyback here on comments. I feel like nomenclature here is getting the better of us. When I mention performance, I really am thinking of the critical aspect of concurrency where I am spending clock cycles idle waiting on I/O. I have work that is independent and can definitely use those clock cycles if I have a way of freeing them up. I think asyncio can do exactly that, but it is incredibly clunky when interfacing with current synchronous code. – Brian Bruggeman Mar 14 '19 at 15:53

2 Answers

While it is true that asyncio is intended to be used across the board, sometimes it is simply impossible to immediately convert a large piece of software (with all its dependencies) to async. Fortunately there are ways to combine legacy synchronous code with newly written asyncio portions. A straightforward way to do so is by running the event loop in a dedicated thread, and using asyncio.run_coroutine_threadsafe to submit tasks to it.
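
As a minimal sketch of that idea (the names `start_background_loop` and `fetch_value` are hypothetical, made up for illustration), synchronous code can hand a single coroutine to a loop running in another thread and block on the returned future:

```python
import asyncio
import threading


def start_background_loop():
    """Start a fresh event loop in a daemon thread and return it."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def fetch_value(x):
    # Hypothetical coroutine standing in for real async I/O.
    await asyncio.sleep(0.01)
    return x * 2


loop = start_background_loop()

# run_coroutine_threadsafe returns a concurrent.futures.Future;
# .result() blocks the calling (synchronous) thread until it is done.
future = asyncio.run_coroutine_threadsafe(fetch_value(21), loop)
result = future.result(timeout=5)
print(result)
```

The calling thread stays fully synchronous here; only the background thread ever runs asyncio code.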

With those low-level tools you can write a generic adapter to turn any asynchronous iterator into a synchronous one. For example:

import asyncio, threading, queue

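# create a dedicated asyncio loop that runs in the background to
# serve our asyncio needs; new_event_loop() avoids relying on an
# implicit current loop, which get_event_loop() is deprecated for
loop = asyncio.new_event_loop()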
threading.Thread(target=loop.run_forever, daemon=True).start()

def wrap_async_iter(ait):
    """Wrap an asynchronous iterator into a synchronous one"""
    q = queue.Queue()
    _END = object()

    def yield_queue_items():
        while True:
            next_item = q.get()
            if next_item is _END:
                break
            yield next_item
        # After observing _END we know the aiter_to_queue coroutine has
        # completed.  Invoke result() for side effect - if an exception
        # was raised by the async iterator, it will be propagated here.
        async_result.result()

    async def aiter_to_queue():
        try:
            async for item in ait:
                q.put(item)
        finally:
            q.put(_END)

    async_result = asyncio.run_coroutine_threadsafe(aiter_to_queue(), loop)
    return yield_queue_items()

Then your code just needs to call wrap_async_iter to wrap an async iter into a sync one:

async def mock_records():
    for i in range(3):
        yield i
        await asyncio.sleep(1)

for record in wrap_async_iter(mock_records()):
    print(record)

In your case Foo.__iter__ would use yield from wrap_async_iter(asynchronous_function(...)).
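
Put together, that might look like the sketch below. It repeats a compact copy of the wrapper so the block runs on its own, and `mock_cursor` is a hypothetical stand-in for the `statement.cursor()` loop from the question, not a real database call:

```python
import asyncio
import queue
import threading

# Background event loop running in a daemon thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


def wrap_async_iter(ait):
    """Compact copy of the adapter, repeated so this block is self-contained."""
    q = queue.Queue()
    _END = object()

    async def aiter_to_queue():
        try:
            async for item in ait:
                q.put(item)
        finally:
            q.put(_END)

    def yield_queue_items():
        while True:
            item = q.get()
            if item is _END:
                break
            yield item
        async_result.result()  # re-raise any exception from the async side

    async_result = asyncio.run_coroutine_threadsafe(aiter_to_queue(), loop)
    return yield_queue_items()


async def mock_cursor(n):
    # Hypothetical stand-in for `async for record in statement.cursor()`.
    for i in range(n):
        await asyncio.sleep(0.01)
        yield {"id": i}


class Foo:
    def __iter__(self):
        # Each call to __iter__ starts a fresh async iteration.
        yield from wrap_async_iter(mock_cursor(3))


records = [record for record in Foo()]
print(records)
```

The consumer receives each record as soon as the async side produces it, rather than waiting for the whole result set.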

user4815162342

If you want to receive all records from an async generator, you can use async for or, for brevity, an asynchronous comprehension:

async def asynchronous_function(*args, **kwds):
    # ...
    yield record


async def aget_records():
    records = [
        record 
        async for record 
        in asynchronous_function()
    ]
    return records

If you want to get the result of an asynchronous function synchronously (i.e. blocking), you can just run it in an asyncio event loop:

def get_records():
    records = asyncio.run(aget_records())
    return records

Note, however, that when you block on a single coroutine this way, you lose the ability to run it concurrently with other coroutines, and with it the benefits asyncio is meant to provide.
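
One way to keep some of that benefit from synchronous code is to group independent coroutines into a single blocking call. A minimal sketch, assuming a hypothetical `fetch` coroutine that simulates an I/O wait:

```python
import asyncio
import time


async def fetch(delay, value):
    # Hypothetical coroutine simulating an I/O wait.
    await asyncio.sleep(delay)
    return value


async def fetch_all():
    # The three waits overlap, so the total time is close to the
    # longest single delay rather than the sum of all delays.
    return await asyncio.gather(
        fetch(0.2, "a"), fetch(0.2, "b"), fetch(0.2, "c")
    )


start = time.perf_counter()
results = asyncio.run(fetch_all())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

The caller still blocks until everything is finished, but the waits inside the call overlap.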

As Vincent already pointed out in the comments, asyncio is not a magic wand that makes code faster; it's an instrument that can sometimes be used to run different I/O tasks concurrently with low overhead.

You may be interested in reading this answer to see the main idea behind asyncio.

Mikhail Gerasimov
  • The code you've written here collects all of the data from asyncio and blocks until asyncio's run is completed. I must not have described my use case well, because that's really no better than simply running standard blocking, non-asyncio code. – Brian Bruggeman Mar 13 '19 at 17:48
  • @BrianBruggeman yes, it's no better. I'm not sure what you want to do then: it's impossible to propagate data from async generator to plain sync `for` loop without acquiring all values from `async for` loop first. You may try to think about it following way: "Why `statement.cursor()` works with `async for` and doesn't work with `for` in the first place?" – Mikhail Gerasimov Mar 13 '19 at 18:03
  • " it's impossible to propagate data from async generator to plain sync for loop without acquiring all values from async for loop first. " I think this is what I wanted to understand. If this is really the case, then I will likely never use asyncio unless I completely rewrite my code bases or start with asyncio from scratch. – Brian Bruggeman Mar 13 '19 at 18:10