0

I have this code:

async def foo(x):
    yield x
    yield x + 1

async def intermediary(y):
    await foo(y)

def bar():
    c = intermediary(5)

What do I put in bar to get the 5 and the 6 out of c?

I'm asking because the asyncio library seems like a lot of magic. And I want to know exactly how the magic works.

Maybe I want to write my own functions that call read or write and then inform some top level loop that I wrote that they're waiting for the file descriptor to become readable or writeable.

And then, maybe I want that top level loop to be able to resume my read and write functions (and the whole intermediate chain between the top level loop and them) once those conditions become true.

I already know how to use asyncio more or less. I wrote this little demo program that computes squares after a delay but launches lots of those tasks that each append to a list after a random interval. It's kind of clumsily written, but it works.

I want to know exactly what that program is doing under the hood. And in order to do that, I have to know how await on that sleep informs the top-level event loop that it wants to sleep (and be called again) for a bit and how the state of all the intermediate stack frames between the call to sleep and the top level event loop are frozen in place then reactivated when the delay is over.

Omnifarious
  • 54,333
  • 19
  • 131
  • 194
  • asyncio does not make sense without I/O. Please change your program to preform a bit of slow=blocking I/O ops - for example get 5 and 6 from a remote server. – Udi Mar 21 '17 at 04:53
  • For more information on asyncio see https://docs.python.org/3/library/asyncio-task.html – Penguin Brian Mar 21 '17 at 05:04
  • @PenguinBrian - Yeah, I've read those and more or less understand them. I've even written a working demo program that uses it. But I want to know how I would write my own `asyncio` library. Not necessarily because I'm going to, just because I want to know how it works. I've updated my question to reflect that. Thank you very much for trying to answer. – Omnifarious Mar 21 '17 at 05:28
  • This video basically answered my question: https://www.youtube.com/watch?v=E-1Y4kSsAFc&index=9&list=PLaYLZDmkh3uz8nEiL3DziTBes5LDGqROx – Omnifarious Mar 21 '17 at 08:49
  • And this talk is even better: https://www.youtube.com/watch?v=ZzfHjytDceU&index=8&list=PLaYLZDmkh3uz8nEiL3DziTBes5LDGqROx – Omnifarious Mar 21 '17 at 09:29
  • ...which includes this great slide: https://i.stack.imgur.com/0b090.jpg – Udi Mar 22 '17 at 07:50

4 Answers4

3

Have you tried looking at the source for asyncio.sleep?

@coroutine                                                                       
def sleep(delay, result=None, *, loop=None):                                     
    """Coroutine that completes after a given time (in seconds)."""              
    if delay == 0:                                                               
        yield                                                                    
        return result                                                            

    if loop is None:                                                             
        loop = events.get_event_loop()                                           
    future = loop.create_future()                                                
    h = future._loop.call_later(delay,                                           
                                futures._set_result_unless_cancelled,            
                                future, result)                                  
    try:                                                                         
        return (yield from future)                                               
    finally:                                                                     
        h.cancel()

Basically it uses loop.call_later to set a future, and then waits for the future. Not sure this entirely answers your questions, but it might help.

