In my C# console application, I'm trying to run multiple tasks simultaneously, each doing a different data check. If any one of the tasks returns true, I should stop the other tasks, since I then have my actionable result. It's also very possible that none of the functions returns true.

I have the code to run the tasks together (I think), but I'm having trouble getting to the finish line:

Task task1 = Task.Run(() => Task1(stoppingToken));
Task task2 = Task.Run(() => Task2(stoppingToken));
Task task3 = Task.Run(() => Task3(stoppingToken));
Task task4 = Task.Run(() => Task4(stoppingToken));
Task task5 = Task.Run(() => Task5(stoppingToken));
Task task6 = Task.Run(() => Task6(stoppingToken));

Task.WaitAll(task1, task2, task3, task4, task5, task6);

This is a little different from the answer in the linked question below, where the desired outcome is known in advance (a timeout value). Here I'm waiting to see whether any of these tasks returns true, and if one does, I want to cancel the remaining tasks that are still running.

Task.WhenAny with cancellation of the non completed tasks and timeout

Theodor Zoulias
chris
  • You can try calling Task.WhenAny in a loop and keep looping until WhenAny returns true. After that you can cancel like in the answer you linked. – Nick Muller Aug 11 '21 at 22:10
  • A loop wouldn't work for me. Perhaps I should rephrase the question since it makes it sound like a task HAS to be true. It's very likely that all the tasks return false and then I do nothing. If any one of the functions returns true that means I need to mark the data/set it aside – chris Aug 11 '21 at 22:12
  • Maybe a loop could still work if you checked if all tasks have finished in the loop – Nick Muller Aug 11 '21 at 22:14
  • Maybe via Task.WhenAll. Just don’t await it, but check if it has completed. – Nick Muller Aug 11 '21 at 22:16
  • What is the action supposed to be if all tasks complete and none of them return true? – David L Aug 11 '21 at 22:29
  • @DavidL any task true acts like a circuit breaker for other tasks doing similar data checks simultaneously. Each long-running task only needs to be run once tho and it's okay for it to return false. – chris Aug 11 '21 at 22:32

3 Answers

Assuming your tasks return bool you can do something like this:

CancellationTokenSource source = new CancellationTokenSource();
CancellationToken stoppingToken = source.Token;
Task<bool> task1 = Task.Run(() => Task1(stoppingToken));
....

var tasks = new List<Task<bool>>
{
    task1, task2, task3, ...
};

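bool taskResult = false;
do
{
    // Wait for whichever of the remaining tasks completes first.
    var finished = await Task.WhenAny(tasks);
    // The task is already complete; awaiting it unwraps the result
    // (or rethrows its exception without an AggregateException wrapper).
    taskResult = await finished;
    tasks.Remove(finished);
} while (tasks.Any() && !taskResult);

// Signal any tasks that are still running to stop.
source.Cancel();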
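
For completeness, here is a self-contained sketch of the same loop that can be pasted into a console project. The `Check` method is a hypothetical stand-in for `Task1` … `Task6`, and the `AnyTrueAsync` helper name is an assumption made for illustration, not something from the question. It also awaits the leftover tasks after cancelling, so their cancellation exceptions are observed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FirstTrueDemo
{
    // Hypothetical stand-in for Task1..Task6: waits, then returns a fixed answer.
    public static async Task<bool> Check(int delayMs, bool answer, CancellationToken token)
    {
        await Task.Delay(delayMs, token); // throws if the token is cancelled first
        return answer;
    }

    // Returns true as soon as any task yields true, false if all of them yield false.
    public static async Task<bool> AnyTrueAsync(IEnumerable<Task<bool>> source, CancellationTokenSource cts)
    {
        var pending = source.ToList();
        try
        {
            while (pending.Count > 0)
            {
                Task<bool> finished = await Task.WhenAny(pending);
                pending.Remove(finished);
                if (await finished) return true;
            }
            return false;
        }
        finally
        {
            // Stop whatever is still running, then observe the cancellations.
            cts.Cancel();
            try { await Task.WhenAll(pending); }
            catch (OperationCanceledException) { }
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        var tasks = new[]
        {
            Check(50, false, cts.Token),
            Check(100, true, cts.Token),
            Check(5000, false, cts.Token), // cancelled long before it finishes
        };
        bool found = await AnyTrueAsync(tasks, cts);
        Console.WriteLine(found); // prints "True"
    }
}
```

If every check returns false, the loop simply drains the list and the method returns false, which matches the "do nothing" case described in the question.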
Guru Stron

Here's a solution based on continuation tasks. The idea is to attach a continuation to each of the original (provided) tasks and check the result there. If the result matches, a TaskCompletionSource is completed with that result; if it doesn't match, the TaskCompletionSource is left untouched.

The code then waits for whichever happens first: either all the continuation tasks complete, or the TaskCompletionSource's task completes. Either way, we are then ready to inspect the TaskCompletionSource's task (that's why we wait for the continuation tasks, not the original tasks). If it has a result, that's pretty much an indication that we have a match (the additional check at the end is a little paranoid, but better safe than sorry I guess... :D)

public static async Task<bool> WhenAnyHasResult<T>(Predicate<T> isExpectedResult, params Task<T>[] tasks)
{
    const TaskContinuationOptions continuationTaskFlags = TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.AttachedToParent;
         
    // Prepare TaskCompletionSource to be set only when one of the provided tasks
    // completes with expected result
    var tcs = new TaskCompletionSource<T>();

    // For every provided task, attach a continuation task that fires
    // once the original task was completed
    var taskContinuations = tasks.Select(task =>
    {
        return task.ContinueWith(x =>
        {
            var taskResult = x.Result;
            if (isExpectedResult(taskResult))
            {
                // TrySetResult, because SetResult throws if a second task also matches
                tcs.TrySetResult(taskResult);
            }
        },
        continuationTaskFlags);
    });

    // We either wait for all the continuation tasks to be completed
    // (it's most likely an indication that none of the provided tasks completed with the expected result)
    // or for the TCS task to complete (which means a match was found)
    await Task.WhenAny(Task.WhenAll(taskContinuations), tcs.Task);

    // If the task from TCS has run to completion, it means the result has been set from
    // the continuation task attached to one of the tasks provided in the arguments
    var completionTask = tcs.Task;
    if (completionTask.IsCompleted)
    {
        // We will check once more to make sure the result is set as expected 
        // and return this as our outcome
        var tcsResult = completionTask.Result;
        return isExpectedResult(tcsResult);
    }

    // TCS result was never set, which means we did not find a task matching the expected result.
    tcs.SetCanceled();
    return false;
}

Now, the usage will be as follows:

static async Task ExampleWithBooleans()
{
    Console.WriteLine("Example with booleans");

    var task1 = SampleTask(3000, true);
    var task2 = SampleTask(5000, false);

    var finalResult = await TaskUtils.WhenAnyHasResult(result => result == true, task1, task2);

    // go ahead and cancel your cancellation token here

    Console.WriteLine("Final result: " + finalResult);
    Debug.Assert(finalResult == true);
    Console.WriteLine();
}

What's nice about putting it into a generic method is that it works with any result type of the original tasks, not only booleans.

vm_
  • _"What if the task that has the expected result is not the one? Is it possible?"_ then the task with expected result will be returned in the next iteration. I am not sure what concerns about thread safety do you see in this particular case. – Guru Stron Aug 12 '21 at 07:16
  • This is just a bunch of questions I had that provoked me to think of a different solution. I will edit my response so that it doesn't sound like there is a problem with the other answer. Sorry if it came out this way! – vm_ Aug 12 '21 at 07:51
  • You should check out [this](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html) article regarding the use of the `ContinueWith` method. In short this method by default runs the supplied lambda on the ambient `TaskScheduler.Current`, which can be anything (it can be the UI thread, or a limited concurrency `TaskScheduler` or whatever). So if you want to ensure that the lambda will be invoked consistently on the `ThreadPool`, you must pass the `TaskScheduler.Default` as argument on each and every `ContinueWith` call. – Theodor Zoulias Aug 14 '21 at 15:04
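
The scheduler point in the comment above could be sketched roughly like this. The `Describe` helper and its return strings are hypothetical and used only for illustration; the relevant part is the `ContinueWith` overload that takes an explicit `TaskScheduler.Default`, so the continuation does not depend on the ambient `TaskScheduler.Current`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinueWithSchedulerDemo
{
    // Hypothetical helper: maps the antecedent's bool result to a label.
    public static Task<string> Describe(Task<bool> task)
    {
        return task.ContinueWith(
            t => t.Result ? "match" : "no match",
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
            TaskScheduler.Default); // explicit scheduler instead of TaskScheduler.Current
    }

    public static void Main()
    {
        // Blocking on Result is acceptable in this tiny console sketch.
        Console.WriteLine(Describe(Task.FromResult(true)).Result); // prints "match"
    }
}
```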

You could use an asynchronous method that wraps a Task<bool> in another Task<bool>, and cancels a CancellationTokenSource if the result of the input task is true. In the example below this method is IfTrueCancel, and it is implemented as a local function. This way it captures the CancellationTokenSource, so you don't have to pass it as an argument on every call:

var cts = new CancellationTokenSource();
var stoppingToken = cts.Token;

var task1 = IfTrueCancel(Task.Run(() => Task1(stoppingToken)));
var task2 = IfTrueCancel(Task.Run(() => Task2(stoppingToken)));
var task3 = IfTrueCancel(Task.Run(() => Task3(stoppingToken)));
var task4 = IfTrueCancel(Task.Run(() => Task4(stoppingToken)));
var task5 = IfTrueCancel(Task.Run(() => Task5(stoppingToken)));
var task6 = IfTrueCancel(Task.Run(() => Task6(stoppingToken)));

Task.WaitAll(task1, task2, task3, task4, task5, task6);

async Task<bool> IfTrueCancel(Task<bool> task)
{
    bool result = await task.ConfigureAwait(false);
    if (result) cts.Cancel();
    return result;
}
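
One thing to keep in mind with this pattern: once the token is cancelled, checks that honor it will typically throw an OperationCanceledException, and `Task.WaitAll` then surfaces that as an `AggregateException`. Below is a minimal, self-contained sketch of one way to handle it, assuming a hypothetical synchronous `Check` method in place of `Task1` … `Task6`; the `Run` wrapper is likewise only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class IfTrueCancelDemo
{
    // Hypothetical synchronous check standing in for Task1..Task6.
    public static bool Check(int delayMs, bool answer, CancellationToken token)
    {
        // WaitOne returns true if the token was signalled before the timeout.
        if (token.WaitHandle.WaitOne(delayMs)) token.ThrowIfCancellationRequested();
        return answer;
    }

    public static bool Run()
    {
        using var cts = new CancellationTokenSource();
        var token = cts.Token;

        async Task<bool> IfTrueCancel(Task<bool> task)
        {
            bool result = await task.ConfigureAwait(false);
            if (result) cts.Cancel();
            return result;
        }

        var tasks = new[]
        {
            IfTrueCancel(Task.Run(() => Check(50, true, token))),
            IfTrueCancel(Task.Run(() => Check(5000, false, token))),
        };

        try
        {
            Task.WaitAll(tasks);
        }
        catch (AggregateException ex)
        {
            // Swallow only cancellations; Handle rethrows anything else.
            ex.Handle(e => e is OperationCanceledException);
        }

        // Only read Result from tasks that actually ran to completion.
        return tasks.Any(t => t.Status == TaskStatus.RanToCompletion && t.Result);
    }

    public static void Main() => Console.WriteLine(Run()); // prints "True"
}
```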

Another, quite different, solution to this problem could be to use PLINQ instead of explicitly created Tasks. PLINQ requires an IEnumerable of something in order to do parallel work on it, and in your case this something is the functions Task1, Task2, etc. that you want to invoke. You could put them in an array of Func<CancellationToken, bool>, and solve the problem this way:

var functions = new Func<CancellationToken, bool>[]
{
    Task1, Task2, Task3, Task4, Task5, Task6
};

bool success = functions
    .AsParallel()
    .WithDegreeOfParallelism(4)
    .Select(function =>
    {
        try
        {
            bool result = function(stoppingToken);
            if (result) cts.Cancel();
            return result;
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    })
    .Any(result => result);

The advantage of this approach is that you can configure the degree of parallelism, and you don't have to rely on the ThreadPool availability for limiting the concurrency of the whole operation. The disadvantage is that all functions must have the same signature. You could overcome this disadvantage by declaring the functions as lambda expressions like this:

var functions = new Func<CancellationToken, bool>[]
{
    ct => Task1(arg1, ct),
    ct => Task2(arg1, arg2, ct),
    ct => Task3(ct),
    ct => Task4(arg1, arg2, arg3, ct),
    ct => Task5(arg1, ct),
    ct => Task6(ct)
};
Theodor Zoulias