
I have a Connection type that I’m using to wrap read/write stream pairs from asyncio.

class Connection(object):

    def __init__(self, stream_in, stream_out):
        self._streams_ = (stream_in, stream_out)

    def read(self, n_bytes: int = -1):
        stream = self._streams_[0]
        return stream.read(n_bytes)

    def write(self, bytes_: bytes):
        stream = self._streams_[1]
        stream.write(bytes_)
        yield from stream.drain()

When a new client connects to the server, new_connection will create a new Connection object, and expect to receive 4 bytes.

@asyncio.coroutine
def new_connection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = yield from conn.read(4)
    print(data)

The client sends 4 bytes.

@asyncio.coroutine
def client(loop):
    ...
    conn = Connection(stream_in, stream_out)
    yield from conn.write(b'test')

This works about as I expect, but I do have to write yield from for every call to read and write. I've tried moving the yield from into Connection for this reason.

def read(self, n_bytes: int = -1):
    stream = self._streams_[0]
    data = yield from stream.read(n_bytes)
    return data

But, instead of the expected data bytes, I get a generator object.

<generator object StreamReader.read at 0x1109983b8>

So, for every call to read and write, I must be careful to have the yield from. My goal is to reduce new_connection to the following.

@asyncio.coroutine
def new_connection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    print(conn.read(4))
Zach Gates
  • Why do you have to yield from? If you don't yield from conn.read(4), it looks to me like it simply returns a bytes object. Is that what you are looking for here? – RageCage Aug 29 '17 at 20:48
  • @RageCage: Without `yield from`ing, `conn.read(4)` still returns a generator: `` – Zach Gates Aug 29 '17 at 20:52
  • Sorry I should have clarified; if you don't yield from the first iteration of conn.read() (the single line version) what is the result? – RageCage Aug 29 '17 at 20:56
  • @RageCage: If you mean `def read(self, n_bytes): return self.__in.read(n_bytes)` in conjunction with `data = conn.read(4)`, I'm still getting a generator (`Connection.read`). – Zach Gates Aug 29 '17 at 21:01
  • Sounds like there is some context I'm missing. The StreamReader.read function should return a regular byte array, so if you never use the yield keyword anywhere in the reading workflow a generator should never be made. – RageCage Aug 29 '17 at 21:06
  • @RageCage: This is in the context of asynchronous I/O and coroutines with the `asyncio` module. – Zach Gates Aug 29 '17 at 21:10
  • I have posted a possible answer that explains some concepts and attaches some resources. Hopefully we can work off it to get your code working as you need it to. Your particular problem is very difficult to reproduce in exactly this context, so if missing information can fill in the gaps then that would be very helpful. – RageCage Aug 30 '17 at 01:46
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/153290/discussion-between-ragecage-and-zach-gates). – RageCage Aug 30 '17 at 18:31

2 Answers


Because StreamReader.read is a coroutine, your only options for calling it are a) wrapping it in a Task or Future and running that via an event loop, b) awaiting it from a coroutine defined with async def, or c) using yield from with it from a coroutine defined as a function decorated with @asyncio.coroutine.

Since Connection.read is called from an event loop (via the coroutine new_connection), you can't reuse that event loop to run a Task or Future for StreamReader.read: event loops can't be started while they're already running. You'd either have to stop the event loop (disastrous and probably not possible to do correctly) or create a new event loop (messy and defeating the purpose of using coroutines). Neither of those is desirable, so Connection.read needs to be a coroutine or an async function.
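
As a rough illustration of why the running loop can't be reused, the sketch below tries to drive a coroutine synchronously from inside another coroutine. The names fetch and handler are hypothetical stand-ins (they are not from the question), and the sketch assumes a recent Python 3 where asyncio.run is available.

```python
import asyncio


async def fetch():
    # Hypothetical stand-in for a coroutine such as StreamReader.read.
    return b"test"


async def handler():
    # We are already running inside the event loop here.
    loop = asyncio.get_running_loop()
    coro = fetch()
    try:
        # Trying to run the coroutine to completion on the loop
        # that is currently executing this very function.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it cleanly
        return str(exc)
    return None


error = asyncio.run(handler())
print(error)
```

The call fails with a RuntimeError instead of returning the bytes, which is why the read has to be awaited rather than run on the loop.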

The other two options (await in an async def coroutine or yield from in a @asyncio.coroutine-decorated function) are mostly equivalent. The only difference is that async def and await were added in Python 3.5, so for 3.4, using yield from and @asyncio.coroutine is the only option (coroutines and asyncio didn't exist prior to 3.4, so other versions are irrelevant). Personally, I prefer using async def and await, because defining coroutines with async def is cleaner and clearer than with the decorator.

In brief: have Connection.read and new_connection be coroutines (using either the decorator or the async keyword), and use await (or yield from) when calling other coroutines (await conn.read(4) in new_connection, and await self.__in.read(n_bytes) in Connection.read).
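
A minimal sketch of that summary, assuming Python 3.7+ and reusing the names from the question. The demo function and the in-memory StreamReader fed with feed_data are assumptions made only so the example runs without a real socket; they are not part of the original code.

```python
import asyncio


class Connection:
    """Wraps a read/write stream pair; both methods are coroutines."""

    def __init__(self, stream_in, stream_out):
        self._streams_ = (stream_in, stream_out)

    async def read(self, n_bytes: int = -1) -> bytes:
        # StreamReader.read is a coroutine, so it has to be awaited.
        return await self._streams_[0].read(n_bytes)

    async def write(self, bytes_: bytes) -> None:
        stream = self._streams_[1]
        stream.write(bytes_)
        await stream.drain()


async def new_connection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = await conn.read(4)  # the await here cannot be dropped
    print(data)
    return data


async def demo():
    # Assumption: an in-memory reader stands in for a client socket.
    reader = asyncio.StreamReader()
    reader.feed_data(b"test")
    reader.feed_eof()
    # No writer is needed for this read-only demonstration.
    return await new_connection(reader, None)


result = asyncio.run(demo())
```

Note that new_connection still needs an await on conn.read(4); moving the await into Connection.read does not remove it from the caller, it only makes Connection.read itself a coroutine.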

  • Ah, very nice answer Mego! This is clearly written by someone who knows what they're talking about. I learned much from reading it. +1 – Christian Dean Sep 03 '17 at 20:57

I found that a chunk of the StreamReader source code, on line 620, is actually a perfect example of the function's usage.

In my previous answer, I overlooked the fact that self.__in.read(n_bytes) is not only a coroutine (which I should've known considering it was from the asyncio module XD) but it yields a result on line . So it is in fact a generator, and you will need to yield from it.

Borrowing this loop from the source code, your read function should look something like this:

def read(self, n_bytes : int = -1):
    data = bytearray() #or whatever object you are looking for
    while 1:
        block = yield from self.__in.read(n_bytes)
        if not block:
            break
        data += block
    return data

Because self.__in.read(n_bytes) is a generator, you have to continue to yield from it until it yields an empty result to signal the end of the read. Now your read function should return data rather than a generator. You won't have to yield from this version of conn.read().

RageCage
  • Using the function exactly as you've provided it, I'm still receiving a generator object (`Connection.read`). – Zach Gates Aug 30 '17 at 18:04
  • Are you still yielding from the conn.read call? Try printing data and type(data) in the read function to see what it is before returning. – RageCage Aug 30 '17 at 18:28
  • No, I removed that and tried `data = conn.read(4)` instead. It is a generator. – Zach Gates Aug 30 '17 at 18:28
  • Using `yield from` like this won't work, because `StreamReader.read` isn't a generator - it's a coroutine. The two different uses of `yield from` (delegating with generators and handing off control between coroutines) are exactly why the `await` keyword was introduced in 3.5. –  Sep 03 '17 at 03:49