12

I'm trying to resolve this error: RuntimeError: Cannot close a running event loop in my asyncio process. I believe it's happening because there's a failure while tasks are still pending, and then I try to close the event loop. I'm thinking I need to await the remaining responses prior to closing the event loop, but I'm not sure how to accomplish that correctly in my specific situation.

 def start_job(self):

        if self.auth_expire_timestamp < get_timestamp():
            api_obj = api_handler.Api('Api Name', self.dbObj)
            self.api_auth_resp = api_obj.get_auth_response()
            self.api_attr = api_obj.get_attributes()


        try:
            self.queue_manager(self.do_stuff(json_data))
        except aiohttp.ServerDisconnectedError as e:
            logging.info("Reconnecting...")
            api_obj = api_handler.Api('API Name', self.dbObj)
            self.api_auth_resp = api_obj.get_auth_response()
            self.api_attr = api_obj.get_attributes()
            self.run_eligibility()

async def do_stuff(self, data):

    tasks = []

    async with aiohttp.ClientSession() as session:
        for row in data:
            task = asyncio.ensure_future(self.async_post('url', session, row))
            tasks.append(task)
        result = await asyncio.gather(*tasks)
    self.load_results(result)


def queue_manager(self, method):
    self.loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(method)
    self.loop.run_until_complete(future)


async def async_post(self, resource, session, data):
        async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
            resp = []
            try:
                headers = response.headers['foo']
                content = await response.read()
                resp.append(headers)
                resp.append(content)
            except KeyError as e:
                logging.error('KeyError at async_post response')
                logging.error(e)
        return resp


def shutdown(self):
    //need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one.
    self.loop.close() 
    return True

How can I handle the error and properly close the event loop so I can start a new one and essentially re-boot the whole program and continue on.

EDIT:

This is what I'm trying now, based on this SO answer. Unfortunately, this error only happens rarely, so unless I can force it, i will have to wait and see if it works. In my queue_manager method I changed it to this:

try:
    self.loop.run_until_complete(future)
except Exception as e:
    future.cancel()
    self.loop.run_until_complete(future)
    future.exception()

UPDATE:

I got rid of the shutdown() method and added this to my queue_manager() method instead and it seems to be working without issue:

try:
    self.loop.run_until_complete(future)
except Exception as e:
    future.cancel()
    self.check_in_records()
    self.reconnect()
    self.start_job()
    future.exception()
Martijn Pieters
  • 1,048,767
  • 296
  • 4,058
  • 3,343
