5

I just discovered new features of Python 3.11 like ExceptionGroup and TaskGroup and I'm confused with the following TaskGroup behavior: if one or more tasks inside the group fails then all other normal tasks are cancelled and I have no chance to change that behavior Example:

async def f_error():
    raise ValueError()

async def f_normal(arg):
    print('starting', arg)
    await asyncio.sleep(1)
    print('ending', arg)


async with asyncio.TaskGroup() as tg:
    tg.create_task(f_normal(1))
    tg.create_task(f_normal(2))
    tg.create_task(f_error())

# starting 1
# starting 2
#----------
#< traceback of the error here >

In the example above I cannot make "ending 1" and "ending 2" to be printed. Meanwhile it will be very useful to have something like asyncio.gather(return_exceptions=True) option to do not cancel the remaining tasks when an error occurs.

You can say "just do not use TaskGroup if you do not want this cancellation behavior", but the answer is I want to use new exception groups feature and it's strictly bound to TaskGroup

So the questions are:

  1. May I somehow utilize exception groups in asyncio without this all-or-nothing cancellation policy in TaskGroup?
  2. If for the previous the answer is "NO": why python developers eliminated the possibility to disable cancellation in the TaskGroup API?
python_user
  • 5,375
  • 2
  • 13
  • 32
Anton M.
  • 185
  • 1
  • 11

3 Answers3

3

BaseExceptionGroups became part of standard Python in version 3.11. They are not bound to asyncio TaskGroup in any way. The documentation is here: https://docs.python.org/3/library/exceptions.html?highlight=exceptiongroup#ExceptionGroup.

Regarding your question 2, within the TaskGroup context you always have the option of creating a task using asyncio.create_task or loop.create_task. Such tasks will not be part of the TaskGroup and will not be cancelled when the TaskGroup closes. An exception in one of these tasks will not cause the group to close, provided the exception does not propagate into the group's __aexit__ method.

You also have the option of handling all errors within a Task. A Task that doesn't propagate an exception won't cancel the TaskGroup.

There's a good reason for enforcing Task cancellation when the group exits: the purpose of a group is to act as a self-contained collection of Tasks. It's contradictory to allow an uncancelled Task to continue after the group exits, potentially allowing tasks to leak out of the context.

Paul Cornelius
  • 9,245
  • 1
  • 15
  • 24
  • 1
    not OP, can you explain "the purpose of a group is to act as a self-contained collection of Tasks"?, if all tasks in a group are running concurrently, I would expect them to not be related, if they are related (result of one depends on other) why would I want them to run in a group, wouldn't I be running them sequentially? – python_user Jan 27 '23 at 05:32
  • People frequently run tasks in groups using asyncio.gather, but gather has the drawback that you can't add or remove tasks from the gather list once they are started. A TaskGroup lets you do that, while retaining gather's ability to await termination of the group as a whole. So it's an alternative to gather, as the docs for gather now state (python3.11). True, there are situations, like the one you describe, where the result of one task depends on another. In those cases neither gather nor TaskGroup would be the right tool. – Paul Cornelius Jan 27 '23 at 08:04
  • @python_user Not the person you asked, but I understand "TaskGroups act as self-contained collections of Tasks" to mean that essentially, you want to use a TaskGroup when you have several tasks that are related, and you want to proceed only after all those tasks are done. And although they are related (why else would you want to run them all at once?), they are *independent* of one another, so task A doesn't need anything from Task B, though they presumably contribute to some shared purpose, for example fetching individual files from a server that you need to style the webpage. – Nick Muise Feb 24 '23 at 16:44
  • @NickMuise Sort of, but I think you're being too rigid. TaskGroup just does what it does, which is to provide a context for running multiple tasks. It imposes a certain type of error handling, as documented, and won't exit with any of its tasks still running. That's it. There's no requirement that the tasks be fully independent of one another. The tasks can interact through queues, Events, Conditions, and so on, and that wouldn't bother TaskGroup one bit. I would say that if a TaskGroup is a convenient way of organizing your program, use it. – Paul Cornelius Feb 25 '23 at 01:32
1

As answered by Paul Cornelius, the TaskGroup class is carefully engineered to cancel itself and all its tasks at the moment when any task in it (registered with tg.create_task) raises an exception.

My understanding that a "forgiveful" task group, that would await for all other tasks upon it's context exit (end of async with block), regardless of ne or more tasks created in it erroring would still be useful, and that is the functionality you want.

I tinkered around the source code for the TaskGroup, and I think the minimal coding to get the forgiveful task group can be achieved by neutering its internal _abort method. This method is called on task exception handling, and all it does is loop through all tasks not yet done and cancel them. Tasks not cancelled would still be awaited at the end of the with block - and that is what we get by preventing _abort from running.

Keep in mind that as _abort starts with an underscore, it is an implementation detail, and the mechanisms for aborting might change inside TaskGroup even during Py 3.11 lifetime.

For now, I could get it working like this:

import asyncio

class ForgivingTaskGroup(asyncio.TaskGroup):
    _abort = lambda self: None

async def f_error():
    print("starting error")
    raise RuntimeError("booom")

async def f_normal(arg):
    print('starting', arg)
    await asyncio.sleep(.1)
    print('ending', arg)

async def main():
    async with ForgivingTaskGroup() as tg:
        tg.create_task(f_normal(1))
        tg.create_task(f_normal(2))
        tg.create_task(f_error())

        # await asyncio.sleep(0)

asyncio.run(main())

The stdout I got here is:

