
Before I added the try/except block, my event loop closed gracefully whenever the process ran for less than 5 minutes; after adding it, I started getting the error below whenever the process exceeded 5 minutes.

import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def run_check(shell_command):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    fut = p.communicate()
    try:
        pcap_run = await asyncio.wait_for(fut, timeout=5)
    except asyncio.TimeoutError:
        pass

def get_coros():
    coros = []
    for pcap_loc in print_dir_cointent():
        for pcap_check in get_pcap_executables():
            tmp_coro = run_check('{args}'
                                 .format(e=sys.executable, args=args))
            if tmp_coro != False:
                coros.append(tmp_coro)
    return coros

async def main():
    p_coros = get_coros()
    for f in asyncio.as_completed(p_coros):
        res = await f

loop = asyncio.get_event_loop()
loop.run_until_complete(get_coros())
loop.close()

Traceback:

Exception ignored in: <bound method BaseSubprocessTransport.__del__ of 
    <_UnixSubprocessTransport closed pid=171106 running stdin=
    <_UnixWritePipeTransport closing fd=8 open> stdout=<_UnixReadPipeTransport fd=9 open>>>
    Traceback (most recent call last):
      File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
      File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
      File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
      File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
      File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
      File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
      File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
    RuntimeError: Event loop is closed

The traceback occurs after the last line in my code is executed.

Debug logs:

DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:run shell command '/local/p_check w_1.pcap --json' stdin=<pipe> stdout=stderr=<pipe>
DEBUG:asyncio:process '/local/p_check w_1.pcap --json' created: pid 171289
DEBUG:asyncio:Write pipe 8 connected: (<_UnixWritePipeTransport fd=8 idle bufsize=0>, <WriteSubprocessPipeProto fd=0 pipe=<_UnixWritePipeTransport fd=8 idle bufsize=0>>)
DEBUG:asyncio:Read pipe 9 connected: (<_UnixReadPipeTransport fd=9 polling>, <ReadSubprocessPipeProto fd=1 pipe=<_UnixReadPipeTransport fd=9 polling>>)
INFO:asyncio:run shell command '/local/p_check w_1.pcap --json': <_UnixSubprocessTransport pid=171289 running stdin=<_UnixWritePipeTransport fd=8 idle bufsize=0> stdout=<_UnixReadPipeTransport fd=9 polling>>
DEBUG:asyncio:<Process 171289> communicate: read stdout
INFO:asyncio:poll 4997.268 ms took 5003.093 ms: timeout
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
Nabz C

1 Answer


loop.run_until_complete accepts something awaitable: a coroutine or a future. You are passing it the result of a function that returns nothing.

You should change get_coros() to actually return the list of coroutines:

def get_coros():
    ...
    return coros

Then wrap that list in an awaitable that runs the jobs one by one (or in parallel, if you prefer). For example:

async def main():
    for coro in get_coros():
        await coro

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
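For the parallel option, one possibility is asyncio.gather. This is a sketch, not part of the original answer; the job coroutine is just a stand-in for the run_check coroutines:

```python
import asyncio

async def job(n):
    # stand-in for one of the run_check coroutines
    await asyncio.sleep(0.01 * n)
    return n

async def main():
    # schedule all coroutines concurrently; gather preserves argument order
    return await asyncio.gather(*(job(n) for n in range(3)))

loop = asyncio.new_event_loop()
results = loop.run_until_complete(main())
loop.close()
print(results)  # [0, 1, 2]
```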

Update:

I can't test my guess right now, but here it is: while asyncio.wait_for(fut, timeout=5) cancels the task after 5 seconds, it doesn't terminate the underlying process. You could do that manually:

try:
    await asyncio.wait_for(fut, timeout=5)
except asyncio.TimeoutError:
    p.kill()               # the timeout cancelled the task, not the process
    await p.communicate()  # reap it so its transports close cleanly
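Putting both fixes together, here is a minimal self-contained sketch; the echo/sleep shell commands are stand-ins for the original p_check invocations:

```python
import asyncio
from asyncio.subprocess import PIPE, STDOUT

async def run_check(shell_command, timeout=5):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    try:
        out, _ = await asyncio.wait_for(p.communicate(), timeout=timeout)
        return out
    except asyncio.TimeoutError:
        # wait_for cancelled the communicate() task, but the process itself
        # is still running: kill it and reap it so its pipe transports are
        # closed before the loop shuts down
        p.kill()
        await p.communicate()
        return None

async def main():
    coros = [run_check('echo fast'),
             run_check('sleep 10', timeout=0.5)]  # this one times out
    results = []
    for f in asyncio.as_completed(coros):
        results.append(await f)
    return results

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(main())
loop.close()
```

With the process killed and reaped inside the except branch, nothing is left for the transport's __del__ to clean up after loop.close(), so the "Event loop is closed" traceback disappears.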
Mikhail Gerasimov
  • Updated the above code; I am getting the same error, except that when I loop in main() w/o **asyncio.as_completed** I get this: _[ERROR] Task was destroyed but it is pending!_ – Nabz C Aug 01 '17 at 17:25
  • Perfect. Thanks. – Nabz C Aug 01 '17 at 21:08
  • In my code I use the timeout in asyncio.wait. It causes the wait function to _return_ after the timeout has elapsed, _not_ throw a timeout error. The pending jobs (which are returned by asyncio.wait) can be re-added again and will continue normally. "Cancelling" tasks is something different altogether. – Ytsen de Boer Nov 27 '18 at 20:28
  • @YtsendeBoer The [documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for) clearly states: "If a timeout occurs, it cancels the task and raises asyncio.TimeoutError". You probably confused `asyncio.wait_for` with `asyncio.wait`. The latter [behaves](https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) as you said, but that's irrelevant to this answer. – Mikhail Gerasimov Nov 27 '18 at 22:01
  • Well, I'm looking at my code right now and it is as I tell you :) Works like a charm. But you are right, beside the point, sorry. – Ytsen de Boer Nov 27 '18 at 22:19
  • Hey @NabzC, were you able to figure out how to get away with [ERROR] Task was destroyed but pending – Arnav Jul 28 '23 at 07:03
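The wait vs. wait_for distinction debated in the comments above can be illustrated with a small sketch (my own example, not from the thread):

```python
import asyncio

async def slow():
    await asyncio.sleep(10)

async def demo():
    task = asyncio.ensure_future(slow())
    # asyncio.wait returns (done, pending) sets on timeout; nothing is cancelled
    done, pending = await asyncio.wait([task], timeout=0.01)
    still_pending = task in pending  # the task can simply be awaited later
    # asyncio.wait_for, by contrast, cancels the task and raises TimeoutError
    try:
        await asyncio.wait_for(task, timeout=0.01)
    except asyncio.TimeoutError:
        pass
    return still_pending, task.cancelled()

loop = asyncio.new_event_loop()
still_pending, cancelled = loop.run_until_complete(demo())
loop.close()
print(still_pending, cancelled)  # True True
```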