1

I'm implementing small service using asyncio using a loop that is structured as follows:

pending = {...}
while True:
  done, pending = yield from asyncio.wait( 
    pending, 
    return_when=asyncio.FIRST_COMPLETED, 
  )
  for future in done:
    if future is x:
      # ...
    if future is y:
      # ...

This loop is currently controlling a sub-process, but having written a bunch of ZMQ-based services, this style feels very natural, so I'll likely be writing more of these in the near future.

I have something that runs just the way I like, but I'm kind of at a loss as to how I would write automated tests for this.

I would like to have my tests start this loop until it blocks on asyncio.wait() and inject one particular event to test the loop's handling of that particular event. That way, I can test each possible event handling and know I cover all cases as expected.

However, I can't find anything in asyncio that provides for this. If I simply yield from this coroutine, the test does not unblock until the coroutine completes.

Any ideas on how to test this particular kind of loop?


Edit: OK, so I managed to get something running using a socketpair() by patching sys.stdout with the write end and wrapping the other end in a StreamReader.

This works when I run pytest without capture, but as soon as remove the -s argument, the test seems to deadlock.

Any ideas?

André Caron
  • 44,541
  • 12
  • 67
  • 125
  • 2
    See http://stackoverflow.com/questions/23033939/how-to-test-python-3-4-asyncio-code/ – Andrew Svetlov Dec 07 '15 at 07:17
  • 2
    @AndrewSvetlov Thanks, but none of the answers there are helpful. Currently, this code is running a sub-process and does not have a network interface, so my test cannot simply have another coroutine that exchanges messages with the coroutine to be tested. I also cannot use `.run_until_complete()` because I want to test intermediate states... – André Caron Dec 07 '15 at 16:27

0 Answers0