5

I want to execute several asynchronous tasks concurrently. Each task will run an HTTP request that can either complete successfully or throw an exception. I need to await until the first task completes successfully, or until all the tasks have failed.

How can I implement an overload of the Task.WhenAny method that accepts a predicate, so that I can exclude the non-successfully completed tasks?

Theodor Zoulias
T. Gilad

6 Answers

10

Await any task and return it if the condition is met. Otherwise remove it from the list and await the remaining tasks, until there are no tasks left to wait for (in which case the result is null).

public static async Task<Task> WhenAny( IEnumerable<Task> tasks, Predicate<Task> condition )
{
    var tasklist = tasks.ToList();
    while ( tasklist.Count > 0 )
    {
        var task = await Task.WhenAny( tasklist );
        if ( condition( task ) )
            return task;
        tasklist.Remove( task );
    }
    return null;
}

A simple check for that:

var tasks = new List<Task> {
    Task.FromException( new Exception() ),
    Task.FromException( new Exception() ),
    Task.FromException( new Exception() ),
    Task.CompletedTask, };

var completedTask = WhenAny( tasks, t => t.Status == TaskStatus.RanToCompletion ).Result;

if ( tasks.IndexOf( completedTask ) != 3 )
    throw new Exception( "not expected" );
Sir Rufo
3
public static Task<Task<T>> WhenFirst<T>(IEnumerable<Task<T>> tasks, Func<Task<T>, bool> predicate)
{
    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    if (predicate == null) throw new ArgumentNullException(nameof(predicate));

    var tasksArray = (tasks as IReadOnlyList<Task<T>>) ?? tasks.ToArray();
    if (tasksArray.Count == 0) throw new ArgumentException("Empty task list", nameof(tasks));
    if (tasksArray.Any(t => t == null)) throw new ArgumentException("Tasks contains a null reference", nameof(tasks));

    var tcs = new TaskCompletionSource<Task<T>>();
    var count = tasksArray.Count;

    Action<Task<T>> continuation = t =>
        {
            if (predicate(t))
            {
                tcs.TrySetResult(t);
            }
            if (Interlocked.Decrement(ref count) == 0)
            {
                tcs.TrySetResult(null);
            }
        };

    foreach (var task in tasksArray)
    {
        task.ContinueWith(continuation);
    }

    return tcs.Task;
}

Sample usage:

var task = await WhenFirst(tasks, t => t.Status == TaskStatus.RanToCompletion);

if (task != null)
{
    // A declaration cannot be the sole embedded statement of an if, so braces are required.
    var value = await task;
}

Note that this doesn't propagate exceptions of failed tasks (just as WhenAny doesn't).

You can also create a version of this for the non-generic Task.
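
As a rough sketch of the non-generic version mentioned above, assuming the same contract as the generic WhenFirst (the result is null when no task satisfies the predicate). The class name TaskExtensionsSketch is made up for illustration. The try/catch around the predicate and the RunContinuationsAsynchronously option are additions that the generic version above does not have; they are there so that a throwing predicate faults the returned task instead of leaving it incomplete.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskExtensionsSketch
{
    // Hypothetical non-generic counterpart of WhenFirst<T>; not part of the original answer.
    public static Task<Task> WhenFirst(IEnumerable<Task> tasks, Func<Task, bool> predicate)
    {
        if (tasks == null) throw new ArgumentNullException(nameof(tasks));
        if (predicate == null) throw new ArgumentNullException(nameof(predicate));

        var tasksArray = tasks.ToArray();
        if (tasksArray.Length == 0) throw new ArgumentException("Empty task list", nameof(tasks));
        if (tasksArray.Any(t => t == null)) throw new ArgumentException("Tasks contains a null reference", nameof(tasks));

        // Assumption: run the awaiter's code asynchronously rather than inline in a continuation.
        var tcs = new TaskCompletionSource<Task>(TaskCreationOptions.RunContinuationsAsynchronously);
        var count = tasksArray.Length;

        foreach (var task in tasksArray)
        {
            task.ContinueWith(t =>
            {
                try
                {
                    // The first task that satisfies the predicate wins; later calls are no-ops.
                    if (predicate(t)) tcs.TrySetResult(t);
                }
                catch (Exception ex)
                {
                    // Addition: surface a predicate failure instead of swallowing it.
                    tcs.TrySetException(ex);
                }
                finally
                {
                    // After the last task completes, report "no match" unless a result is already set.
                    if (Interlocked.Decrement(ref count) == 0) tcs.TrySetResult(null);
                }
            }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }

        return tcs.Task;
    }
}
```

Usage is the same as for the generic version, e.g. `var task = await TaskExtensionsSketch.WhenFirst(tasks, t => t.Status == TaskStatus.RanToCompletion);`, followed by a null check.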

Eli Arbel
3
public static Task<T> GetFirstResult<T>(
    ICollection<Func<CancellationToken, Task<T>>> taskFactories, 
    Predicate<T> predicate) where T : class
{
    var tcs = new TaskCompletionSource<T>();
    var cts = new CancellationTokenSource();

    int completedCount = 0;
    // in case you have a lot of tasks you might need to throttle them 
    //(e.g. so you don't try to send 99999999 requests at the same time)
    // see: http://stackoverflow.com/a/25877042/67824
    foreach (var taskFactory in taskFactories)
    {
        taskFactory(cts.Token).ContinueWith(t => 
        {
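            if (t.IsFaulted)
            {
                Console.WriteLine($"Task completed with exception: {t.Exception}");
            }
            // A canceled task has no result; reading t.Result would throw.
            else if (!t.IsCanceled && predicate(t.Result))
            {
                cts.Cancel();
                tcs.TrySetResult(t.Result);
            }

            if (Interlocked.Increment(ref completedCount) == taskFactories.Count)
            {
                // TrySetException does nothing if a result was already set,
                // whereas SetException would throw in that case.
                tcs.TrySetException(new InvalidOperationException("All tasks failed"));
            }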

        }, cts.Token);
    }

    return tcs.Task;
}

Sample usage:

using System.Net.Http;
var client = new HttpClient();
var response = await GetFirstResult(
    new Func<CancellationToken, Task<HttpResponseMessage>>[] 
    {
        ct => client.GetAsync("http://microsoft123456.com", ct),
        ct => client.GetAsync("http://microsoft123456.com", ct),
        ct => client.GetAsync("http://microsoft123456.com", ct),
        ct => client.GetAsync("http://microsoft123456.com", ct),
        ct => client.GetAsync("http://microsoft123456.com", ct),
        ct => client.GetAsync("http://microsoft123456.com", ct),
        ct => client.GetAsync("http://microsoft123456.com", ct),
        ct => client.GetAsync("http://microsoft.com", ct),
        ct => client.GetAsync("http://microsoft123456.com", ct),
        ct => client.GetAsync("http://microsoft123456.com", ct),
    }, 
    rm => rm.IsSuccessStatusCode);
Console.WriteLine($"Successful response: {response}");
Ohad Schneider
1

Here is a more sophisticated version of Ohad Schneider's GetFirstResult implementation, with an API similar to Eli Arbel's WhenFirst method. The idea is the same: attach a cancelable continuation on each task, and cancel the CancellationTokenSource when a completed task satisfies the predicate. This implementation avoids using a TaskCompletionSource<T>, and so avoids the risk of the asynchronous WhenFirst method never completing because of a bug in the implementation:

public static async Task<Task<TResult>> WhenFirst<TResult>(
    Task<TResult>[] tasks,
    Func<Task<TResult>, bool> predicate)
{
    ArgumentNullException.ThrowIfNull(tasks);
    ArgumentNullException.ThrowIfNull(predicate);

    using CancellationTokenSource cts = new();
    Task<TResult> selectedTask = null;
    IEnumerable<Task> continuations = tasks
        .Where(task => task is not null)
        .TakeWhile(_ => !cts.IsCancellationRequested)
        .Select(task => task.ContinueWith(t =>
        {
            bool result;
            try { result = predicate(t); } catch { cts.Cancel(); throw; }
            if (result)
                if (Interlocked.CompareExchange(ref selectedTask, t, null) is null)
                    cts.Cancel();
        }, cts.Token, TaskContinuationOptions.DenyChildAttach |
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));

    Task whenAll = Task.WhenAll(continuations);
    try { await whenAll.ConfigureAwait(false); }
    catch when (whenAll.IsCanceled) { } // Ignore
    return selectedTask;
}

The selling points of this implementation are:

  1. When the WhenFirst method completes asynchronously, all the work is done and everything is cleaned up. No continuations are left behind as fire-and-forget tasks, attached on the original tasks.
  2. An exception in the predicate is propagated as a fault of the returned Task<Task<TResult>>.
  3. The predicate is not called after a task has been selected as the result.
  4. In case one of the tasks is already completed and satisfies the predicate, continuations are not attached on the remaining tasks (performance optimization).
  5. In case none of the tasks satisfies the predicate, the asynchronous result of the method is a null task. In other words the WhenFirst().Result is null.
  6. The method tolerates null tasks in the input tasks array. It doesn't throw an exception if it finds a null one.

The signature takes a Task<TResult>[] array instead of an IEnumerable<Task<TResult>> sequence because I didn't want to deal with the possibility of an error during the enumeration of the sequence. Passing a deferred sequence as argument, in other words relying on the WhenFirst() call to instantiate the tasks, sounds like an improbable scenario to me anyway.

Theodor Zoulias
0

Another way of doing this, very similar to Sir Rufo's answer, but using AsyncEnumerable and Ix.NET (NuGet package System.Interactive.Async, version 4.1.1).

Implement a little helper method to stream any task as soon as it's completed:

static IAsyncEnumerable<Task<T>> WhenCompleted<T>(IEnumerable<Task<T>> source) =>
    AsyncEnumerable.Create(_ =>
    {
        var tasks = source.ToList();
        Task<T> current = null;
        return AsyncEnumerator.Create(
            async () => tasks.Any() && tasks.Remove(current = await Task.WhenAny(tasks)), 
            () => current,
            async () => { });
    });

One can then process the tasks in completion order, e.g. returning the first matching one as requested:

await WhenCompleted(tasks).FirstOrDefaultAsync(t => t.Status == TaskStatus.RanToCompletion)
Peebo
  • Where is this `AsyncEnumerable` class with the `CreateEnumerable`/`CreateEnumerator` methods defined? – Theodor Zoulias Jun 17 '20 at 13:51
  • 1
    In the NuGet package System.Interactive.Async; edited my answer to mention that. Also realized I was using version 3.1.1, so I [updated that](https://stackoverflow.com/posts/62426036/timeline) to use version 4.1.1 – Peebo Jun 18 '20 at 16:36
0

Just wanted to add to the answers by @Peebo and @SirRufo that use List.Remove (posting this as an answer because I can't comment yet).

I would consider using:

var tasks = source.ToHashSet();

instead of:

var tasks = source.ToList();

so that removing a completed task is more efficient (O(1) on average for a HashSet, instead of O(n) for a List).
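
For illustration, here is a minimal sketch of the loop from Sir Rufo's answer with the list swapped for a HashSet. The class name WhenAnySketch is made up, and the HashSet constructor is used instead of ToHashSet() only because the extension method is not available on older frameworks. One behavioral difference to be aware of: a HashSet collapses duplicate task references, which a List does not.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAnySketch
{
    // Same contract as the WhenAny overload above: returns the first completed task
    // that satisfies the condition, or null if no task does.
    public static async Task<Task> WhenAny(IEnumerable<Task> tasks, Predicate<Task> condition)
    {
        var pending = new HashSet<Task>(tasks);
        while (pending.Count > 0)
        {
            // Task.WhenAny accepts any IEnumerable<Task>, so the set can be passed directly.
            var task = await Task.WhenAny(pending);
            if (condition(task))
                return task;
            pending.Remove(task); // O(1) on average
        }
        return null;
    }
}
```

The loop still calls Task.WhenAny once per completed task, so only the cost of the removal itself changes.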

Rom Haviv
  • 2
    The `while`+`Task.WhenAny` approach is an antipattern: it is inefficient and doesn't scale well. Trying to optimize it by replacing the `List` with a `HashSet` is unlikely to have any noticeable effect, because too much else is going on (continuations are attached and detached to all remaining tasks on every loop, defensive arrays are allocated, copied and destroyed etc). The best way to optimize this pattern is to not use it. – Theodor Zoulias Feb 16 '21 at 10:37
  • 1
    @TheodorZoulias yup it indeed doesn't look like the most efficient answer here, yet it somehow became the most upvoted, maybe because of simplicity/readability so just wanted to add a little comment on that, TaskCompletionSource seems like the way to go, thanks for clarifying :) – Rom Haviv Feb 16 '21 at 11:42
  • 1
    Rom yeap, simplicity is certainly an asset, and if you are dealing with ~10 tasks the performance implications are negligible. But if the number of your tasks is ~10,000, Oh my! – Theodor Zoulias Feb 16 '21 at 12:07
  • 1
    found a cool article about the difference if anyone else wants to drill down : [link](https://devblogs.microsoft.com/pfxteam/processing-tasks-as-they-complete/) – Rom Haviv Feb 16 '21 at 12:09
  • 1
    Rom indeed, [that's](https://devblogs.microsoft.com/pfxteam/processing-tasks-as-they-complete/) a good reading, but nowadays there are better techniques available for such things (streaming async results in order of completion). Like the `IAsyncEnumerable`s and [the channels](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/). No need to preallocate that many `TaskCompletionSource<T>`s beforehand! – Theodor Zoulias Feb 16 '21 at 12:18