I have an array of tasks running identical jobs, each with different parameters on a different server. If one of the servers is unresponsive or slow, I can end up in a situation where all tasks but one have completed. At the moment I await them with `Task.WhenAll()`, so I have no option but to wait until my timeout expires.

In the ideal case all tasks complete within the timeout and I can gather all the results, but in the other case, basically I want to wait:

  • until n Tasks have completed
  • for another x minutes if n tasks have completed

If n tasks have completed and another x minutes have passed, but some tasks are still running, I want to retrieve the results of the tasks that did finish.

Is there any way I can achieve the above?

Theodor Zoulias
Keaneum
  • 3
    You can build that using `Task.WhenAny` – canton7 Jan 04 '22 at 15:36
  • 1
    https://learn.microsoft.com/es-es/dotnet/csharp/programming-guide/concepts/async/cancel-async-tasks-after-a-period-of-time – Carlos Teixeira Jan 04 '22 at 15:40
  • [Process asynchronous tasks as they complete (C#)](https://learn.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/start-multiple-async-tasks-and-process-them-as-they-complete) – Gabriel Luci Jan 04 '22 at 15:41
  • Somewhat related: [Waiting asynchronously for some tasks to complete](https://stackoverflow.com/questions/56076258/waiting-asynchronously-for-some-tasks-to-complete-task-whensome) – Theodor Zoulias Jan 04 '22 at 16:37

3 Answers

2

Even when you have complex logic for cancellation, you want to cancel the underlying tasks. If the underlying tasks are cancelled at the right time, you can use `Task.WhenAll` in any case.

So breaking down your question, what you're asking is, 'How can I cancel tasks based on the state of other tasks?'. You need to keep track of the number of completed tasks and cancel the remaining tasks based on that count.

If you need to do 'stuff' when tasks complete (like updating the count of completed tasks), I find continuations to be a helpful and quite clean solution. Example for your use case:

// n from your question
var n = 4; 

// number of tasks currently completed
var tasksCompleted = 0; 

// The list of tasks (note it's the continuations in this case)
// You can also keep the continuations and actual tasks in separate lists.
var tasks = new List<Task>();

// delay before cancellation after n tasks completed
// (x is the number of minutes from your question)
var timeAfterNCompleted = TimeSpan.FromMinutes(x);
using var cts = new CancellationTokenSource();

for (int i = 0; i < 10; i++)
{
    // Do your work with a passed cancellation token you control
    var workTask = DoWorkAsync(i, cts.Token);

    // Continuation will update the state of completed tasks.
    // It gets its own variable so this also compiles when
    // DoWorkAsync returns a Task<T>.
    var continuation = workTask.ContinueWith((t) =>
    {
        if (t.IsCompletedSuccessfully)
        {
            var number = Interlocked.Increment(ref tasksCompleted);
            if (number == n)
            {
                // Once n tasks have completed successfully,
                // we cancel after the grace period.
                // Note that this will actually cancel the underlying tasks
                // because we passed the token to the DoWorkAsync method
                cts.CancelAfter(timeAfterNCompleted);
            }
        }
    }, TaskScheduler.Default);
    tasks.Add(continuation);
}

await Task.WhenAll(tasks);

// All your tasks have either completed or cancelled here
// Note that in this specific example all tasks will appear
// to have run to completion. That's because we're looking at
// the continuations here. Store continuation and actual task
// in separate lists and you can retrieve the results.
// (Make sure you await the continuations though)
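
The closing comment above suggests keeping the worker tasks and their continuations in separate lists. A minimal sketch of that variant follows, under stated assumptions: `GracePeriodDemo`, `RunAsync` and the timings are hypothetical names and values chosen for illustration, and `DoWorkAsync` is a stand-in that only simulates a server call with `Task.Delay`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class GracePeriodDemo
{
    // Hypothetical stand-in for the real server call: worker i takes i * 200 ms.
    private static async Task<int> DoWorkAsync(int i, CancellationToken token)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(i * 200), token);
        return i;
    }

    // Starts `total` workers, cancels the stragglers `grace` after the n-th
    // success, and returns the results of the workers that did finish.
    public static async Task<List<int>> RunAsync(int total, int n, TimeSpan grace)
    {
        using var cts = new CancellationTokenSource();
        var completed = 0;
        var workers = new List<Task<int>>();
        var continuations = new List<Task>();

        for (var i = 0; i < total; i++)
        {
            var worker = DoWorkAsync(i, cts.Token);
            workers.Add(worker);
            continuations.Add(worker.ContinueWith(t =>
            {
                // Start the grace period exactly once, on the n-th success.
                if (t.IsCompletedSuccessfully && Interlocked.Increment(ref completed) == n)
                    cts.CancelAfter(grace);
            }, TaskScheduler.Default));
        }

        // The continuations never throw, so this await completes normally
        // even when some workers were cancelled.
        await Task.WhenAll(continuations);

        return workers
            .Where(t => t.IsCompletedSuccessfully)
            .Select(t => t.Result)
            .ToList();
    }
}
```

Calling `await GracePeriodDemo.RunAsync(5, 2, TimeSpan.FromMilliseconds(300))` should return the results of the fast workers and omit the ones that were cancelled when the grace period ran out. This only works if the real `DoWorkAsync` actually honours the token.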
Jesse de Wit
  • 2
    As a side note, [it's recommended](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html) to pass the `TaskScheduler.Default` as argument to the `ContinueWith`, in order to be sure that the continuation will run on a well known scheduler. – Theodor Zoulias Jan 04 '22 at 22:13
0

Use `Task.WhenAny` to find out when any task completes, then remove that completed task from your list.

stopWatch.Start();
while (arrayOfTasks.Any())
{
    Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
    arrayOfTasks.Remove(finishedTask);
    await finishedTask;
    finishedCount++;
    if (finishedCount == 4) // check your stopwatch's elapsed time here
    {
        Console.WriteLine("4 tasks have finished");
    }
}
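
The loop above leaves the stopwatch check as a comment. One way to fill it in is to race the pending tasks against a `Task.Delay` for the remaining grace period once n tasks have succeeded. This is a sketch only: `WhenAnyWithDeadline` and `CollectAsync` are hypothetical names, and, unlike the original loop, faulted tasks are skipped rather than rethrown.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnyWithDeadline
{
    // Collects results as tasks complete. Once n tasks have succeeded, the
    // remaining tasks get at most `grace` more time before the loop gives up.
    public static async Task<List<T>> CollectAsync<T>(
        IEnumerable<Task<T>> tasks, int n, TimeSpan grace)
    {
        var pending = tasks.ToList();
        var results = new List<T>();
        var stopwatch = new Stopwatch(); // started when the n-th task succeeds

        while (pending.Count > 0)
        {
            var waitFor = new List<Task>(pending);
            Task? deadline = null;
            if (stopwatch.IsRunning)
            {
                var remaining = grace - stopwatch.Elapsed;
                if (remaining <= TimeSpan.Zero) break;
                deadline = Task.Delay(remaining);
                waitFor.Add(deadline);
            }

            var finished = await Task.WhenAny(waitFor);
            if (finished == deadline) break; // grace period is over

            var done = (Task<T>)finished;
            pending.Remove(done);
            if (done.IsCompletedSuccessfully) results.Add(done.Result);
            if (results.Count == n) stopwatch.Start();
        }

        return results;
    }
}
```

Tasks that are still pending when the loop exits keep running in the background, because nothing here cancels them. Pass a `CancellationToken` into the work itself if they should stop.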

Working example:

using System.Diagnostics;
using System.Security.Cryptography;

await Test.Go();
Console.ReadLine();
public static class Test
{
    public static async Task Go()
    {
        List<Task<string>> arrayOfTasks = GetArrayOfTasks();
        int finishedCount = 0;
        Stopwatch stopWatch = new Stopwatch();
        stopWatch.Start();
        while (arrayOfTasks.Any())
        {
            Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
            arrayOfTasks.Remove(finishedTask);
            Console.WriteLine(await finishedTask);
            finishedCount++;
            if (finishedCount == 4) // check your stopwatch's elapsed time here too
            {
                Console.WriteLine($":::{finishedCount} tasks have finished, {arrayOfTasks.Count} to go");
            }
        }
    }

    private static List<Task<string>> GetArrayOfTasks()
    {
        List<Task<string>> taskList = new();
        for (int i = 0; i < 10; i++)
        {
            var t = GetString(i);
            taskList.Add(t);
        }
        return taskList;
    }

    private static async Task<string> GetString(int i)
    {
        await Task.Delay(RandomNumberGenerator.GetInt32(1, 5000));
        return i.ToString();
    }
}
thewallrus
  • You don't need `await finishedTask;` -- that `Task` is already complete – canton7 Jan 04 '22 at 16:47
  • 3
    This scales *exceedingly* poorly with the number of tasks, as you're adding n^2 continuations to the tasks, rather than just one per task, and the TPL isn't optimized for such large numbers of continuations. It also adds lots of work that's potentially running after this operation's task has completed, making debugging the problem that much harder. Removing arbitrary items from a list is also unnecessarily inefficient, but that's less impactful than the continuations which have a much higher per unit cost. With non-trivial sizes these are realistic problems. – Servy Jan 04 '22 at 16:47
  • 1
    @canton7 Depends on the desired error handling semantics for whether it's desirable or not. – Servy Jan 04 '22 at 16:48
  • @Servy Continuations are removed once any single task completes, [see here](https://source.dot.net/#System.Private.CoreLib/TaskFactory.cs,2311). So you won't be accumulating continuations between calls to `Task.WhenAny` - you're just adding and removing continuations. A bit wasteful, but perhaps insignificant. There's an [MSDN article](https://learn.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/start-multiple-async-tasks-and-process-them-as-they-complete?pivots=dotnet-6-0) suggesting this approach. – canton7 Jan 04 '22 at 16:58
  • 2
    @canton7 Due to the race conditions it won't always succeed, but even then, still really inefficient. Lots of locking going on, and the continuations are stored in a list which isn't efficient to search as it scales, and the size of the list doesn't shrink when continuations are removed. That MSDN suggests it doesn't mean it's a good idea, they suggest plenty of suboptimal and sometimes even very problematic things. If the OP knows that they'll always have *very* small numbers of tasks, then that's fine. But that's a big "if". – Servy Jan 04 '22 at 17:05
  • @Servy Continuations are only stored in a list if there's more than one of them: in simple cases (like OP's, hopefully) there's no list, no `null` entries, no searching, and no locking (just CAS). Even if there are multiple continuations, nulls are cleared out next time a continuation is added (if that would expand the list), [see here](https://source.dot.net/#System.Private.CoreLib/Task.cs,4474) -- so it won't grow beyond 4 or 8, etc. And even *then*, I'm failing to spot the race which would leave you with a dangling continuation, but maybe that's just me being blind. – canton7 Jan 04 '22 at 17:26
  • Yes it's a bit expensive, but `async` is expensive. You'd have to profile it to see the real effects, but I'm pretty convinced it's not as terrible as you're making out. It should certainly scale linearly with the number of tasks being awaited – canton7 Jan 04 '22 at 17:27
  • @canton7 When whenany completes it will first complete the task returned by `WhenAny` (thus allowing continuations of that task to run, in this case that being the user code in this answer), then it goes and removes the continuations on other tasks. The continuation to when any may or may not end up getting to the point of adding new continuations before or after the when any internals remove the other continuations. And regardless of which wins, they'll both be manipulating the continuations at around the same time, meaning lock contention, which means context switching. – Servy Jan 04 '22 at 17:29
  • @canton7 It theoretically scales with the square of the number of tasks, not linearly (but it's easy to make it scale linearly, see the duplicate linked under the OP). In practice, though, because this code results in lots more lock contention and context switching, it's likely to be much worse than n^2, since it results in lots of much more expensive operations that wouldn't be hit at all in a well implemented solution. – Servy Jan 04 '22 at 17:31
  • 1
    @Servy OK, if there's no SC you could race there granted, but I still disagree with your assertions about searching the list being inefficient as it scales (since it's cleared out). `Monitor.Enter` spins for a not-insignificant period before calling into the kernel, and `Task` doesn't hold the lock over the continuations for long, so I'd be surprised if you actually got threads being suspended here, but again I'd need to profile to be more confident here. We agree that there are better ways to implement this, my objection is with the word "*exceedingly*" – canton7 Jan 04 '22 at 17:38
  • @canton7 The code in *this answer* is adding the tasks and searching that list. That you have a monitor sitting there spinning while another thread is working is exactly the kind of overhead that you don't have by just adding the one continuation and using that. There's potentially lots of spinning every time any task finishes, and if there are more than just a handful, *that's exactly the kind of thing we want to be avoiding*. And given that this code is doing sub-optimal things in the continuations (linear searches through lists) that could easily make it take long enough to suspend too. – Servy Jan 04 '22 at 17:41
  • @Servy I don't think this is going to progress. We agree (and never disagreed) that there are better ways to implement this, but we disagree on exactly how bad it is. I don't think there's any further useful information I can add: as with anything like this, it's down to a development time / runtime cost trade-off by the OP, informed by a profiler. – canton7 Jan 04 '22 at 17:43
  • 1
    @canton7 according to an [older experiment](https://stackoverflow.com/questions/58194212/how-to-implement-an-efficient-wheneach-that-streams-an-iasyncenumerable-of-task) of mine, using the `Task.WhenAny` in a loop results in a 10 sec overhead for 10,000 tasks. And doubling the tasks quadruples the overhead. – Theodor Zoulias Jan 04 '22 at 18:05
  • This solution also does not support `CancellationToken`s. – Aron Jan 04 '22 at 18:24
  • @TheodorZoulias So, about a millisecond overhead for 100 tasks? I doubt OP has 10,000 servers. – canton7 Jan 05 '22 at 08:45
  • 3
    @canton7 I wish I had ;) I have about 30 servers running each handling only one job. So in my case the overhead would be acceptable. – Keaneum Jan 05 '22 at 08:54
  • @canton7 no, far less than a millisecond. The overhead is not linear. With 100 tasks, the last task to complete will have a continuation attached and detached 100 times. With 10,000 tasks, the same last task will endure 10,000 attach/detach operations. It's O(n²) complexity in all its glory. In practice you may never have to deal with 10,000 concurrent tasks, but the engineer inside you should be screaming when realizing the inefficiency of this antipattern! – Theodor Zoulias Jan 05 '22 at 09:49
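
For readers following the thread above, the linear alternative to calling `Task.WhenAny` in a loop can be sketched with a single continuation per task and a `TaskCompletionSource`. This is an illustrative sketch, not the implementation from the linked duplicate: `TaskExtras`, `WhenSome` and `CollectAsync` are hypothetical names, and faulted tasks are simply left out of the result.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskExtras
{
    // Completes when n of the tasks have succeeded, or when every task has
    // finished, whichever happens first. Attaches one continuation per task.
    public static Task WhenSome(IReadOnlyCollection<Task> tasks, int n)
    {
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (tasks.Count == 0 || n <= 0)
        {
            tcs.TrySetResult(true);
            return tcs.Task;
        }

        int succeeded = 0, finished = 0;
        foreach (var task in tasks)
        {
            task.ContinueWith(t =>
            {
                var enough = t.IsCompletedSuccessfully
                    && Interlocked.Increment(ref succeeded) >= n;
                var last = Interlocked.Increment(ref finished) == tasks.Count;
                if (enough || last) tcs.TrySetResult(true);
            }, CancellationToken.None,
               TaskContinuationOptions.ExecuteSynchronously,
               TaskScheduler.Default);
        }
        return tcs.Task;
    }

    // Waits for n successes, then up to `grace` longer for the rest, and
    // returns the results of the tasks that completed successfully.
    public static async Task<List<T>> CollectAsync<T>(
        IReadOnlyCollection<Task<T>> tasks, int n, TimeSpan grace)
    {
        await WhenSome(tasks, n);
        await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(grace));
        return tasks
            .Where(t => t.IsCompletedSuccessfully)
            .Select(t => t.Result)
            .ToList();
    }
}
```

Each task gets one continuation regardless of how many tasks there are, so the bookkeeping grows linearly with the task count. As with the loop version, nothing is cancelled here, and the slow tasks keep running after `CollectAsync` returns.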
0

Rx.NET is the most elegant way to achieve this.

public IAsyncEnumerable<TResult> DoStuff<TResult>(IEnumerable<Func<CancellationToken, Task<TResult>>> tasks)
{
    var inputs = tasks
            // convert this into IObservable<TResult>
            // this type, like IAsyncEnumerable, contains
            // async logic, and cancellation...
            .ToObservable()
            .Select(task => Observable.FromAsync(task))
            .Merge()
            // publish/refcount is needed to ensure
            // we only run the tasks once, and share
            // the "result/event".
            .Publish()
            .RefCount();
    // On the 100th item (n in the question)
    var timeoutSignal = inputs.Skip(100 - 1)
                          .Take(1)
                          // Generate a signal 10 minutes (x in the question)
                          // after the 100th item arrives
                          .Delay(TimeSpan.FromMinutes(10));
    return inputs
            // Take items until the timeout signal
            .TakeUntil(timeoutSignal)
            .ToAsyncEnumerable();
    
}

var items = await DoStuff(tasks).ToListAsync();
Aron