I'm trying to test some async code, but I'm having trouble because of the complex interaction between some of its tasks.

The context is some code which reads a file while another process is still writing it. When the code reads a truncated record, it backs off and calls wait() on an asyncio.Condition, which is later notified by an inotify event. This should let it recover by re-reading the record once the other process has completed a later write. I specifically want to test that this recovery works.

So my plan would be:

  • write a partial file
  • run the event loop until it suspends on the condition
  • write the rest of the file
  • run the event loop to completion

I had thought this was the answer: Detect an idle asyncio event loop

However a trial test shows that it exits too soon:

import asyncio
import random
import socket


def test_ping_pong():
    async def ping_pong(idx: int, oth_idx: int):
        for i in range(random.randint(100, 1000)):
            counters[idx] += 1
            async with conditions[oth_idx]:
                conditions[oth_idx].notify()
            async with conditions[idx]:
                await conditions[idx].wait()

    async def detect_iowait():
        loop = asyncio.get_event_loop()
        rsock, wsock = socket.socketpair()
        wsock.close()
        try:
            await loop.sock_recv(rsock, 1)
        finally:
            rsock.close()
    conditions = [asyncio.Condition(), asyncio.Condition()]
    counters = [0, 0]


    loop = asyncio.get_event_loop()
    loop.create_task(ping_pong(0, 1))
    loop.create_task(ping_pong(1, 0))
    loop.run_until_complete(loop.create_task(detect_iowait()))

    assert counters[0] > 10
    assert counters[1] > 10
Philip Couling
  • Using asyncio on file ops isn't very successful/meaningful yet, depending on the OS. Maybe other classic forms of synchronisation would be better (e.g. [semaphores](https://docs.python.org/3/library/threading.html#semaphore-objects)) – Pynchia Sep 04 '20 at 02:02
  • I'm not using async io on the file ops. [Inotify](https://man7.org/linux/man-pages/man7/inotify.7.html) is a mechanism for the kernel to inform a process about changes to the file system. I'm using it to tell me when a file has been written to. Inotify is neatly packaged into a socket (or something that looks like one) so my code awaits on a socket which receives data direct from the kernel as file system events happen. The reads are regular `file.read()` which will be straight from the kernel disk cache in most cases because I'm reading the data that's just been written. – Philip Couling Sep 04 '20 at 07:00
  • After thinking about this some more, I think the only reasonable solution is to add some support to your readers to signal that they are blocked, e.g. by an optional event that can be passed to them. The unit-testing code can send such event and await it. At least that's how I'd approach the issue. – user4815162342 Sep 04 '20 at 07:41

1 Answer

After digging through the source code for Python's event loops, I've found no public API that can do this.

It is, however, possible to use the private _ready deque created by BaseEventLoop. See here. It contains every callback (including every task step) that is immediately ready to run. When a task is run, its entry is popped from the _ready deque. When a suspended task is released by another task (e.g. by calling future.set_result()), the suspended task is immediately added back to the deque. This has existed since Python 3.5.
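To make that concrete, here is a small sketch that peeks at the deque. It assumes the stock CPython asyncio loop (alternative loops such as uvloop may not have a _ready attribute at all), and peek_ready is just a name made up for this illustration:

```python
import asyncio


async def peek_ready():
    loop = asyncio.get_running_loop()
    # _ready is a private attribute of BaseEventLoop; it may change between versions.
    before = len(loop._ready)
    # Each call_soon() appends one handle to the deque.
    loop.call_soon(lambda: None)
    loop.call_soon(lambda: None)
    after = len(loop._ready)
    return after - before


added = asyncio.run(peek_ready())
```

The two scheduled callbacks show up as two extra entries, which is the property the rest of this answer relies on.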

One thing you can do is repeatedly inject a callback that checks how many items are in _ready. When all other tasks are suspended, there will be nothing left in the deque at the moment the callback runs.

The callback will run at most once per iteration of the event loop:

import asyncio


async def wait_for_deadlock(empty_loop_threshold: int = 0):
    def check_for_deadlock():
        nonlocal empty_loop_count

        # pylint: disable=protected-access
        if loop._ready:
            # Something else is still runnable: reset and check again later.
            empty_loop_count = 0
            loop.call_soon(check_for_deadlock)
        elif empty_loop_count < empty_loop_threshold:
            # Nothing runnable, but allow a few more iterations for pending IO.
            empty_loop_count += 1
            loop.call_soon(check_for_deadlock)
        else:
            # Every other task is suspended.
            future.set_result(None)

    empty_loop_count = 0
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_soon(check_for_deadlock)
    await future

In the above code, empty_loop_threshold is not necessary in most cases, but it exists for cases where tasks communicate through IO. For example, if one task communicates with another through IO, there may be a moment where all tasks are suspended even though one has data ready to read. Setting empty_loop_threshold = 1 should get round this.

Using this is relatively simple. You can:

loop.run_until_complete(wait_for_deadlock())

Or as requested in my question:

def some_test():
    async def async_test():
        await wait_for_deadlock()  # every other task is now suspended
        inject_something()
        await wait_for_deadlock()  # wait for things to settle again

    loop = asyncio.get_event_loop()
    loop.create_task(task_to_test())
    loop.run_until_complete(loop.create_task(async_test()))
    assert something
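
For a version that can be run as-is, here is a minimal sketch of the same pattern. The reader coroutine and the events list are hypothetical stand-ins for the real code under test, and the sketch again assumes the stock CPython event loop, since it reads the private _ready attribute:

```python
import asyncio


async def wait_for_deadlock(empty_loop_threshold: int = 0):
    """Return once nothing else is left in the loop's private _ready deque."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    empty_loop_count = 0

    def check_for_deadlock():
        nonlocal empty_loop_count
        if loop._ready:  # private attribute, assumed to exist on this loop
            empty_loop_count = 0
            loop.call_soon(check_for_deadlock)
        elif empty_loop_count < empty_loop_threshold:
            empty_loop_count += 1
            loop.call_soon(check_for_deadlock)
        else:
            future.set_result(None)

    loop.call_soon(check_for_deadlock)
    await future


async def reader(condition, events):
    # Hypothetical stand-in: "reads a truncated record", then backs off.
    events.append("truncated")
    async with condition:
        await condition.wait()
    # Woken up (by inotify in the real code): "re-read" the record.
    events.append("recovered")


async def scenario():
    condition = asyncio.Condition()
    events = []
    task = asyncio.create_task(reader(condition, events))

    await wait_for_deadlock()      # reader is now suspended on the condition
    snapshot = list(events)        # state observed while it is blocked

    async with condition:          # stands in for "write the rest of the file"
        condition.notify_all()
    await task                     # run to completion
    return snapshot, events


snapshot, events = asyncio.run(scenario())
```

The snapshot taken after the first wait_for_deadlock() shows that the reader really had backed off before the notification was injected, which is the point of the test.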
Philip Couling