
I need to read messages (JSON terminated by \r\n) from stdin asynchronously and, after processing, write the updated message to stdout asynchronously.

At the moment I am doing it synchronously, like this:

import sys


class SyncIOStdInOut:
    def write(self, payload: str):
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        payload = sys.stdin.readline()
        return payload

How can I do the same asynchronously?

alex_noname
user3225309

2 Answers


Here's an example that echoes stdin to stdout using asyncio streams (Unix only).

import asyncio
import sys


async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer


async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(100)
        if not res:
            break
        writer.write(res)
        await writer.drain()


if __name__ == "__main__":
    asyncio.run(main())

As a ready-to-use solution, you could use the aioconsole library. It implements a similar approach, but also provides useful asynchronous equivalents of input, print, exec and code.interact:

from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()

Update:

Let's try to figure out how the function connect_stdin_stdout works.

  1. Get the current event loop:
loop = asyncio.get_event_loop()
  2. Create a StreamReader instance:
reader = asyncio.StreamReader()

StreamReader and StreamWriter are usually not instantiated directly; they are normally obtained from functions such as open_connection() and start_server(). StreamReader provides a buffered asynchronous interface to a data stream: some source (library code) calls its methods such as feed_data() and feed_eof(), the data is buffered, and it can then be read through the documented coroutines read(), readline(), and so on.

  3. Create a StreamReaderProtocol instance:
protocol = asyncio.StreamReaderProtocol(reader)

This class is derived from asyncio.Protocol and FlowControlMixin and acts as an adapter between Protocol and StreamReader. It overrides Protocol methods such as data_received and eof_received, and forwards them to the StreamReader methods feed_data and feed_eof.
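The forwarding described above can be sketched without any pipe at all. This is only an illustration: the function name protocol_feeds_reader is hypothetical, and the protocol callbacks are invoked by hand here, whereas normally a transport calls them.

```python
import asyncio


async def protocol_feeds_reader() -> bytes:
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)

    # A transport would normally invoke these callbacks; calling them
    # by hand shows that the protocol hands the data to the reader.
    protocol.data_received(b"hello ")
    protocol.data_received(b"world")
    protocol.eof_received()

    # read() with no size argument reads until EOF.
    return await reader.read()


result = asyncio.run(protocol_feeds_reader())
print(result)
```

The reader never touches a file descriptor itself; it only buffers what the protocol feeds into it.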

  4. Register the standard input stream stdin in the event loop:
await loop.connect_read_pipe(lambda: protocol, sys.stdin)

The connect_read_pipe function takes a file-like object as its pipe parameter, and stdin is a file-like object. From this point on, all data read from stdin is delivered to the StreamReaderProtocol and then passed on to the StreamReader.
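To try this step without typing into a terminal, here is a minimal sketch that assumes a Unix system and substitutes an os.pipe() for stdin. The helper name read_from_pipe and the sample payload are made up for illustration.

```python
import asyncio
import os


async def read_from_pipe() -> bytes:
    loop = asyncio.get_running_loop()
    # Assumption: an OS pipe stands in for sys.stdin in this sketch.
    read_fd, write_fd = os.pipe()

    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    transport, _ = await loop.connect_read_pipe(
        lambda: protocol, os.fdopen(read_fd, "rb", buffering=0)
    )

    # Pretend another process sent one message and then closed its end.
    os.write(write_fd, b'{"id": 1}\r\n')
    os.close(write_fd)

    line = await reader.readline()
    transport.close()
    return line


result = asyncio.run(read_from_pipe())
print(result)
```

The read transport returned by connect_read_pipe is only kept here so that it can be closed; the reading itself goes through reader.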

  5. Register the standard output stream stdout in the event loop:
w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)

connect_write_pipe needs a protocol factory that creates protocol instances implementing the flow control logic used by StreamWriter.drain(). This logic is implemented in the FlowControlMixin class, which StreamReaderProtocol also inherits from.
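The write side can be sketched the same way. This assumes a Unix system and uses an os.pipe() in place of stdout so the written bytes can be read back; write_to_pipe is a hypothetical name, and reader is passed as None because nothing is read in this sketch.

```python
import asyncio
import os


async def write_to_pipe() -> bytes:
    loop = asyncio.get_running_loop()
    # Assumption: an OS pipe stands in for sys.stdout in this sketch.
    read_fd, write_fd = os.pipe()

    transport, protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,
        os.fdopen(write_fd, "wb", buffering=0),
    )
    writer = asyncio.StreamWriter(transport, protocol, None, loop)

    writer.write(b'{"ok": true}\r\n')  # plain method, not awaited
    await writer.drain()               # waits only if the transport paused writing

    data = os.read(read_fd, 1024)      # read back what went through the pipe
    writer.close()
    os.close(read_fd)
    return data


result = asyncio.run(write_to_pipe())
print(result)
```

Note that writer.wait_closed() is deliberately not awaited: FlowControlMixin on its own does not implement the hook that wait_closed() relies on.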

  6. Create a StreamWriter instance:
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)

This class forwards data passed to it via write(), writelines(), etc. to the underlying transport.

protocol is used by drain() to wait until the underlying transport has flushed its internal buffer and is ready for writing again.

reader is an optional parameter and can be None. It is also used by drain(): at the start of drain(), the reader is checked for a stored exception (for example, one set because the connection was lost, which is relevant for sockets and bidirectional connections). If one is set, drain() raises that exception too.
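The effect of the reader argument on drain() can be demonstrated with a small sketch. It assumes a Unix system, again uses an os.pipe() instead of stdout, and simulates a read-side failure by calling reader.set_exception() by hand; drain_with_failed_reader is a hypothetical name.

```python
import asyncio
import os


async def drain_with_failed_reader() -> str:
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()
    transport, protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,
        os.fdopen(write_fd, "wb", buffering=0),
    )
    reader = asyncio.StreamReader()
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)

    # Simulate an error reported on the read side.
    reader.set_exception(ConnectionResetError("read side failed"))
    try:
        await writer.drain()
        outcome = "drained"
    except ConnectionResetError as exc:
        outcome = f"drain raised: {exc}"
    finally:
        writer.close()
        os.close(read_fd)
    return outcome


result = asyncio.run(drain_with_failed_reader())
print(result)
```

For a writer that is independent of stdin, such as one attached to stderr, passing None as the reader avoids this coupling.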

You can read more about StreamWriter and the drain() function in this great answer.

Update 2:

To read lines terminated by \r\n, reader.readuntil(b'\r\n') can be used.
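A minimal sketch of such a read loop follows. The names read_messages and demo are hypothetical, and the reader is fed by hand so the example runs on its own; in practice it would be the reader returned by connect_stdin_stdout().

```python
import asyncio
import json


async def read_messages(reader: asyncio.StreamReader) -> list:
    messages = []
    while True:
        try:
            raw = await reader.readuntil(b"\r\n")
        except asyncio.IncompleteReadError:
            break  # EOF reached before another complete message
        # json.loads accepts bytes and ignores the trailing \r\n.
        messages.append(json.loads(raw))
    return messages


async def demo() -> list:
    reader = asyncio.StreamReader()
    # Stand-in for data that would arrive on stdin.
    reader.feed_data(b'{"id": 1}\r\n{"id": 2}\r\n')
    reader.feed_eof()
    return await read_messages(reader)


messages = asyncio.run(demo())
print(messages)
```

On the writing side, the terminator has to be appended explicitly, for example writer.write(payload.encode() + b"\r\n") followed by await writer.drain().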

alex_noname
  • Can you please explain the concepts here a bit? For example, what is the purpose of the reader, protocol, connect_read_pipe, w_transport, connect_write_pipe? What is the purpose of all those elements and how do they work with connect_read/write_pipe? Why did you need a dummy as a protocol? How come the writer uses the same protocol as the reader? I believe that for reading lines I should use reader.readline(). How do I set the line termination to \r\n for reading and writing? As you can see I am quite confused, but your solution works. Thank you very much for your answer. – user3225309 Oct 12 '20 at 15:48
  • Also, I have just noticed that writer is not awaited. – user3225309 Oct 12 '20 at 16:58
  • Hello, I will try to describe in more detail a little later – alex_noname Oct 12 '20 at 17:08
  • Thanks in advance. – user3225309 Oct 12 '20 at 19:57
  • This is the furthest I have come: https://pastebin.com/2Rwch6Dr – user3225309 Oct 12 '20 at 20:15
  • can you please answer? – user3225309 Oct 17 '20 at 14:48
  • I changed the function a bit and updated the answer – alex_noname Oct 18 '20 at 18:38
  • Alex, first of all, thank you very much for your very detailed answer. I have carefully read every word of it. Still, I have some issues. :) I saw that connect_read_pipe returns (transport, protocol), but from your code I can see that you are not interested in those values. As protocol is defined a line above, I conclude that we don't need the read transport at all. If I understood you correctly, connect_read_pipe will enable me to use await reader.read(100) whenever I need to read data from StdIn. Regarding writes (continued in the next comment)... – user3225309 Oct 23 '20 at 03:22
  • Why does StreamWriter need reader as a third parameter? If I would like to write to StdErr, will this be enough? e_transport, e_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stderr) e_writer = asyncio.StreamWriter(e_transport, e_protocol, reader, loop) [do I still need reader?]. I have to use stderr for logging. After everything you have said I am not sure whether I should use the lines above or something like aiologger. Anyway, thank you very much for all the information you have provided. Best regards. – user3225309 Oct 23 '20 at 03:31
  • You are right about `connect_read_pipe`. For the rest I have updated item 6 – alex_noname Oct 23 '20 at 07:34
  • Alex, thank you very much for all information you have provided. Everything works as it should. Best regards. – user3225309 Oct 29 '20 at 17:46
  • Is not awaiting the `writer.write` call a typo? – oliora Jun 19 '23 at 10:07
  • No, [it's not](https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamWriter.write) a typo. But it is correct to use `write` together with `await drain()`. I've updated the answer. Thanks for pointing it out. – alex_noname Jun 19 '23 at 12:18

This is another way to read from stdin asynchronously (one line at a time).

import asyncio
import sys

async def async_read_stdin() -> str:
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, sys.stdin.readline)
  • Thanks, but I prefer what @alex_noname has suggested. – user3225309 May 19 '21 at 08:47
  • If the asyncio task that calls this is cancelled, the background thread blocking on readline() will still be running, and due to https://stackoverflow.com/a/49992422/1976323 the program will not exit. – David Lechner Oct 29 '22 at 21:20