4

Using the answers here as a basis (which use SubprocessProtocol), I'm simply trying to read from a subprocess and stop reading (and terminate the subprocess) at a point of my choosing (e.g., I've read enough data).

Note that I do want the benefit of using run_until_complete per another discussion.

I happen to be using Windows, and the example below is using cat from Cygwin. The actual utility I'm using is just a native windows console application - but one that will stream until it's closed manually.

I can read the data just fine, but my attempts to stop reading and close the subprocess (e.g., calling loop.stop() from within pipe_data_received()) lead to exceptions (RuntimeError: Event loop is closed and ValueError: I/O operation on closed pipe). I'd like to immediately terminate the subprocess gracefully.

I don't think it's so much the platform as it is my not seeing where to properly interrupt things to have the desired effect. Any ideas on how to accomplish this?

My Python 3.7+ code (as modified from the example):

import asyncio
import os

external_program = "cat"  # Something that will output to stdio
external_option = "a"  # An arbitrarily large amount of data
saved_data = []

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            data_len = len(data)
            print(''.join(' {:02x}'.format(x) for x in data), flush=True)
            saved_data.extend(data)
            if len(saved_data) > 512:  # Stop once we've read this much data
                loop.call_soon_threadsafe(loop.stop)
    def connection_lost(self, exc):
        print("Connection lost")
        loop.stop() # end loop.run_forever()

print("START")

if os.name == 'nt':
    # On Windows, the ProactorEventLoop is necessary to listen on pipes
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(
        loop.subprocess_exec(
            SubprocessProtocol, 
            external_program,
            external_option,
        )
    )
    loop.run_forever()
finally:
    loop.close()

print("DONE")
loop.close()
MartyMacGyver
  • 9,483
  • 11
  • 47
  • 67

1 Answers1

1

Not an asyncio expert, but something like this should work.

import time
import asyncio
import threading

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def __init__(self, loop):
        self.transport = None
        self.loop = loop

    def pipe_data_received(self, fd, data):
        print('data received')

    def connection_lost(self, exc):
        print("Connection lost")

    def connection_made(self, transport):
        print("Connection made")

        self.transport = transport

        # becasue calc won't call pipe_data_received method.
        t = threading.Thread(target=self._endme)
        t.setDaemon(True)
        t.start()

    def _endme(self):
        time.sleep(2)
        # You'd normally use these inside pipe_data_received, connection_lost methods
        self.transport.close()
        self.loop.stop()


def main():
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(loop.subprocess_exec(
            lambda: SubprocessProtocol(loop), 
            'calc.exe'
        ))

    loop.run_forever()
    loop.close()

if __name__ == "__main__":
    main()
Himal
  • 1,351
  • 3
  • 13
  • 28
  • Well, that worked terribly with calc.exe (at least in Windows 10). HOWEVER, it seems to work well with the command-line utility I actually need to use, so that's pretty cool (and the most important part)! I'm gonna poke at this for a little bit and see if I can make it do what I need it to do in terms of reading to a limit, but it looks very promising... – MartyMacGyver Jun 11 '19 at 05:08
  • 1
    Oh, really ? I tried it on Windows 8.1 and haven't noticed any issues with "calc.exe". anyway, glad to hear that it helped. I don't see why it shouldn't work inside `pipe_data_received` as well. please let me know how it goes. – Himal Jun 11 '19 at 06:03
  • I think the calculator in Windows 10 is a different sort of app from previous ones - it launches but doesn't close. I made progress with the changes you suggested here, and right now I'm just trying to see if there's any *clean* way to catch KeyboardInterrupt. Also (from the original code) I wonder what is the purpose of having both loop.run_until_complete() and loop.run_forever() together... – MartyMacGyver Jun 12 '19 at 01:51
  • You should be able to achieve both with something like [this](https://docs.python.org/3.7/library/asyncio-protocol.html#asyncio-example-subprocess-proto) . You can use an [Event](https://docs.python.org/3/library/asyncio-sync.html#asyncio.Event) to pass the `KeyboardInterrupt` to the `SubprocessProtocol` – Himal Jun 12 '19 at 07:32
  • *You might have to use a `multiprocessing.Event` since above isn't thread-safe and not sure how it'll behave with multiprocessing. – Himal Jun 12 '19 at 07:43