
I have an array of tasks and I am awaiting them with Task.WhenAll. My tasks fail frequently, and when they do I inform the user with a message box so that she can try again. My problem is that the error is not reported until all tasks have completed. Instead, I would like to inform the user as soon as the first task has thrown an exception. In other words, I want a version of Task.WhenAll that fails fast. Since no such built-in method exists, I tried to make my own, but my implementation does not behave the way I want. Here is what I came up with:

```csharp
public static async Task<TResult[]> WhenAllFailFast<TResult>(
    params Task<TResult>[] tasks)
{
    foreach (var task in tasks)
    {
        await task.ConfigureAwait(false);
    }
    return await Task.WhenAll(tasks).ConfigureAwait(false);
}
```

This generally throws faster than the native Task.WhenAll, but usually not fast enough. A faulted task #2 will not be observed before the completion of task #1. How can I improve it so that it fails as fast as possible?
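The delay can be reproduced with a small sketch. The class `SequentialAwaitDemo`, the helper `TimeToFirstFailureAsync` and the 1,000 ms / 100 ms delays are illustrative assumptions and are not part of the question. Task #2 faults after roughly 100 ms. Because the loop awaits task #1 first, the exception only surfaces after roughly one second.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class SequentialAwaitDemo
{
    // Same logic as the question's WhenAllFailFast: await the tasks one by one, in array order.
    public static async Task<TResult[]> WhenAllSequential<TResult>(params Task<TResult>[] tasks)
    {
        foreach (var task in tasks)
        {
            await task.ConfigureAwait(false);
        }
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    // Hypothetical helper: measures how long it takes for the failure of task #2 to surface.
    public static async Task<TimeSpan> TimeToFirstFailureAsync()
    {
        var stopwatch = Stopwatch.StartNew();
        // Task #1 succeeds after roughly 1,000 ms.
        Task<int> slow = Task.Delay(1000).ContinueWith(_ => 1);
        // Task #2 fails after roughly 100 ms.
        Task<int> failing = Task.Delay(100)
            .ContinueWith<int>(_ => throw new InvalidOperationException("Task #2 failed."));
        try
        {
            await WhenAllSequential(slow, failing);
        }
        catch (InvalidOperationException)
        {
            // Expected: the fault of task #2, observed only after task #1 has completed.
        }
        return stopwatch.Elapsed;
    }
}
```

A fail-fast replacement should bring the measured time down to roughly the 100 ms of task #2.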


Update: Regarding cancellation, it is not in my requirements right now, but let's say that for consistency the first cancelled task should stop the awaiting immediately. In this case the combining task returned from WhenAllFailFast should have Status == TaskStatus.Canceled.

Clarification: The cancellation scenario is about the user clicking a Cancel button to stop the tasks from completing. It is not about automatically cancelling the incomplete tasks in case of an exception.

Theodor Zoulias
  • Why not use a combination of "WhenAll" and "WhenException" continuations, with the exception one cancelling all other tasks? Task[T] does carry fields to notice that they were canceled, so it would be easy to have the "WhenAll" realize it should do nothing. I doubt you can get around creating your own waiting mechanism for this. – Christopher Aug 01 '19 at 16:34
  • 1
    WhenAny or WaitAny might be a good candidate. It executes when any task finishes. And from there you can decided to either re-queue "WhenAny", continue witht the WhenAll code or cancel all on exception. | Edit: Looks like stannius had the code for that before I finished writin the idea. – Christopher Aug 01 '19 at 16:39
  • @Christopher where is this "WhenException" continuation? – stannius Aug 01 '19 at 16:42
  • @stannius: I assumed there would be one. But "WhenAny" with code checking for an exception works just as well. – Christopher Aug 01 '19 at 16:57
  • 2
    What do you want to happen with the other tasks when one fails? You want the other task to continue their work or do you want to abort other tasks? – Peter Bons Aug 01 '19 at 17:27
  • 1
    @PeterBons ideally I would like to cancel the other tasks, to preserve resources of the machine, but letting them run to completion is not a big deal. My tasks are neither CPU nor IO intensive, so I hope that starting a new set of tasks while some of the old ones are still running will not cause much of a problem. – Theodor Zoulias Aug 01 '19 at 17:51
  • 1
    Do the tasks accept a cancellation token or can they be modified to accept one (and respect it)? – Peter Bons Aug 01 '19 at 18:03
  • 1
    Possible duplicate of [Writing a Task.WhenAll/WhenAny variant that cancels all other tasks on first faulted/Cancelled task](https://stackoverflow.com/questions/22399063/writing-a-task-whenall-whenany-variant-that-cancels-all-other-tasks-on-first-fau) – JSteward Aug 01 '19 at 18:08
  • 1
    @PeterBons currently my tasks does not support cancellation. I am using a library with async methods that do not accept a `CancellationToken`, so in reality I have no way of canceling them. – Theodor Zoulias Aug 01 '19 at 18:11
  • 1
    @JSteward the [duplicate](https://stackoverflow.com/questions/22399063/writing-a-task-whenall-whenany-variant-that-cancels-all-other-tasks-on-first-fau) contains a complicated recursive method `WhenAllError` that requires a `CancellationToken` as an argument, and doesn't compile. Removing the code related to the `CancellationToken` causes the method to fall into an infinite loop. It is certainly not a solution to my problem. – Theodor Zoulias Aug 01 '19 at 18:37

4 Answers


Your best bet is to build your WhenAllFailFast method using TaskCompletionSource. You can .ContinueWith() every input task with a synchronous continuation that faults the TCS when the task ends in the Faulted state (using the same exception object).

Perhaps something like (not fully tested):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace stackoverflow
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            cts.Cancel();
            // The already-canceled third task cancels the combined task, so this await
            // throws a TaskCanceledException and the Console.WriteLine below is not reached.
            var arr = await WhenAllFastFail(
                Task.FromResult(42),
                Task.Delay(2000).ContinueWith<int>(t => throw new Exception("ouch")),
                Task.FromCanceled<int>(cts.Token));

            Console.WriteLine("Hello World!");
        }

        public static Task<TResult[]> WhenAllFastFail<TResult>(params Task<TResult>[] tasks)
        {
            if (tasks is null || tasks.Length == 0) return Task.FromResult(Array.Empty<TResult>());

            // defensive copy.
            var defensive = tasks.Clone() as Task<TResult>[];

            var tcs = new TaskCompletionSource<TResult[]>();
            var remaining = defensive.Length;

            Action<Task> check = t =>
            {
                switch (t.Status)
                {
                    case TaskStatus.Faulted:
                        // we 'try' as some other task may beat us to the punch.
                        tcs.TrySetException(t.Exception.InnerException);
                        break;
                    case TaskStatus.Canceled:
                        // we 'try' as some other task may beat us to the punch.
                        tcs.TrySetCanceled();
                        break;
                    default:

                        // we can safely set here as no other task remains to run.
                        if (Interlocked.Decrement(ref remaining) == 0)
                        {
                            // get the results into an array.
                            var results = new TResult[defensive.Length];
                            for (var i = 0; i < defensive.Length; ++i) results[i] = defensive[i].Result;
                            tcs.SetResult(results);
                        }
                        break;
                }
            };

            foreach (var task in defensive)
            {
                task.ContinueWith(check, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
            }

            return tcs.Task;
        }
    }
}
```

Edit: Unwraps the AggregateException, adds cancellation support, returns an array of results, defends against array mutation and null or empty input, and passes an explicit TaskScheduler.

ZaldronGG
  • I updated my question with requirements regarding cancellation. In brief it follows the fail-fast principle. – Theodor Zoulias Aug 01 '19 at 17:37
  • This works pretty well, but has two problems: 1) On failure I get an `AggregateException`. I would prefer to get the specific exception directly, like I do when I await `Task.WhenAll`. 2) It does not support returning the results of the completed tasks. Occasionally all my tasks complete successfully, and then I do need the results to process them! – Theodor Zoulias Aug 01 '19 at 17:43
  • 2
    I recommend using `await` for continuations, e.g., with a local `async` method. If you do use `ContinueWith`, you [should pass a `TaskScheduler`](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html). – Stephen Cleary Aug 01 '19 at 22:35
  • Btw I think that the option `ExecuteSynchronously` makes the passing of a `TaskScheduler` practically redundant. With the exception of [some rare cases](https://devblogs.microsoft.com/pfxteam/when-executesynchronously-doesnt-execute-synchronously/), a continuation with this option will not be scheduled at all. – Theodor Zoulias Aug 01 '19 at 23:55
  • @TheodorZoulias: `ContinueWith` is a low-level method with dangerous defaults. In this case, its default task scheduler is *not* `TaskScheduler.Default` - it's whatever ambient task scheduler is current when this code runs. `ExecuteSynchronously` is only a hint, and [is ignored if the task schedulers are incompatible](https://devblogs.microsoft.com/pfxteam/when-executesynchronously-doesnt-execute-synchronously/). – Stephen Cleary Aug 02 '19 at 01:39
  • @StephenCleary to be honest I am not convinced that the defaults of `ContinueWith` and `StartNew` are so dangerous. [The scenario](https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html) in which something unexpected could happen seems too far-fetched to me, and even then it should be unexpected only for someone who doesn't know the tools they are using well enough. Using the ambient task scheduler may well be the desired behavior. If a non-default scheduler is used for the tasks (an uncommon scenario per se), it should probably be used for the continuations too. – Theodor Zoulias Aug 02 '19 at 02:18
  • I have two very good answers to choose from. I decided to accept this one because it has more votes, and also because it is much more performant with huge arrays of tasks. It can handle 300,000 tasks per second, while the [other answer](https://stackoverflow.com/a/57313482/11178549) handles "only" 3,000 tasks per second. – Theodor Zoulias Aug 02 '19 at 06:27
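Stephen Cleary's point about the implicit scheduler of `ContinueWith` can be checked with a small sketch. `ConcurrentExclusiveSchedulerPair` is used here only as a convenient non-default scheduler, and the class and method names are made up for the illustration. A continuation created without a `TaskScheduler` argument should run on whatever `TaskScheduler.Current` was when `ContinueWith` was called (here the exclusive scheduler). A continuation created with `TaskScheduler.Default` should run on the default thread-pool scheduler.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class ContinueWithSchedulerDemo
{
    // Returns the ambient scheduler plus the schedulers observed inside two continuations:
    // one created without a TaskScheduler argument, one created with TaskScheduler.Default.
    public static async Task<(TaskScheduler Ambient, TaskScheduler Implicit, TaskScheduler Explicit)> ObserveAsync()
    {
        var pair = new ConcurrentExclusiveSchedulerPair();
        TaskScheduler ambient = pair.ExclusiveScheduler;
        TaskScheduler implicitScheduler = null;
        TaskScheduler explicitScheduler = null;

        // Create both continuations from inside a task that runs on the exclusive scheduler,
        // so TaskScheduler.Current is that scheduler when ContinueWith is called.
        await Task.Factory.StartNew(() =>
        {
            Task antecedent = Task.Delay(50);
            // No scheduler argument: ContinueWith falls back to TaskScheduler.Current.
            Task implicitContinuation = antecedent.ContinueWith(
                _ => { implicitScheduler = TaskScheduler.Current; });
            // Explicit scheduler, as in the answer above.
            Task explicitContinuation = antecedent.ContinueWith(
                _ => { explicitScheduler = TaskScheduler.Current; },
                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
            return Task.WhenAll(implicitContinuation, explicitContinuation);
        }, CancellationToken.None, TaskCreationOptions.None, ambient).Unwrap();

        pair.Complete();
        return (ambient, implicitScheduler, explicitScheduler);
    }
}
```

Whether following the ambient scheduler is a hazard or the desired behavior is exactly what the two comments above disagree about. The sketch only shows which scheduler is picked.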

I recently needed the WhenAllFailFast method once again, so I revised @ZaldronGG's excellent solution to make it a bit more performant (and more in line with Stephen Cleary's recommendations). The implementation below handles around 3,500,000 tasks per second on my PC.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
    if (tasks is null) throw new ArgumentNullException(nameof(tasks));
    if (tasks.Length == 0) return Task.FromResult(new TResult[0]);

    var results = new TResult[tasks.Length];
    var remaining = tasks.Length;
    var tcs = new TaskCompletionSource<TResult[]>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    for (int i = 0; i < tasks.Length; i++)
    {
        var task = tasks[i];
        if (task == null) throw new ArgumentException(
            $"The {nameof(tasks)} argument included a null value.", nameof(tasks));
        HandleCompletion(task, i);
    }
    return tcs.Task;

    async void HandleCompletion(Task<TResult> task, int index)
    {
        try
        {
            var result = await task.ConfigureAwait(false);
            results[index] = result;
            if (Interlocked.Decrement(ref remaining) == 0)
            {
                tcs.TrySetResult(results);
            }
        }
        catch (OperationCanceledException)
        {
            tcs.TrySetCanceled();
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }
    }
}
```
Theodor Zoulias

Your loop awaits the tasks one at a time, in array order. That's why it waits for task #1 to complete before checking whether task #2 has failed.

You might find this article helpful on a pattern for aborting after the first failure: http://gigi.nullneuron.net/gigilabs/patterns-for-asynchronous-composite-tasks-in-c/

```csharp
using System.Linq;
using System.Threading.Tasks;

public static async Task<TResult[]> WhenAllFailFast<TResult>(
    params Task<TResult>[] tasks)
{
    var taskList = tasks.ToList();
    while (taskList.Count > 0)
    {
        var task = await Task.WhenAny(taskList).ConfigureAwait(false);
        if (task.Exception != null)
        {
            // Left as an exercise for the reader:
            // properly unwrap the AggregateException;
            // handle the exception(s);
            // cancel the other running tasks.
            throw task.Exception.InnerException;
        }

        taskList.Remove(task);
    }
    return await Task.WhenAll(tasks).ConfigureAwait(false);
}
```
stannius
  • I replaced my code inside `WhenAllFailFast` with your code, and it doesn't compile. Can you provide a complete implementation? I think that your approach is quite promising! – Theodor Zoulias Aug 01 '19 at 16:49
  • I just tested this. Not working. Your `WhenAllFailFast` method waits until all tasks are completed, like the native `Task.WhenAll`. – Theodor Zoulias Aug 01 '19 at 17:12
  • @TheodorZoulias OK, I added a couple lines where you would need to check for the exception on a single task. – stannius Aug 01 '19 at 17:46
  • Now it works great! It doesn't handle well my update requirements regarding cancellation, but it's not a big deal because currently I don't cancel my tasks. – Theodor Zoulias Aug 01 '19 at 17:57
  • To cancel the other tasks, they all have to accept and honor a cancellation token. It's non-trivial (I've done it) and it sounds like your requirement regarding cancellation is somewhat soft. But I added another couple lines to show where you would do it. – stannius Aug 01 '19 at 18:10
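The update in the question also asks that a canceled task stop the awaiting immediately. One possible adjustment of the WhenAny loop is sketched below. It is not stannius's code. Instead of inspecting `task.Exception`, it awaits the first non-successful task:

- Awaiting a faulted task rethrows its first inner exception with the original stack trace.
- Awaiting a canceled task throws an `OperationCanceledException`, which the async method turns into a returned task with `Status == TaskStatus.Canceled`.

The class name `FailFastCombinators` is hypothetical. The `List<T>` has been swapped for a `HashSet<T>` only to make `Remove` cheaper. Each iteration still calls `Task.WhenAny` over all the remaining tasks, so the total work still grows roughly quadratically with the number of tasks.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class FailFastCombinators
{
    // Sketch: WhenAny loop that stops at the first faulted OR canceled task.
    // Task.IsCompletedSuccessfully requires .NET Core 2.0 or later.
    public static async Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
    {
        var pending = new HashSet<Task<TResult>>(tasks);
        while (pending.Count > 0)
        {
            Task<TResult> completed = await Task.WhenAny(pending).ConfigureAwait(false);
            if (!completed.IsCompletedSuccessfully)
            {
                // Faulted: rethrows the first inner exception.
                // Canceled: throws OperationCanceledException, so the returned task is Canceled.
                await completed.ConfigureAwait(false);
            }
            pending.Remove(completed);
        }
        // Every task has completed successfully; collect the results in input order.
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```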

I'm adding one more answer to this problem. It is not because I've found a faster solution, but because I am now a bit skeptical about starting multiple async void operations on an unknown SynchronizationContext. The solution I am proposing here is significantly slower: about 3 times slower than @ZaldronGG's excellent solution, and about 10 times slower than my previous async void-based implementation. It does, however, have an advantage. After the completion of the returned Task<TResult[]>, it doesn't leak fire-and-forget continuations attached to the observed tasks. When this task completes, all the continuations created internally by the WhenAllFailFast method have been cleaned up. This is desirable behavior for APIs in general, although in many scenarios it might not be important.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);
    CancellationTokenSource cts = new();
    Task<TResult> failedTask = null;
    TaskContinuationOptions flags = TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously;
    Action<Task<TResult>> continuationAction = new(task =>
    {
        if (!task.IsCompletedSuccessfully)
            if (Interlocked.CompareExchange(ref failedTask, task, null) is null)
                cts.Cancel();
    });
    IEnumerable<Task> continuations = tasks.Select(task => task
        .ContinueWith(continuationAction, cts.Token, flags, TaskScheduler.Default));

    return Task.WhenAll(continuations).ContinueWith(allContinuations =>
    {
        cts.Dispose();
        Task<TResult> localFailedTask = Volatile.Read(ref failedTask);
        if (localFailedTask is not null)
            return Task.WhenAll(localFailedTask);
        // At this point all the tasks are completed successfully
        Debug.Assert(tasks.All(t => t.IsCompletedSuccessfully));
        Debug.Assert(allContinuations.IsCompletedSuccessfully);
        return Task.WhenAll(tasks);
    }, default, flags, TaskScheduler.Default).Unwrap();
}
```

This implementation is similar to ZaldronGG's in that it attaches one continuation on each task, with the difference being that these continuations are cancelable, and they are canceled en masse when the first non-successful task is observed. It also uses the Unwrap technique that I've discovered recently, which eliminates the need for the manual completion of a TaskCompletionSource<TResult[]> instance, and usually makes for a concise implementation.
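Here is a rough illustration of the Unwrap technique mentioned above. When the delegate passed to `ContinueWith` returns a `Task<T>`, the result is a `Task<Task<T>>`. `Unwrap()` turns it into a proxy `Task<T>` that completes with the inner task's result, exception or cancellation, with no `TaskCompletionSource` involved. The helper `ForwardWhenDone` is hypothetical and exists only to show the pattern.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Hypothetical helper: once `trigger` completes, forward either `onSuccess` or `onFallback`.
    // The continuation returns a Task<int>, so ContinueWith produces a Task<Task<int>>;
    // Unwrap() flattens it into a proxy Task<int> that mirrors the selected inner task.
    public static Task<int> ForwardWhenDone(Task trigger, Task<int> onSuccess, Task<int> onFallback)
    {
        return trigger.ContinueWith(
            t => t.IsCompletedSuccessfully ? onSuccess : onFallback,
            CancellationToken.None,
            TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default).Unwrap();
    }
}
```

The final `ContinueWith(...).Unwrap()` call in WhenAllFailFast above has the same shape. There, the continuation picks either `Task.WhenAll(localFailedTask)` or `Task.WhenAll(tasks)`.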

Theodor Zoulias