I'm trying to test an async code, but I'm having trouble because of the complex connection between some tasks.
The context I need this is some code which reads a file in parallel to it being written by another process. There's some logic in the code where reading a truncated record will make it back off and wait()
on an asyncio.Condition
to be later released by an inotify event. This code should let it recover by re-reading the record when a future write has been completed by another process. I specifically want to test that this recovery works.
So my plan would be:
- write a partial file
- run the event loop until it suspends on the condition
- write the rest of the file
- run the event loop to completion
I had thought this was the anser: Detect an idle asyncio event loop
However a trial test shows that it exits too soon:
import asyncio
import random
def test_ping_pong():
async def ping_pong(idx: int, oth_idx: int):
for i in range(random.randint(100, 1000)):
counters[idx] += 1
async with conditions[oth_idx]:
conditions[oth_idx].notify()
async with conditions[idx]:
await conditions[idx].wait()
async def detect_iowait():
loop = asyncio.get_event_loop()
rsock, wsock = socket.socketpair()
wsock.close()
try:
await loop.sock_recv(rsock, 1)
finally:
rsock.close()
conditions = [asyncio.Condition(), asyncio.Condition()]
counters = [0, 0]
loop = asyncio.get_event_loop()
loop.create_task(ping_pong(0, 1))
loop.create_task(ping_pong(1, 0))
loop.run_until_complete(loop.create_task(detect_iowait()))
assert counters[0] > 10
assert counters[1] > 10