Can somebody provide a sample of code which listen to keypress in nonblocking manner with asynio and put the keycode in console on every click?
It's not a question about some graphical toolkit
Can somebody provide a sample of code which listen to keypress in nonblocking manner with asynio and put the keycode in console on every click?
It's not a question about some graphical toolkit
So the link provided by Andrea Corbellini is a clever and thorough solution to the problem, but also quite complicated. If all you want to do is prompt your user to enter some input (or simulate raw_input), I prefer to use the much simpler solution:
import sys
import functools
import asyncio as aio
class Prompt:
def __init__(self, loop=None):
self.loop = loop or aio.get_event_loop()
self.q = aio.Queue()
self.loop.add_reader(sys.stdin, self.got_input)
def got_input(self):
aio.ensure_future(self.q.put(sys.stdin.readline()), loop=self.loop)
async def __call__(self, msg, end='\n', flush=False):
print(msg, end=end, flush=flush)
return (await self.q.get()).rstrip('\n')
prompt = Prompt()
raw_input = functools.partial(prompt, end='', flush=True)
async def main():
# wait for user to press enter
await prompt("press enter to continue")
# simulate raw_input
print(await raw_input('enter something:'))
loop = aio.get_event_loop()
loop.run_until_complete(main())
loop.close()
EDIT: I removed the loop parameter form Queue
as it is removed in 3.10.
Also, these days I use structured concurrency (trio), and if anyone is curious this is pretty easy to do in trio:
import trio, sys
async def main():
async with trio.lowlevel.FdStream(sys.stdin.fileno()) as stdin:
async for line in stdin:
if line.startswith(b'q'):
break
print(line)
trio.run(main)
I wrote something similar as part of a package called aioconsole.
It provides a coroutine called get_standard_streams
that returns two asyncio streams corresponding to stdin
and stdout
.
Here's an example:
import asyncio
import aioconsole
async def echo():
stdin, stdout = await aioconsole.get_standard_streams()
async for line in stdin:
stdout.write(line)
loop = asyncio.get_event_loop()
loop.run_until_complete(echo())
It also includes an asynchronous equivalent to input
:
something = await aioconsole.ainput('Entrer something: ')
It should work for both file and non-file streams. See the implementation here.
The high-level pure-asyncio way to do this is as follows.
import asyncio
import sys
async def main():
# Create a StreamReader with the default buffer limit of 64 KiB.
reader = asyncio.StreamReader()
pipe = sys.stdin
loop = asyncio.get_event_loop()
await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), pipe)
async for line in reader:
print(f'Got: {line.decode()!r}')
asyncio.run(main())
The async for line in reader
loop can be written more explicitly, e.g. if you want to print a prompt or catch exceptions inside the loop:
while True:
print('Prompt: ', end='', flush=True)
try:
line = await reader.readline()
if not line:
break
except ValueError:
print('Line length went over StreamReader buffer limit.')
else:
print(f'Got: {line.decode()!r}')
An empty line
(not '\n'
but an actually empty string ''
) means end-of-file. Note that it is possible for await reader.readline()
to return ''
right after reader.at_eof()
returned False. See Python asyncio: StreamReader for details.
Here readline()
is asynchronously gathering a line of input. That is, the event loop can run while the reader waits for more characters. In contrast, in the other answers, the event loop could block: it could detect that some input is available, enter the function calling sys.stdin.readline()
, and then block on it until an endline becomes available (blocking any other tasks from entering the loop). Of course this isn't a problem in most cases, as the endline becomes available together with (in case of line buffering, which is the default) or very soon after (in other cases, assuming reasonably short lines) any initial characters of a line.
You can also read individual bytes with await reader.readexactly(1)
to read byte-per-byte, when reading from a pipe. When reading key-presses from a terminal, it needs to be set up properly, see Key Listeners in python? for more. On UNIX:
import asyncio
import contextlib
import sys
import termios
@contextlib.contextmanager
def raw_mode(file):
old_attrs = termios.tcgetattr(file.fileno())
new_attrs = old_attrs[:]
new_attrs[3] = new_attrs[3] & ~(termios.ECHO | termios.ICANON)
try:
termios.tcsetattr(file.fileno(), termios.TCSADRAIN, new_attrs)
yield
finally:
termios.tcsetattr(file.fileno(), termios.TCSADRAIN, old_attrs)
async def main():
with raw_mode(sys.stdin):
reader = asyncio.StreamReader()
loop = asyncio.get_event_loop()
await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
while not reader.at_eof():
ch = await reader.read(1)
# '' means EOF, chr(4) means EOT (sent by CTRL+D on UNIX terminals)
if not ch or ord(ch) <= 4:
break
print(f'Got: {ch!r}')
asyncio.run(main())
Note this is not really one character or one key at a time: if the user presses a key combination that gives a multi-byte character, like ALT+E, nothing will happen on pressing ALT and two bytes will be sent by the terminal on pressing E, which will result in two iterations of the loop. But it's good enough for ASCII characters like letters and ESC.
If you need actual key presses like ALT, I suppose the only way is to use a suitable library and make it work with asyncio by calling it in a separate thread, like here. In fact the library+thread approach is probably simpler in other cases as well.
If you want finer control you can implement your own protocol in place of StreamReaderProtocol
: a class implementing any number of functions of asyncio.Protocol
. Minimal example:
class MyReadProtocol(asyncio.Protocol):
def __init__(self, reader: asyncio.StreamReader):
self.reader = reader
def connection_made(self, pipe_transport):
self.reader.set_transport(pipe_transport)
def data_received(self, data: bytes):
self.reader.feed_data(data)
def connection_lost(self, exc):
if exc is None:
self.reader.feed_eof()
else:
self.reader.set_exception(exc)
You could replace the StreamReader with your own buffering mechanism. After you call connect_read_pipe(lambda: MyReadProtocol(reader), pipe)
, there will be exactly one call to connection_made
, then arbitrary many calls to data_received
(with data depending on terminal and python buffering options), then eventually exactly one call to connection_lost
(on end-of-file or on error). In case you ever need them, connect_read_pipe
returns a tuple (transport, protocol)
, where protocol
is an instance of MyReadProtocol
(created by the protocol factory, which in our case is a trivial lambda), while transport
is an instance of asyncio.ReadTransport
(specifically some private implementation like _UnixReadPipeTransport
on UNIX).
But in the end this is all boilerplate that eventually relies on loop.add_reader
(unrelated to StreamReader
).
For Windows you might need to choose the ProactorEventLoop
(the default since Python 3.8), see Python asyncio: Platform Support.
An alternative to using queues would be to make the command line an asyn generator, and process the commands as they come in, like so:
import asyncio
import sys
class UserInterface(object):
def __init__(self, task, loop):
self.task = task
self.loop = loop
def get_ui(self):
return asyncio.ensure_future(self._ui_task())
async def _ui_cmd(self):
while True:
cmd = sys.stdin.readline()
cmd = cmd.strip()
if cmd == 'exit':
self.loop.stop()
return
yield cmd
async def _ui_task(self):
async for cmd in self._ui_cmd():
if cmd == 'stop_t':
self.task.stop()
elif cmd == 'start_t':
self.task.start()
Python 3.10 update to the solution provided by bj0:
class Prompt:
def __init__(self):
self.loop = asyncio.get_running_loop()
self.q = asyncio.Queue()
self.loop.add_reader(sys.stdin, self.got_input)
def got_input(self):
asyncio.ensure_future(self.q.put(sys.stdin.readline()), loop=self.loop)
async def __call__(self, msg, end='\n', flush=False):
print(msg, end=end, flush=flush)
# https://docs.python.org/3/library/asyncio-task.html#coroutine
task = asyncio.create_task(self.q.get())
return (await task).rstrip('\n')
I tested it on a websocket client, inside an async function that would get stuck waiting for input, so I replaced s = input("insert string")
with s = await prompt("insert string")
and now ping-ponging works even while the program is waiting for user input, the connection does not stop anymore and the issue "timed out waiting for keepalive pong" is solved.