In this simple producer/consumer example, it is as if await queue.put(item) does not release the event loop to let the consumer run until the producer has finished. As a result, the producer puts all its items onto the queue, and only then does the consumer get to take them off.

Is that expected?

I get the result that I am looking for if I follow the await queue.put(item) with await asyncio.sleep(0).

The producer then puts 1 item onto the queue and the consumer then takes 1 item off the queue.

I get the same result in Python 3.6.8 and 3.7.2.

import asyncio

async def produce(queue, n):
    for x in range(1, n + 1):
        print('producing {}/{}'.format(x, n))
        item = str(x)
        await queue.put(item)
        # await asyncio.sleep(0)
    await queue.put(None)

async def consume(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print('consuming item {}...'.format(item))

loop = asyncio.get_event_loop()
queue = asyncio.Queue()  # the loop parameter was removed in Python 3.10
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()
Paul O

1 Answer

This results in the producer putting all its items onto the queue and only then the consumer gets to take them off. Is that expected?

Yes. The problem is that your queue is unbounded, so putting something into it never suspends the producer and thus never yields to other coroutines. The same goes for all awaits that immediately provide data, e.g. a read at EOF.

If the producer's loop contained another source of suspension, such as awaiting actual input (it has to get the items from somewhere, after all), then that would cause it to suspend and the problem wouldn't be immediately noticeable. A forced suspension using asyncio.sleep(0) works as well, but it's fragile because it relies on a single suspension to run the consumer. This might not always be the case, as the consumer could itself wait for some events other than the queue.

An unbounded queue makes sense in some situations, such as when the queue is pre-filled with tasks, or the architecture of the producer limits the number of items to a reasonable number. But if the queue items are generated dynamically, it is best to add a bound. The bound guarantees backpressure on the producer and ensures that it doesn't monopolize the event loop.
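
As a rough illustration of the bound, here is a minimal sketch of the question's producer and consumer with a bounded queue. The `maxsize=1` value, the `main` wrapper, and the `log` list are assumptions added for the example, and `asyncio.run` needs Python 3.7 or later. Once the queue is full, `put` has to suspend, so the consumer gets to run before the producer has finished.

```python
import asyncio

async def produce(queue, n, log):
    for x in range(1, n + 1):
        log.append(('produce', x))
        # On a bounded queue, put() suspends while the queue is full,
        # which hands control back to the event loop.
        await queue.put(str(x))
    await queue.put(None)  # sentinel telling the consumer to stop

async def consume(queue, log):
    while True:
        item = await queue.get()
        if item is None:
            break
        log.append(('consume', int(item)))

async def main(n=5, maxsize=1):
    # maxsize=1 is an illustrative choice, not a recommendation.
    queue = asyncio.Queue(maxsize=maxsize)
    log = []  # records the order in which items are produced and consumed
    await asyncio.gather(produce(queue, n, log), consume(queue, log))
    return log

log = asyncio.run(main())
for kind, x in log:
    print(kind, x)
```

With a larger `maxsize` the producer can run further ahead before it is forced to wait, so the bound is effectively a knob for how much buffering is acceptable.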

user4815162342
  • Thank you for the clear explanation. I had thought that await would always suspend the coroutine and give control back to the event loop, but it seems that if the thing being awaited can immediately return its result, it doesn't. Is that a proper generalization or is queue.put a special case? – Paul O Feb 09 '19 at 21:11
  • @PaulO Yes, that's the correct generalization - `await` doesn't guarantee yielding to the event loop, which you can easily check by awaiting a no-op coroutine in an infinite loop. (`asyncio.sleep(0)` is in fact [a special case](https://github.com/python/asyncio/issues/284) in the other direction.) The event loop will take over only when the coroutine is forced to suspend because the thing that is awaited cannot provide a result, e.g. when awaiting read from a socket that is not yet ready for reading, and so on. See [this answer](https://stackoverflow.com/a/48816319/1600898) for more details. – user4815162342 Feb 09 '19 at 22:07
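
The check mentioned in the last comment can be sketched roughly as follows, using a finite loop instead of an infinite one so that the script terminates. The names `noop`, `busy`, `other`, and `events` are made up for the illustration, and `asyncio.run` assumes Python 3.7 or later. The already scheduled `other` task only gets to run once `main` reaches a real suspension point.

```python
import asyncio

async def noop():
    # Returns immediately; there is no suspension point inside.
    return None

async def busy(events, n):
    for i in range(n):
        await noop()  # completes right away, so control never reaches the event loop
        events.append(('busy', i))

async def other(events):
    events.append(('other', 'ran'))

async def main():
    events = []
    # `other` is scheduled here, but it cannot start until main() suspends.
    task = asyncio.ensure_future(other(events))
    await busy(events, 3)
    events.append(('main', 'before sleep'))
    await asyncio.sleep(0)  # a real suspension: the event loop runs `other` now
    await task
    return events

events = asyncio.run(main())
print(events)
```

All the `busy` entries are recorded before `other` runs, even though `other` was scheduled first and `busy` awaits on every iteration.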