0

I have a structure similar to what described in Equivalent of asyncio.Queues with worker "threads".

Difference is my "producer" will open a serial line and asynchronously parse input till a "token" is recognized (instead of generating random numbers). Tokens are then passed via asyncio.Queue to "consumer".

Tentative code includes:

@asyncio.coroutine
def produce():
    with open('infile.cmd', 'r') as ifd:
        while True:
            cmd = yield from ifd.readline()
            if cmd is None:
                break
            print("received {}".format(cmd))
            yield from q.put(cmd)

but this doesn't work because of "RuntimeError: Task got bad yield: 'p'" on the ifd.readline() line.

I also tried using a Reader (cfr.: Using asyncio to read the output of a serial port):

event_loop = asyncio.get_event_loop()
try:
    with open('infile.cmd', 'r') as ifd:
        event_loop.add_reader(ifd, produce)
    event_loop.create_task(consume())
    print('entering event loop')
    event_loop.run_forever()

but this bombs with:

Traceback (most recent call last):
  File "/home/mcon/trasmissione-telematica/Communications/prove/asio.py", line 32, in <module>
    event_loop.add_reader(ifd, produce)
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 337, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 267, in _add_reader
    (handle, None))
  File "/usr/lib/python3.5/selectors.py", line 412, in register
    self._epoll.register(key.fd, epoll_events)
PermissionError: [Errno 1] Operation not permitted

How can I asynchronously read from something?

Note: in this example I'm reading from file, but in the end I'll have to read binary data from a non-blocking serial line (either /dev/ttySx or named pipe), so o line-oriented input will be allowed; data must be read as soon as it's available and "producer" is responsible to understand when a "cmd" is complete.

ZioByte
  • 2,690
  • 1
  • 32
  • 68

1 Answers1

0

Using aiofiles and python 3.5+ the following code worked great:

async def produce():
    async with aiofiles.open('test.cmd', mode = 'r') as ifd:
        while True:
            cmd = await ifd.readline()
            #readlines ourput is not stripped
            cmd = cmd.strip()
            if not cmd or cmd=='stop':
                print("finished")
                break
            print("cmd {}".format(cmd))
            await q.put(cmd)

loop = asyncio.get_event_loop()
q = asyncio.Queue()
loop.run_until_complete(produce())

test.cmd

mv hello
cp world
stop

output:

cmd mv hello
cmd cp world
finished
etlsh
  • 701
  • 1
  • 8
  • 18