hyphen
  • 2,368
  • 5
  • 28
  • 59
  • Where is `shutdown` called from, and why do you try to `close` the event loop? An asyncio program is normally served by a single event loop instance throughout its lifetime. – user4815162342 Aug 17 '18 at 09:06
  • the problem was that the API I was calling would be disconnected with tasks pending and I was trying to "reboot" without the whole app crashing. I added an update for what I've most recently added. It seems to be working, but i'm open to feedback. – hyphen Aug 17 '18 at 10:32
  • Do you need `future.exception()` at the end? It seems like `run_until_complete` is correctly picking up the exception. Also, what is the point of canceling a future that is obviously already complete (as witnessed by `run_until_complete` raising)? – user4815162342 Aug 17 '18 at 10:54
  • This answer - https://stackoverflow.com/a/30766124/4113027 seemed to indicate I needed the `future.exception()` bit. As far as your other question, I was canceling the future because in this instance there are remaining tasks pending and i want to essentially remove those tasks and start a fresh event loop. In this scenario the server has disconnected before all the tasks complete so those remaining tasks aren't going to bring back any data anyway...maybe I'm not handling that correctly..? – hyphen Aug 17 '18 at 14:54
  • That answer is specifically about handling `KeyboardInterrupt` which is special in asyncio - it's the only exception that can propagate out of `run_until_complete` without the future having completed. Handling Ctrl-C correctly in asyncio is very hard or even impossible (see [here](https://vorpus.org/blog/control-c-handling-in-python-and-trio/) for the gory details), but fortunately your code isn't concerned with that, it only cares about exceptions raised by the future. (Note that `KeyboardInterrupt` doesn't inherit from `Exception`, so in case of Ctrl-C the `except` body won't even execute.) – user4815162342 Aug 17 '18 at 17:36
  • A future can be in several states, but roughly it either has a result, or it doesn't. The result can be a value, which will be returned by `result()`, or an exception, which will be raised when `result()` is called. `run_until_complete()` returns the result or raises the exception, as appropriate. `cancel()` on the other hand injects an `CancelledError` exception into a running future, causing it to exit. This is why it makes no sense to cancel a future that is already finished - you cannot resume it even for long enough to inject an exception into it, it's simply done. – user4815162342 Aug 17 '18 at 17:41
  • 1
    In your code, both `cancel()` and `exception()` are unnecessary. `cancel()` because the future is already finished (with an exception) as evidenced by the fact that `run_until_complete` exited. See [this code](https://pastebin.com/spBbviT4) for a functionally equivalent example which runs without warning. – user4815162342 Aug 17 '18 at 17:42
  • @user4815162342 - thank you, this is extremely helpful...still wrapping my mind around aysncio.. – hyphen Aug 18 '18 at 09:13

2 Answers2

4

To answer the question as originally stated, there is no need to close() a running loop, you can reuse the same loop for the whole program.

Given the code in the update, your queue_manager could look like this:

try:
    self.loop.run_until_complete(future)
except Exception as e:
    self.check_in_records()
    self.reconnect()
    self.start_job()

Cancelling future is not necessary and as far as I can tell has no effect. This is different from the referenced answer which specifically reacts to KeyboardInterrupt, special because it is raised by asyncio itself. KeyboardInterrupt can be propagated by run_until_complete without the future having actually completed. Handling Ctrl-C correctly in asyncio is very hard or even impossible (see here for details), but fortunately the question is not about Ctrl-C at all, it is about exceptions raised by the coroutine. (Note that KeyboardInterrupt doesn't inherit from Exception, so in case of Ctrl-C the except body won't even execute.)

I was canceling the future because in this instance there are remaining tasks pending and i want to essentially remove those tasks and start a fresh event loop.

This is a correct thing to want to do, but the code in the (updated) question is only canceling a single future, the one already passed to run_until_complete. Recall that a future is a placeholder for a result value that will be provided at a later point. Once the value is provided, it can be retrieved by calling future.result(). If the "value" of the future is an exception, future.result() will raise that exception. run_until_complete has the contract that it will run the event loop for as long as it takes for the given future to produce a value, and then it returns that value. If the "value" is in fact an exception to raise, then run_until_complete will re-raise it. For example:

loop = asyncio.get_event_loop()
fut = loop.create_future()
loop.call_soon(fut.set_exception, ZeroDivisionError)
# raises ZeroDivisionError, as that is the future's result,
# manually set
loop.run_until_complete(fut)

When the future in question is in fact a Task, an asyncio-specific object that wraps a coroutine into a Future, the result of such future is the object returned by the coroutine. If the coroutine raises an exception, then retrieving the result will re-raise it, and so will run_until_complete:

async def fail():
    1/0

loop = asyncio.get_event_loop()
fut = loop.create_task(fail())
# raises ZeroDivisionError, as that is the future's result,
# because the coroutine raises it
loop.run_until_complete(fut)

When dealing with a task, run_until_complete finishing means that the coroutine has finished as well, having either returned a value or raised an exception, as determined by run_until_complete returning or raising.

On the other hand, cancelling a task works by arranging for the task to be resumed and the await expression that suspended it to raise CancelledError. Unless the task specifically catches and suppresses this exception (which well-behaved asyncio code is not supposed to do), the task will stop executing and the CancelledError will become its result. However, if the coroutine is already finished when cancel() is called, then cancel() cannot do anything because there is no pending await to inject CancelledError into.

user4815162342
  • 141,790
  • 18
  • 296
  • 355
0

I got the same error below:

RuntimeError: Cannot close a running event loop

When I called loop.close() in test() as shown below:

import asyncio

async def test(loop):
    print("Test")
    loop.stop()
    loop.close() # Here

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

loop.create_task(test(loop))

loop.run_forever()

So, I used loop.close() after loop.run_forever() with try: and finally: as shown below, then the error was solved:

import asyncio

async def test(loop):
    print("Test")
    loop.stop()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

loop.create_task(test(loop))

try:
    loop.run_forever()
finally:
    loop.close() # Here
Super Kai - Kazuya Ito
  • 22,221
  • 10
  • 124
  • 129