To test a man-in-the-middle tcp proxy I have coded an echo tcp server and a tcp client. After each one of the tests I want the proxy and the server to go down, to make sure each test starts on a clean environment, so I have coded:
class TestConnections(TestCase):
loop = None
@classmethod
def setUpClass(cls) -> None:
TestConnections.loop = asyncio.new_event_loop()
asyncio.set_event_loop(TestConnections.loop)
def setUp(self) -> None:
EnergyAgent.GP = GrowattParser.GrowattParser()
self.server = self.loop.create_task(tcp_server(1235))
self.gp = self.loop.create_task(EnergyAgentProxy(1234, 1235))
def tearDown(self) -> None:
logger.debug('Cancelling the tasks')
self.server.cancel()
self.gp.cancel()
self.loop.run_until_complete(asyncio.sleep(0.5))
@classmethod
def tearDownClass(cls) -> None:
logger.debug('Closing the event loop')
TestConnections.loop.close()
def test_client2server(self):
# start the client process, and wait until done
result = self.loop.run_until_complete(asyncio.wait_for(tcp_client(1235, message), timeout=2))
self.assertEqual(message, result)
Now, the problem is: unless I add in the method tearDown the last line
self.loop.run_until_complete(asyncio.sleep(0.5))
I get an error about tasks on the proxy not having finished:
ERROR:asyncio:Task was destroyed but it is pending!
Is there any way to just for all the tasks to finish? I have tried running
self.loop.run_until_complete(asyncio.wait([self.server, self.gp]))
and asyncio.gather... unsuccessfully.