starting 1
starting 2
starting error
ending 1
ending 2

And stderr displayed the beautiful ASCII-art tree as by the book, but with a single exception as child.

jsbueno
  • 99,910
  • 10
  • 151
  • 209
1

As other answers have pointed out, TaskGroups don't currently come with any built-in mechanism like asyncio.gather()'s return_exceptions parameter, to prevent the TaskGroup from cancelling all its tasks when one of them raises an exception. A different answer gave a way to edit TaskGroup's internal _abort method to achieve the behavior you want, but if you don't feel comfortable touching Python's internals, you could alternatively rework your coroutines so that they don't propagate their exception until all the other tasks in the group are finished.

A limited and inflexible way to accomplish this would be to use the asyncio.Barrier class, which works like this:

A barrier is a simple synchronization primitive that allows to block until a certain number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.

So, if you know ahead of time exactly how many tasks n you're going to add to your taskgroup, and as long as you don't explicitly cancel() any individual one of those tasks (only the entire taskgroup as a whole), and as long as you also don't pass your taskgroup into one of its tasks to dynamically add more tasks into it later, you can just create a barrier that blocks until n tasks are waiting on it, and use that barrier to force all of the tasks to return or raise their Exceptions at the same time. If you have n tasks, create the barrier as asyncio.Barrier(n), and ensure that all of your tasks eventually call await barrier.wait() - this will block them until all n of your tasks are waiting at the barrier. As soon as they're all there, the barrier will let them all proceed at once. Manually adding a barrier parameter to every function header and adding the same boilerplate to handle the delayed returns and raises to every coroutine would suck though, so instead we can use a decorator for that purpose:

import asyncio

def block_coro_until_barrier_passed(coro):
    """Ensure that the supplied coroutine doesn't return or raise any error
    until the supplied barrier allows it to proceed.
    """
    async def decorated_coro(   *args,
                                barrier:asyncio.Barrier,
                                **kwargs):
        runtime_error = None
        return_value = None
        try:
            return_value = await coro(*args, **kwargs)
        except Exception as e:
            runtime_error = e
        finally:
            await barrier.wait()
            if runtime_error is not None:
                raise runtime_error
            else:
                return return_value

    return decorated_coro

@block_coro_until_barrier_passed
async def f_error():
    raise ValueError()

@block_coro_until_barrier_passed
async def f_normal(arg):
    print('starting', arg)
    await asyncio.sleep(1)
    print('ending', arg)
    return arg

async def main():
    async with asyncio.TaskGroup() as tg:
        barrier = asyncio.Barrier(3)
        tg.create_task(f_normal(1, barrier=barrier))
        tg.create_task(f_normal(2, barrier=barrier))
        tg.create_task(f_error(barrier=barrier))
if __name__ == '__main__':
    asyncio.run(main())

# starting 1
# starting 2
# ending 1
# ending 2
# --------
# traceback for ExceptionGroup

This decorator basically creates a new coroutine that runs the coroutine you decorated, intercepts the return value or Exception that was raised, then either returns that return value or raises that Exception once it's able to pass the barrier (and it will pass the barrier only once all the other tasks have got their exception or return value ready and are now waiting at the barrier). So if you decorate all your coroutines with this decorator and also make sure that you configure the barrier for the correct number of tasks n, then when your taskgroup finally exits, all of the return values will be returned at once, and all of the exceptions raised will be propagated to the final ExceptionGroup (if applicable), and none of your tasks will be cancelled early due to another task raising an exception.

If you need to use this workaround for any real-world problem though, be very careful, as configuring the Barrier with too small of an n will lead to the taskgroup sometimes not allowing all tasks to complete if one raises an Exception, and too large of an n will lead to it hanging indefinitely. And if you cancel any of tasks in the taskgroup, this will cause the taskgroup to hang indefinitely on account of that task never getting to await barrier.wait(), or releasing its wait() if it's already there, meaning there will never be n tasks at the barrier for it to unblock. There may be a workaround to that last bit somewhere in the Barrier class, but I'm not sure.


As a final aside, I have no idea why something that accomplishes this more effectively isn't built into TaskGroup by default, because without some janky workaround like mine, TaskGroups can't fully replace gather(). There's also a bit of a gotcha with the ExceptionGroups that you'll see raised from most TaskGroups if you don't force in some workaround to prevent them from cancelling tasks as soon as one raises an Exception. The first time I read the documentation for TaskGroup, I left with the impression that the TaskGroup would neatly capture all the exceptions raised until all the tasks had completed, at which time it would raise an ExceptionGroup with all the exceptions it saw while it was running its tasks. But in reality, since TaskGroups cancel all other tasks as soon as one raises an exception, the only exceptions you'll see in that ExceptionGroup are the exceptions that are raised within that same exact iteration of the event loop after all the tasks are cancel()ed. So unless you actively try to coordinate your tasks to all raise their exceptions at the same time, you're almost always going to only see one or two exceptions in an ExceptionGroup at a time. I certainly didn't realize this at first, as I failed to note the nuances between a task being "cancelled" versus "finishing" when I first read the TaskGroup documentation:

The first time any of the tasks belonging to the group fails with an exception other than asyncio.CancelledError, the remaining tasks in the group are cancelled. [...]

Once all tasks have finished, if any tasks have failed with an exception other than asyncio.CancelledError, those exceptions are combined in an ExceptionGroup or BaseExceptionGroup (as appropriate; see their documentation) which is then raised.

Nick Muise
  • 303
  • 4
  • 10