
I'm working on a producer-consumer flow based on asyncio.Queue.
The code below is adapted from this answer and this blog.

import asyncio

async def produce(q: asyncio.Queue, t):
    asyncio.create_task(q.put(t))
    print(f'Produced {t}')

async def consume(q: asyncio.Queue):
    while True:
        res = await q.get()
        if res > 2:
            print(f'Cannot consume {res}')
            raise ValueError(f'{res} too big')
        print(f'Consumed {res}')
        q.task_done()

async def shutdown(loop, signal=None):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f"Cancelling {len(tasks)} outstanding tasks")
    [task.cancel() for task in tasks]

def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    print(f"Caught exception: {msg}")
    asyncio.create_task(shutdown(loop))

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

    [asyncio.create_task(consume(queue)) for _ in range(1)]
    # consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]

    try:
        for i in range(6):
            await asyncio.create_task(produce(queue, i))
        await queue.join()
    except asyncio.exceptions.CancelledError:
        print('Cancelled')


asyncio.run(main())

When the consumer tasks are created as above (without assigning the list to a name), the output is as expected:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Caught exception: 3 too big
Produced 4
Cancelling 2 outstanding tasks
Cancelled

But when the consumer list is given a name, that is, when the code inside main() is changed like this:

async def main():
    # <-- snip -->

    # [asyncio.create_task(consume(queue)) for _ in range(1)]
    consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]

    # <-- snip -->

the program gets stuck like this:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Produced 5  # <- stuck here, have to manually stop by ^C

It seems that the producer keeps producing, so the number of items in the queue keeps growing after the ValueError is raised. handle_exception never gets called, and the program gets stuck at await queue.join().

But why would giving the consumers list a name change the behavior of the code? Why is handle_exception never called once the consumers list is named?

marc_s
funkid

2 Answers


TL;DR: Don't use set_exception_handler to handle exceptions in tasks. Instead, add the requisite try: ... except: ... in the coroutine itself.

The problem is in the attempt to use set_exception_handler to handle exceptions. That function is a last-ditch attempt to detect an exception that has passed through all the way to the event loop, most likely as the result of a bug in the program. If a callback added by loop.call_soon or loop.call_at etc. raises an exception (and doesn't catch it), the handler installed by set_exception_handler will be consistently invoked.
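As a minimal sketch of the callback case described above: a plain callback scheduled with loop.call_soon raises, and the handler installed by set_exception_handler receives it. The names bad_callback and seen are made up for this illustration.

```python
import asyncio

def bad_callback():
    # A plain callback (not a task) that raises and does not catch.
    raise RuntimeError("boom in callback")

async def main():
    seen = []  # contexts passed to the loop exception handler
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(lambda loop, context: seen.append(context))
    loop.call_soon(bad_callback)
    await asyncio.sleep(0.01)  # give the loop a chance to run the callback
    return seen

seen = asyncio.run(main())
print(len(seen), type(seen[0]["exception"]).__name__)
```

Nobody can ever await a callback's result, so the loop reports the exception right away; that is why the handler is reliable in this situation.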

With a task things are more nuanced: a task drives a coroutine to completion and, once done, stores its result, making it available to anyone who awaits the task, to callbacks installed by add_done_callback, but also to any call that invokes result() on the task. (All this is mandated by the contract of Future, which Task is a subclass of.) When the coroutine raises an unhandled exception, this exception is just another result: when someone awaits the task or invokes result(), the exception will be (re-)raised then and there.
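A small sketch of "the exception is just another result", using a hypothetical fail() coroutine: the task finishes, keeps the exception, and re-raises it only when someone awaits it.

```python
import asyncio

async def fail():
    raise ValueError("too big")

async def main():
    task = asyncio.create_task(fail())
    await asyncio.sleep(0.01)  # let the task run to completion

    # The task is done and simply holds the exception as its outcome.
    stored = task.done() and isinstance(task.exception(), ValueError)

    # Awaiting the finished task re-raises the stored exception here.
    try:
        await task
    except ValueError as exc:
        reraised = str(exc)
    return stored, reraised

stored, reraised = asyncio.run(main())
print(stored, reraised)
```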

This leads to the difference between naming and not naming the task objects. If you don't name them, they will be destroyed as soon as the event loop is done executing them. At the point of their destruction, Python will notice that no one has ever accessed their result and will pass it to the exception handler. On the other hand, if you store them in a variable, they won't be destroyed as long as they're referenced by the variable and there will be no reason to call the event loop handler: as far as Python is concerned, you might decide to call .result() on the objects at any point, access the exception and handle it as appropriate for your program.
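The difference can be reproduced in isolation. The sketch below assumes CPython, where an unreferenced object is normally destroyed as soon as its last reference goes away (gc.collect() is called as a precaution); the timing on other interpreters may differ. fail() and run() are hypothetical names for this illustration.

```python
import asyncio
import gc

async def fail():
    raise ValueError("too big")

async def run(keep_reference: bool) -> int:
    calls = []  # messages received by the loop exception handler
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(lambda loop, ctx: calls.append(ctx["message"]))

    task = asyncio.create_task(fail())
    if not keep_reference:
        del task  # nothing in user code refers to the task any more

    await asyncio.sleep(0.01)  # the task runs and fails in the meantime
    gc.collect()               # precaution; CPython usually frees it earlier
    return len(calls)

unreferenced = asyncio.run(run(keep_reference=False))
referenced = asyncio.run(run(keep_reference=True))
print(unreferenced, referenced)
```

While the local variable task is still alive, the handler count stays at zero, which mirrors the named consumers list in the question.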

To fix the issue, just handle the exception yourself by adding a try: ... except: ... block around the body of the coroutine. If you don't control the coroutine, you can use add_done_callback() to detect the exception instead.
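Both options could look roughly like the sketch below. It is only one possible policy (record the error and keep consuming), not the only way to structure the consumer; the errors and failures lists and the third_party() coroutine are assumptions made for the example.

```python
import asyncio

async def consume(q: asyncio.Queue, errors: list):
    while True:
        res = await q.get()
        try:
            if res > 2:
                raise ValueError(f"{res} too big")
        except ValueError as exc:
            errors.append(str(exc))  # handled inside the coroutine
        finally:
            q.task_done()            # always mark the item, so join() can finish

async def demo_try_except():
    q, errors = asyncio.Queue(), []
    consumer = asyncio.create_task(consume(q, errors))
    for i in range(6):
        await q.put(i)
    await q.join()
    consumer.cancel()  # the consumer is idle now; stop it
    await asyncio.gather(consumer, return_exceptions=True)
    return errors

async def third_party():
    # Stands in for a coroutine whose body you cannot edit.
    raise ValueError("boom")

async def demo_done_callback():
    failures = []

    def on_done(task: asyncio.Task):
        # Runs once the task finishes; exception() also marks it as retrieved.
        if not task.cancelled() and task.exception() is not None:
            failures.append(task.exception())

    task = asyncio.create_task(third_party())
    task.add_done_callback(on_done)
    await asyncio.wait([task])
    return failures

errors = asyncio.run(demo_try_except())
failures = asyncio.run(demo_done_callback())
print(errors, failures)
```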

user4815162342

It's not about the named list. Your example can be simplified to:

asyncio.create_task(consume(queue))
# consumer = asyncio.create_task(consume(queue))

What matters is the Task object that create_task returns. In the first case it is destroyed once it finishes, but in the second it is kept alive by the variable. Good answers have been given here and here.

alex_noname
  • Thanks, but the reason I use a named list is that I actually want to create more consumers, e.g. `for _ in range(10)`, and cancel all the idle consumers after `await queue.join()` like this: `for c in consumers: c.cancel()`. If "the exception only gets raised if the Task is destroyed", does that mean "cancel all tasks" and "handle exceptions if raised" cannot both be done? If no exception is raised, one still has to cancel all the idle consumer tasks. – funkid Aug 07 '20 at 13:36
  • I think you need to redesign your code and write a coroutine that starts all consumers and waits for their results with `asyncio.wait(..., return_when=FIRST_EXCEPTION)`. – alex_noname Aug 07 '20 at 14:16
  • I found that `await asyncio.wait([queue.join(), *consumers], return_when=asyncio.FIRST_EXCEPTION)` hangs the program if no exception is raised. The reason is simple: `consumers` never complete, and `FIRST_EXCEPTION` is equivalent to `ALL_COMPLETED` when no future raises an exception. It seems the only way to work around this is to use `await asyncio.wait([queue.join(), *consumers], return_when=asyncio.FIRST_COMPLETED)` and check whether the `done` set contains any consumers, as in [this answer](https://stackoverflow.com/a/59629996). – funkid Aug 08 '20 at 02:21
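The FIRST_COMPLETED workaround from the last comment might be sketched as follows. This is an illustration under assumptions rather than the code from the linked answer: run() and joiner are hypothetical names, and queue.join() is wrapped in create_task because passing bare coroutines to asyncio.wait was deprecated in Python 3.8 and removed in 3.11.

```python
import asyncio

async def consume(q: asyncio.Queue):
    while True:
        res = await q.get()
        if res > 2:
            raise ValueError(f"{res} too big")
        q.task_done()

async def run(n_items: int) -> list:
    q = asyncio.Queue()
    consumers = [asyncio.create_task(consume(q)) for _ in range(3)]
    for i in range(n_items):
        q.put_nowait(i)

    joiner = asyncio.create_task(q.join())
    done, _ = await asyncio.wait(
        [joiner, *consumers], return_when=asyncio.FIRST_COMPLETED
    )

    # A consumer loops forever, so it can only appear in `done` by failing.
    errors = [t.exception() for t in done if t is not joiner]

    # Cancel whatever is still idle, then collect everything quietly.
    for t in (joiner, *consumers):
        t.cancel()
    await asyncio.gather(joiner, *consumers, return_exceptions=True)
    return sorted(str(e) for e in errors)

ok = asyncio.run(run(3))      # every item is consumable, join() finishes first
failed = asyncio.run(run(6))  # at least one consumer raises
print(ok, failed)
```

Keeping the consumers in a named list is what makes both the cancellation and the error check possible here, since the tasks' results are read explicitly instead of being left to the loop exception handler.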