Penguin Brian
  • 1,991
  • 14
  • 25
  • 1
    It's a step in the right direction. Now, how is the future implemented? :-) Somewhere that chain of `await` calls ends in something. – Omnifarious Mar 21 '17 at 05:49
  • @Omnifarious The chain of `await` actually ends with [the future itself](https://github.com/python/asyncio/blob/master/asyncio/futures.py#L380). It bubbles up to [Task._step](https://github.com/python/asyncio/blob/master/asyncio/tasks.py#L239) where the next step is [scheduled as a callback](https://github.com/python/asyncio/blob/master/asyncio/tasks.py#L270) of the given future. – Vincent Mar 21 '17 at 09:07
  • @Vincent - Interesting. Is there a function in there with the `@coroutine` decorator that yields an instruction to the event loop about when to call the future back? – Omnifarious Mar 21 '17 at 09:18
  • @Omnifarious What you describe is actually how [curio traps](https://github.com/dabeaz/curio/blob/master/curio/traps.py) work. Asyncio has a different approach, entirely based on future. – Vincent Mar 21 '17 at 09:32
  • @Vincent - Interesting. As a model for understanding how coroutines interact with the old generator framework, curio traps make a lot more sense to me. But I am curious as to how `asyncio` works as well. I learned that from David Beazly's talks, and so it's unsurprising that he would talk about how curio worked. – Omnifarious Mar 21 '17 at 09:36
  • I answered my own question. http://stackoverflow.com/a/42938406/167958 . @Vincent - Thank you for your help as well. – Omnifarious Mar 21 '17 at 21:28
  • Actually, now that I understand what's going on, I should've noticed the `yield` up at the top if the delay is 0. – Omnifarious Mar 23 '17 at 14:58
2

So, I understand a lot better how to make what I was trying to do work. This is how my code should've read:

import types

@types.coroutine
def foo(x):
    yield x
    yield x + 1

async def intermediary(y):
    await foo(y)

def bar():
    c = intermediary(5)
    try:
        while True:
            result = c.send(None)
            print(f"Got {result} from the coroutine.")
    except StopIteration as e:
        print(f"StopIteration exception: {e!r}")

The basic answer is that the endpoint of this can be a normal generator decorated with types.coroutine. There are more ways of making this work, and this further modification of my code demonstrates them:

import types
from collections.abc import Awaitable

@types.coroutine
def foo(x):
    sent = yield x
    print(f"foo was sent {sent!r}.")
    sent = yield x + 1
    print(f"foo was sent {sent!r}.")
    return 'generator'

class MyAwaitable(Awaitable):
    def __init__(self, x):
        super().__init__()
        self.x_ = x
    def __await__(self):
        def gen(x):
            for i in range(x-1, x+2):
                sent = yield i
                print(f"MyAwaitable was sent {sent!r}.")
            return 'class'
        return iter(gen(self.x_))

async def intermediary(t, y):
    awaited = await t(y)
    print(f"Got {awaited!r} as value from await.")

def runco(chain_end):
    c = intermediary(chain_end, 5)
    try:
        sendval = None
        while True:
            result = c.send(sendval)
            print(f"Got {result} from the coroutine.")
            sendval = sendval + 1 if sendval is not None else 0
    except StopIteration as e:
        print(f"StopIteration exception: {e!r}")

As you can see, anything that defines an __await__ method that returns an iterator can also be awaited upon. What really happens is that the thing being awaited upon is iterated over until it stops and then the await returns. The reason you do this is that the final thing at the end of the chain may encounter some kind of blocking condition. It can then report on that condition (or ask a callback to be set or something else) by yielding or returning a value from the iterator (basically the same thing as yielding). Then the top level loop can continue on to whatever other thing can be run.

The nature of the whole chain of await calls is that when you then go back and ask for the next value from the iterator (call back into the blocked function telling it that maybe it isn't blocked now) the entire call stack is reactivated. This whole chain exists as a way to preserve the state of the call stack while the call is blocked. Basically a thread that voluntarily gives up control rather than having control wrested from it by a scheduler.

The vision in my head of how asyncio worked internally when I asked this question is apparently how something called curio works and is based on the end point routines yielding some sort of indicator of what they're being blocked by and the top level loop that's running it all (runco in my example) then putting that in some sort of general pool of conditions to look for so it can resume the routine as soon as the condition it's blocked by changes. In asyncio, something much more complex happens, and it uses objects with the __await__ method (like MyAwaitable in my example) and some sort of callback mechanism to make it all work.

Brett Cannon wrote a really good article that talks about how generators evolved into coroutines. It will go into far more detail than I can go into in a StackOverflow answer.

One interesting tidbit I discovered is that when you do this:

def foo(x):
    yield 11

bar = types.coroutine(foo)

Both foo and bar become 'coroutines' and can be awaited on. All the decorator does is flip a bit in foo.__code__.co_flags. This is, of course, an implementation detail and should not be relied upon. I think this is something of a bug actually, and I may report it as such.

Omnifarious
  • 54,333
  • 19
  • 131
  • 194
  • 1
    Your conclusion is a bit exaggerated, and there are a few other things to consider. Asyncio has been designed 5 years ago, 3 years before async/await was added to the language. An important aspect of its design is interoperability through a pluggable event loop interface, so it's easy for other frameworks to become asyncio compatible. And since most of the existing event loops were already based on callbacks, it made a lot of sense to build asyncio in a similar way. – Vincent Mar 21 '17 at 22:12
  • 1
    Futures and coroutines have been built on top of this base, in order to provide a high-level interface. It's only later that people realized it was interesting to get rid of callbacks and futures to focus exclusively on coroutines instead. More information [here](https://github.com/dabeaz/curio#other-resources). Also, see this [example](https://gist.github.com/vxgmichel/9fa643218c60272b3c8f56a0c275305a). – Vincent Mar 21 '17 at 22:12
  • It does indeed answer the first part of your question, though you might want to have a closer look at the concepts presented in [this answer](http://stackoverflow.com/a/41208685/2846140). Also, here's a [great article by Brett Cannon](https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/). – Vincent Mar 22 '17 at 00:29
  • Using coroutines without I/O and without a loop does not make any sense. Regular genetors and functions should be used instead. A lot of work has been done recently in `asyncio`, and using `async def` for coroutines and async generators is the current best practice. The code above shows only one part of the story - but to fully understand (and explain) why couroutines should be used, and why they were designed this way, they actually need to do some work, asynchronously. – Udi Mar 22 '17 at 07:32
  • @Udi - Clearly, yes. My example is totally useless. The goal wasn't to be useful exactly. I just wanted to understand how the mechanics of the situation worked. – Omnifarious Mar 22 '17 at 15:02
  • @Vincent - Thanks for the link to Brett Cannon's article. I've added it to the answer. – Omnifarious Mar 22 '17 at 15:41
0

There is an example in the documentation, that looks almost exactly like what you are trying to do. It contains a sleep call (used instead of IO), so that the asyncio aspect makes sense.

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
Penguin Brian
  • 1,991
  • 14
  • 25
0

Going through the code you have supplied above, an async def that includes a yield creates an Asynchronous Generator:

async def foo(x):
    yield x
    yield x + 1

To consume data from it, use async for:

async def intermediary(y):
    results = []
    async for x in foo(y):
        results.append(x)
    return results

To consume a result from a simple coroutine such as intermediary from a regular function, you will need to create an event loop and to use run_until_complete():

loop = asyncio.get_event_loop()
result = loop.run_until_complete(intermediary(5))
print(result)
loop.close()
Udi
  • 29,222
  • 9
  • 96
  • 129
  • Suppose I want to create my own event loop. Somewhere, that chain of `await`ed calls ends in something. What does it end with? How do I signal to the event loop that it should call me back at some point? I updated my question, so that might help with what my question is about. – Omnifarious Mar 21 '17 at 05:50
  • I think it will be fair to ask a new question "How does asyncio works?" - I'll be happy to answer :-) Let's continue in chat: http://chat.stackoverflow.com/rooms/138590/how-does-python-asyncio-work – Udi Mar 21 '17 at 05:56
  • I'll be there in 5-10 minutes. – Omnifarious Mar 21 '17 at 05:58