I'm working on a producer-consumer flow based on asyncio.Queue.
The code below is adapted from this answer and this blog.
import asyncio


async def produce(q: asyncio.Queue, t):
    asyncio.create_task(q.put(t))
    print(f'Produced {t}')


async def consume(q: asyncio.Queue):
    while True:
        res = await q.get()
        if res > 2:
            print(f'Cannot consume {res}')
            raise ValueError(f'{res} too big')
        print(f'Consumed {res}')
        q.task_done()


async def shutdown(loop, signal=None):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f"Cancelling {len(tasks)} outstanding tasks")
    [task.cancel() for task in tasks]


def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    print(f"Caught exception: {msg}")
    asyncio.create_task(shutdown(loop))


async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)
    [asyncio.create_task(consume(queue)) for _ in range(1)]
    # consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]
    try:
        for i in range(6):
            await asyncio.create_task(produce(queue, i))
        await queue.join()
    except asyncio.exceptions.CancelledError:
        print('Cancelled')


asyncio.run(main())
When the consumer tasks are created as above (without assigning the list to a name), the output is as expected:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Caught exception: 3 too big
Produced 4
Cancelling 2 outstanding tasks
Cancelled
But when I give the list of consumers a name, i.e. change the code inside main() like this:
async def main():
    # <-- snip -->
    # [asyncio.create_task(consume(queue)) for _ in range(1)]
    consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]
    # <-- snip -->
The program gets stuck like this:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Produced 5 # <- stuck here, have to manually stop by ^C
It seems that the producer keeps producing, so the number of items in the queue keeps growing after the ValueError is raised. handle_exception never gets called, and the program gets stuck at await queue.join().
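The hang at queue.join() can be reproduced on its own. As far as I understand, join() only returns once task_done() has been called for every item that was put, so items left behind after the consumer dies keep it waiting forever. This is a minimal sketch, not my real code: the function name demo and the 0.1 second timeout are made up for illustration.

```python
import asyncio


async def demo():
    q = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)

    # Consume and acknowledge only the first item, as if the consumer
    # had died before handling the other two.
    q.get_nowait()
    q.task_done()

    try:
        # Two items are still unacknowledged, so join() cannot finish.
        await asyncio.wait_for(q.join(), timeout=0.1)
        return "joined"
    except asyncio.TimeoutError:
        return "still waiting"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Without the timeout, this waits forever, which matches what I see after Produced 5.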
But why would giving the consumers list a name change the behavior of the code? Why does handle_exception never get called once the consumers list is named?
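To narrow this down, here is a smaller sketch that seems to show the same difference. My assumption, which I have only observed on CPython, is that the loop's exception handler sees a failed task's exception only when the task object is finalized without the exception having been retrieved. Keeping a reference to the task would then delay the handler indefinitely. The names fail and demo are made up for this example.

```python
import asyncio
import gc


async def fail():
    raise ValueError("boom")


async def demo():
    calls = []
    loop = asyncio.get_running_loop()
    # Record the handler's message instead of printing it.
    loop.set_exception_handler(lambda loop, ctx: calls.append(ctx["message"]))

    task = asyncio.create_task(fail())
    await asyncio.sleep(0.01)      # let the task run and fail
    gc.collect()
    while_held = list(calls)       # snapshot while `task` is still referenced

    del task                       # drop the only strong reference
    gc.collect()
    after_release = list(calls)    # snapshot after the task can be finalized
    return while_held, after_release


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If that assumption holds, the unnamed list in my original main() is thrown away immediately, so the failed consumer task can be collected and the handler fires, while the named consumers list keeps the task alive until main() returns.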