10

asyncio has StreamReader.readline(), allowing something like:

while True:
    line = await reader.readline()
    ...

(I don't see async for available in asyncio but that would be the obvious evolution)

How do I achieve the equivalent with trio?

I don't see any high-level support for this directly in trio 0.9. All I see is ReceiveStream.receive_some(), which returns arbitrarily sized binary chunks; it seems non-trivial to me to decode this and convert it to something line-wise. Is there a standard library function or snippet I can use? I found the io stdlib module, which looks promising, but I don't see any way to provide a "feed" method.

Nathaniel J. Smith
Robie Basak

3 Answers

11

You're right, there's no high-level support for this included in Trio currently. There should be something, though I'm not 100% sure what it should look like. I opened an issue to discuss it.

In the meantime, your implementation looks reasonable.

If you want to make it even more robust, you might: (1) use a bytearray instead of bytes for your buffer, so that appending and deleting from the front cost amortized O(n) over the whole stream instead of O(n^2); (2) put a limit on the maximum line length, so evil peers can't force you to waste unbounded memory buffering infinitely long lines; (3) resume each call to find at the place where the last one left off, instead of restarting from the beginning each time, again to avoid O(n^2) behavior. None of this is super important if you're only dealing with reasonable line lengths and well-behaved peers, but it doesn't hurt, either.

Here's a tweaked version of your code that tries to incorporate those three ideas:

class LineReader:
    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        self._line_generator = self.generate_lines(max_line_length)

    @staticmethod
    def generate_lines(max_line_length):
        buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                # no b'\n' found in buf
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                # next time, start the search where this one left off
                find_start = len(buf)
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del buf[:newline_idx+1]
                # next time, start the search from the beginning
                find_start = 0
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

(Feel free to use under whatever license you like.)
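For anyone who wants to try the class above without a real network connection, here is a self-contained sketch of it in use. FakeStream and run() are illustrative stand-ins, not trio APIs: the fake stream only needs an async receive_some() method, and because it never actually blocks, the coroutine can be driven to completion without an event loop.

```python
# Self-contained demonstration of the LineReader above. FakeStream and
# run() are illustrative helpers, not part of trio: the fake stream only
# needs an async receive_some() method, and since it never blocks, the
# readline() coroutine can be driven to completion without an event loop.

class LineReader:
    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        self._line_generator = self.generate_lines(max_line_length)

    @staticmethod
    def generate_lines(max_line_length):
        buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                find_start = len(buf)
                more_data = yield
            else:
                line = buf[:newline_idx + 1]
                del buf[:newline_idx + 1]
                find_start = 0
                more_data = yield line
            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''
            line = self._line_generator.send(more_data)
        return line


class FakeStream:
    """Hands out predefined chunks, then b'' to signal EOF."""
    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def receive_some(self, max_bytes=None):
        return self._chunks.pop(0) if self._chunks else b''


def run(coro):
    """Drive a coroutine that never suspends (no real I/O happens here)."""
    try:
        coro.send(None)
    except StopIteration as exc:
        return exc.value
    raise RuntimeError("coroutine blocked; use trio.run() for real streams")


reader = LineReader(FakeStream([b'hel', b'lo\nwor', b'ld\n']))
print(run(reader.readline()))  # bytearray(b'hello\n')
print(run(reader.readline()))  # bytearray(b'world\n')
print(run(reader.readline()))  # b'' (EOF)
```

With a real trio stream you would of course await reader.readline() inside trio.run() instead of using the run() helper.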

Nathaniel J. Smith
  • Very useful, thanks! I was especially interested (/surprised) that `bytearray` has that efficient front deletion. I couldn't find it documented anywhere, but I found that it was added in [issue 19087](https://bugs.python.org/issue19087). – Arthur Tacca Sep 07 '20 at 15:01
  • This won't necessarily limit the length of lines returned to `max_line_length`. There is only a check in the `newline_idx < 0` branch, so if a long line is received at the same time as its delimiter then it will be returned. Of course, that doesn't matter if the only reason for that limit is to stop the buffer growing forever if you never get a delimiter. – Arthur Tacca Sep 07 '20 at 15:14
  • I need to push back data occasionally. The only way I have devised is to make `generate_lines` a normal method and `buf` and `find_start` instance attributes. Is this the appropriate way to do this? Thank you. – NameOfTheRose Mar 15 '21 at 11:01
  • Regarding the pushback functionality, another idea is to leverage the `generator.throw` method, passing back the data as the 2nd argument: `self._line_generator.throw(ValueError, data)`. I have implemented both approaches and both seem to work. – NameOfTheRose Mar 17 '21 at 07:28
1

I ended up writing this. Not properly tested (bugfixes welcome), but it seems to work:

class LineReader:
    def __init__(self, stream):
        self.stream = stream
        self._line_generator = self.generate_lines()

    @staticmethod
    def generate_lines():
        buf = bytes()
        while True:
            newline_idx = buf.find(b'\n')
            if newline_idx < 0:
                # no b'\n' found in buf
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                buf = buf[newline_idx+1:]
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

Then I can wrap the ReceiveStream with a LineReader and use its readline method. Adding __aiter__() and __anext__() would then be trivial, but I don't need it in my case (I'm porting something to trio that doesn't use async for anyway).
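For completeness, here is a hedged sketch of what that trivial addition might look like. The class name AsyncLineReader is made up; the generator and readline method are the same as in this answer, and __anext__ just turns the b'' EOF sentinel into StopAsyncIteration.

```python
# Sketch of the async-iteration support described above. The class name
# is illustrative; the generator and readline() are unchanged from the
# answer, and __anext__ maps the b'' EOF marker to StopAsyncIteration.

class AsyncLineReader:
    def __init__(self, stream):
        self.stream = stream
        self._line_generator = self.generate_lines()

    @staticmethod
    def generate_lines():
        buf = bytes()
        while True:
            newline_idx = buf.find(b'\n')
            if newline_idx < 0:
                # no b'\n' found in buf; ask for more data
                more_data = yield
            else:
                # b'\n' found in buf, so return the line and move up buf
                line = buf[:newline_idx + 1]
                buf = buf[newline_idx + 1:]
                more_data = yield line
            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # EOF
            line = self._line_generator.send(more_data)
        return line

    def __aiter__(self):
        return self

    async def __anext__(self):
        line = await self.readline()
        if not line:
            # b'' means EOF, which ends the async for loop
            raise StopAsyncIteration
        return line
```

This allows `async for line in AsyncLineReader(stream): ...` wherever `stream` is a trio ReceiveStream.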

The other flaw with this is that it assumes UTF-8 or a similar encoding where b'\n' newlines exist in the encoded bytes object unmodified.
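As a side note on that caveat: for UTF-8 specifically the assumption is safe, because every byte of a multibyte sequence has its high bit set, so the byte 0x0A only ever appears as a literal newline. A quick illustrative check (the example strings are mine, not from the answer):

```python
# Illustrative check of the encoding caveat above: in UTF-8, every byte
# of a multibyte character has its high bit set, so 0x0A (b'\n') can only
# appear as a literal newline. Splitting on b'\n' therefore never cuts a
# character in half, and each line can be decoded independently.
encoded = "héllo\nwörld\n".encode("utf-8")
lines = [chunk.decode("utf-8") for chunk in encoded.split(b"\n") if chunk]
print(lines)  # ['héllo', 'wörld']
```

That guarantee does not hold for encodings like UTF-16, where b'\n' can also occur inside other characters' encoded forms, which is exactly the flaw the paragraph above is pointing at.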

It'd be nice to rely on library functions to handle this though; other answers appreciated.

Robie Basak
0

A very naive approach that I'm using:

async def readline(stdout: trio.abc.ReceiveStream):
    data = b""
    while True:
        _data = await stdout.receive_some()
        if _data == b"":
            # stream closed: return whatever we have (possibly b"")
            break
        data += _data
        if data.endswith(b"\n"):
            # note: only stops when a chunk happens to end exactly at a
            # newline, so this can glue several lines together
            break
    return data

# use it like this:
async def fn():
    async with await trio.open_process(..., stdout=subprocess.PIPE) as process:
        while True:
            # instead of:
            #   data = process.stdout.receive_some()
            # use this:
            line = await readline(process.stdout)
            if line == b"":
                break
waszil
  • I think this will combine lines in the case that `receive_some()` returns data over a newline boundary? It might also loop forever (and consume infinite memory) if a sender never stops transmission exactly on a newline boundary. – Robie Basak Jul 07 '20 at 12:57
  • I said it is very naive; it works for my special case (at least for now) – waszil Jul 08 '20 at 13:07