Using the answers here as a basis (which use SubprocessProtocol
), I'm simply trying to read from a subprocess and stop reading (and terminate the subprocess) at a point of my choosing (e.g., I've read enough data).
Note that I do want the benefit of using run_until_complete
per another discussion.
I happen to be using Windows, and the example below is using cat
from Cygwin. The actual utility I'm using is just a native windows console application - but one that will stream until it's closed manually.
I can read the data just fine, but my attempts to stop reading and close the subprocess (e.g., calling loop.stop() from within pipe_data_received()
) lead to exceptions (RuntimeError: Event loop is closed
and ValueError: I/O operation on closed pipe
). I'd like to immediately terminate the subprocess gracefully.
I don't think it's so much the platform as it is my not seeing where to properly interrupt things to have the desired effect. Any ideas on how to accomplish this?
My Python 3.7+ code (as modified from the example):
import asyncio
import os
external_program = "cat" # Something that will output to stdio
external_option = "a" # An arbitrarily large amount of data
saved_data = []
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
data_len = len(data)
print(''.join(' {:02x}'.format(x) for x in data), flush=True)
saved_data.extend(data)
if len(saved_data) > 512: # Stop once we've read this much data
loop.call_soon_threadsafe(loop.stop)
def connection_lost(self, exc):
print("Connection lost")
loop.stop() # end loop.run_forever()
print("START")
if os.name == 'nt':
# On Windows, the ProactorEventLoop is necessary to listen on pipes
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
loop.subprocess_exec(
SubprocessProtocol,
external_program,
external_option,
)
)
loop.run_forever()
finally:
loop.close()
print("DONE")
loop.close()