I have the following code:

import asyncio
import aiohttp

aut_token = ("token")

tasks = []
iter_flag = False

class WAPI:

    async def receiver(WAPI_S):
        async for msg in WAPI_S:
            data = msg.json()
            raise aiohttp.ClientError #test

    async def heartbeating(WAPI_S):
        while iter_flag:
            await WAPI_S.send_json({
                            "op": 1,
                            "d": None
                        })
            
            await asyncio.sleep(42.5)

    async def event_manager():
        loop = asyncio.get_running_loop()
        try:
            async with aiohttp.ClientSession().ws_connect("url") as WAPI_S: 
                task_receive = loop.create_task(WAPI.receiver(WAPI_S)); task_heartbeating = loop.create_task(WAPI.heartbeating(WAPI_S))
                tasks.append(task_receive); tasks.append(task_heartbeating)
                await asyncio.gather(*tasks)
        except aiohttp.ClientError:
            global iter_flag
            iter_flag = False
            await asyncio.sleep(44)
            [task.cancel() for task in tasks]
            try:
                loop.close()
            except:
                loop.stop()
                
            
asyncio.run(WAPI.event_manager())

I want to shut down the client correctly when the exception is raised. My implementation raises "RuntimeError: Event loop stopped before Future completed" at runtime. How do I do this properly?

Debianov

1 Answer

In the `event_manager` method, the statement:

            async with aiohttp.ClientSession().ws_connect("url") as WAPI_S:

needs to be replaced with:

            async with aiohttp.ClientSession() as session:
                async with session.ws_connect("url") as WAPI_S:
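
The reason for the nesting, as far as I understand it, is that the one-line form only enters the context manager returned by `ws_connect()`, so the session object itself is never closed. Below is a minimal sketch of that effect using only the standard library. `FakeSession` and `FakeWebSocket` are hypothetical stand-ins, not aiohttp classes, and they only mimic the enter/exit behavior:

```python
import asyncio


class FakeWebSocket:
    """Hypothetical stand-in for the object returned by ws_connect()."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False


class FakeSession:
    """Hypothetical stand-in for a client session (not aiohttp.ClientSession)."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Cleanup only happens if the session itself was entered with `async with`.
        self.closed = True
        return False

    def ws_connect(self, url):
        return FakeWebSocket()


async def one_statement():
    session = FakeSession()
    # Only the websocket context manager is entered here, never the session.
    async with session.ws_connect("url"):
        pass
    return session.closed


async def nested():
    session = FakeSession()
    # Both context managers are entered, so both are exited on the way out.
    async with session as entered_session:
        async with entered_session.ws_connect("url"):
            pass
    return session.closed


closed_one_statement = asyncio.run(one_statement())
closed_nested = asyncio.run(nested())
print(closed_one_statement, closed_nested)
```

In this sketch the session is closed only in the nested version.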

Also, it is considered un-Pythonic to use a list comprehension purely for its side effects, because it builds a list that is immediately thrown away. See the question "Is it Pythonic to use list comprehensions for just side effects?" So you really should replace:

            [task.cancel() for task in tasks]

with:

            for task in tasks:
                task.cancel()

Putting this all together:

    async def event_manager():
        loop = asyncio.get_running_loop()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.ws_connect("url") as WAPI_S: 
                    task_receive = loop.create_task(WAPI.receiver(WAPI_S))
                    task_heartbeating = loop.create_task(WAPI.heartbeating(WAPI_S))
                    tasks.append(task_receive)
                    tasks.append(task_heartbeating)
                    await asyncio.gather(*tasks)
        except aiohttp.ClientError:
            global iter_flag
            iter_flag = False
            await asyncio.sleep(44)
            for task in tasks:
                task.cancel()
            try:
                loop.close()
            except:
                loop.stop()
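
A note on the `loop.close()` / `loop.stop()` part: `loop.close()` raises `RuntimeError` when called on a running loop, and `loop.stop()` is what makes `asyncio.run()` report "Event loop stopped before Future completed". Since `asyncio.run()` closes the loop on its own, one possible alternative is to cancel the tasks and then await them, without touching the loop at all. The following is only a sketch of that pattern with the standard library. `FakeClientError` is a hypothetical stand-in for `aiohttp.ClientError`, and the short sleeps replace the real websocket traffic:

```python
import asyncio


class FakeClientError(Exception):
    """Hypothetical stand-in for aiohttp.ClientError."""


async def receiver():
    # Pretend a message arrived and processing it failed.
    await asyncio.sleep(0.01)
    raise FakeClientError("connection lost")


async def heartbeating():
    # Runs until it is cancelled from the outside.
    while True:
        await asyncio.sleep(0.005)


async def event_manager():
    tasks = [
        asyncio.create_task(receiver()),
        asyncio.create_task(heartbeating()),
    ]
    try:
        await asyncio.gather(*tasks)
    except FakeClientError:
        # Cancelling an already finished task is a no-op.
        for task in tasks:
            task.cancel()
        # Wait until every task has really finished; with return_exceptions=True
        # the errors are returned as values instead of being raised again.
        return await asyncio.gather(*tasks, return_exceptions=True)


# asyncio.run() closes the event loop itself once event_manager() returns.
results = asyncio.run(event_manager())
print([type(result).__name__ for result in results])
```

With this structure there is no need for the `iter_flag` global either, because cancellation is what ends the heartbeat loop.
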
Booboo
  • Everything works, thank you. However, what is the technical difference between your event_manager implementation and mine? Could you explain? – Debianov Jul 26 '21 at 13:33
  • Both `aiohttp.ClientSession()` and `session.ws_connect()` return asynchronous context managers, so each one needs its own `async with` statement. – Booboo Jul 26 '21 at 14:04