
I have a question for experienced C# developers. My program needs to run three recurring tasks. Task 2 depends on task 1, and task 3 depends on task 2, but task 1 doesn't need to wait for the other two to finish before starting again (the program runs continuously). Since each task takes some time, I would like to run each one on its own thread or as a C# Task. Once task 1 finishes, task 2 starts and task 1 starts again, and so on.

I'm not sure what the best way to implement this is. I hope someone can guide me.

Theodor Zoulias
Abdelsalam Hamdi
  • You can check out async/await – Damien Sep 17 '21 at 06:36
  • There are so many good Q&As related to c# multi threading concept on SO, one which is in context to your Q has an A [here](https://stackoverflow.com/questions/23833255/does-using-tasks-tpl-library-make-an-application-multithreaded/23833635) – MDT Sep 17 '21 at 06:47

3 Answers


One way to achieve this is with the TPL Dataflow library (the System.Threading.Tasks.Dataflow namespace), which builds on the Task Parallel Library. It provides a set of classes that let you arrange your work into "blocks". You create a method that does A, B and C sequentially, and an ActionBlock takes care of running multiple invocations of that method simultaneously. Here's a small example:

// Requires: using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;
async Task Main()
{
    var actionBlock = new ActionBlock<int>(DoTasksAsync, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 2 // The number of simultaneous executions of DoTasksAsync
    });
    
    await actionBlock.SendAsync(1);
    await actionBlock.SendAsync(2);
    
    actionBlock.Complete();
    await actionBlock.Completion;
}

async Task DoTasksAsync(int input)
{
    await DoTaskAAsync();
    await DoTaskBAsync();
    await DoTaskCAsync();
}
Ceilingfish

I would probably use some kind of queue pattern.

I am not sure whether task 1 is required to be thread-safe, so I will keep it simple:

  • Task 1 is always executing. As soon as it finishes, it posts a message on some queue and starts over.
  • Task 2 is listening to the queue. Whenever a message is available, it starts working on it.
  • Whenever task 2 finishes working, it calls task 3, so that it can do its work.

As one of the comments mentioned, you should be able to use async/await in your code, especially between tasks 2 and 3. Note that task 1 can run in parallel with tasks 2 and 3, since it does not depend on either of them.
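
The queue pattern described above could be sketched with System.Threading.Channels, roughly as follows. This is only an illustration under stated assumptions: the QueuePipeline class, the RunAsync method and the three delegates are hypothetical names, not part of any library, and a bounded capacity of 1 is an arbitrary choice that makes task 1 wait when task 2 falls behind.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class QueuePipeline
{
    // Hypothetical helper: task1 is the producer, task2 followed by task3
    // is the consumer. All three delegates are placeholders for real work.
    public static async Task RunAsync<T1, T2>(
        Func<T1> task1, Func<T1, T2> task2, Action<T2> task3,
        CancellationToken cancellationToken)
    {
        // Linked source, so that a failing consumer can also stop the producer.
        using var linked = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        // Assumption: at most one pending result is kept in the queue.
        var queue = Channel.CreateBounded<T1>(1);

        Task producer = Task.Run(async () =>
        {
            try
            {
                while (!linked.IsCancellationRequested)
                {
                    T1 item = task1(); // Task 1 starts over after each post
                    await queue.Writer.WriteAsync(item, linked.Token);
                }
            }
            catch (OperationCanceledException)
                when (linked.IsCancellationRequested) { } // Normal shutdown
            finally { queue.Writer.Complete(); } // Lets the consumer finish
        });

        Task consumer = Task.Run(async () =>
        {
            try
            {
                // Completes after the writer completes and the queue is empty.
                await foreach (T1 item in queue.Reader.ReadAllAsync())
                {
                    T2 result = task2(item); // Task 2 works on the message
                    task3(result);           // then hands over to task 3
                }
            }
            catch { linked.Cancel(); throw; } // Unblock the producer on failure
        });

        await Task.WhenAll(producer, consumer);
    }
}
```

In this sketch task 3 runs on the same worker as task 2, as in the bullet list above. If task 3 should also overlap with task 2, a second channel between them would be one way to do it.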

Eric Ruder

You could use the ParallelLoop method below. This method starts an asynchronous workflow, where the three tasks are invoked in parallel to each other, but sequentially to themselves. So you don't need to add synchronization inside each task, unless some task produces global side-effects that are visible from some other task.

The tasks are invoked on the ThreadPool, with the Task.Run method.

/// <summary>
/// Invokes three actions repeatedly in parallel on the ThreadPool, with the
/// action2 depending on the action1, and the action3 depending on the action2.
/// Each action is invoked sequentially to itself.
/// </summary>
public static async Task ParallelLoop<TResult1, TResult2>(
    Func<TResult1> action1,
    Func<TResult1, TResult2> action2,
    Action<TResult2> action3,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var task1 = Task.FromResult<TResult1>(default);
    var task2 = Task.FromResult<TResult2>(default);
    var task3 = Task.CompletedTask;
    try
    {
        int counter = 0;
        while (true)
        {
            counter++;

            var result1 = await task1.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
            task1 = Task.Run(action1); // Restart the task1
            if (counter <= 1) continue; // In the first loop result1 is undefined

            var result2 = await task2.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
            task2 = Task.Run(() => action2(result1)); // Restart the task2
            if (counter <= 2) continue; // In the second loop result2 is undefined

            await task3.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
            task3 = Task.Run(() => action3(result2)); // Restart the task3
        }
    }
    finally
    {
        // Prevent fire-and-forget
        Task allTasks = Task.WhenAll(task1, task2, task3);
        try { await allTasks.ConfigureAwait(false); } catch { allTasks.Wait(); }
        // Propagate all errors in an AggregateException
    }
}

There is an obvious pattern in the implementation that makes it trivial to add overloads with more than three actions. Each added action requires its own generic type parameter (TResult3, TResult4 etc).

Usage example:

var cts = new CancellationTokenSource();
Task loopTask = ParallelLoop(() =>
{
    // First task
    Thread.Sleep(1000); // Simulates synchronous work
    return "OK"; // The result that is passed to the second task
}, result =>
{
    // Second task
    Thread.Sleep(1000); // Simulates synchronous work
    return result + "!"; // The result that is passed to the third task
}, result =>
{
    // Third task
    Thread.Sleep(1000); // Simulates synchronous work
}, cts.Token);

In case any of the tasks fails, the whole loop will stop (with the loopTask.Exception containing the error). Since the tasks depend on each other, recovering from a single failed task is not possible¹. What you could do is to execute the whole loop through a Polly Retry policy, to make sure that the loop will be reincarnated in case of failure. If you are unfamiliar with the Polly library, you could use the simple and featureless RetryUntilCanceled method below:

public static async Task RetryUntilCanceled(Func<Task> action,
    CancellationToken cancellationToken)
{
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();
        try { await action().ConfigureAwait(false); }
        catch { if (cancellationToken.IsCancellationRequested) throw; }
    }
}

Usage:

Task loopTask = RetryUntilCanceled(() => ParallelLoop(() =>
{
   //...
}, cts.Token), cts.Token);

Before exiting the process you are advised to Cancel() the CancellationTokenSource and Wait() (or await) the loopTask, in order for the loop to terminate gracefully. Otherwise some tasks may be aborted in the middle of their work.
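
A minimal sketch of that shutdown step, assuming the cts and loopTask variables from the usage example. The LoopShutdown class and the StopAsync method are hypothetical names introduced only for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LoopShutdown
{
    // Hypothetical helper: requests cancellation, then waits until the loop
    // has really finished. Errors other than cancellation still propagate.
    public static async Task StopAsync(
        CancellationTokenSource cts, Task loopTask)
    {
        cts.Cancel(); // Ask the loop to stop before its next iteration
        try
        {
            await loopTask.ConfigureAwait(false); // Wait for running tasks
        }
        catch (OperationCanceledException)
        {
            // Expected: the loop ended because of the cancellation request.
        }
    }
}
```

It could be called as `await LoopShutdown.StopAsync(cts, loopTask);` just before the process exits.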

¹ It is actually possible, and probably preferable, to execute each individual task through a Polly Retry policy. The parallel loop will be suspended until the failed task is retried successfully.
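
For readers without Polly, the per-task retry idea in this footnote could be approximated with a small wrapper like the one below. This is a simplified stand-in and not Polly's API: the RetryWrapper class, the WithRetry method and its parameters are hypothetical, and the fixed delay between attempts is an assumption.

```csharp
using System;
using System.Threading;

public static class RetryWrapper
{
    // Hypothetical helper: wraps a synchronous action, so that a failure is
    // retried up to maxAttempts times before the exception propagates.
    public static Func<TResult> WithRetry<TResult>(
        Func<TResult> action, int maxAttempts, TimeSpan delay)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        if (maxAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        return () =>
        {
            for (int attempt = 1; ; attempt++)
            {
                try { return action(); }
                catch (Exception) when (attempt < maxAttempts)
                {
                    // Blocks the current thread between attempts, which
                    // matches the synchronous style of the wrapped action.
                    Thread.Sleep(delay);
                }
            }
        };
    }
}
```

A wrapped delegate can then be passed wherever the original was, for example `RetryWrapper.WithRetry(firstTask, 3, TimeSpan.FromSeconds(1))` as the first argument of ParallelLoop, where firstTask is a placeholder for your own Func. The last failed attempt rethrows, so the loop still stops if the task never succeeds.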

Theodor Zoulias
  • Note: canceling the `cancellationToken` cancels the parallel loop without all actions having been executed an equal number of times. The `action1` is executed one more time than the `action2`, and the `action2` is executed one more time than the `action3`. – Theodor Zoulias Sep 21 '21 at 16:33
  • I've uploaded a polished implementation of the above idea on [this](https://github.com/theodorzoulias/ParallelLoopLibrary "Parallel Loop Library") GitHub repository. – Theodor Zoulias Oct 14 '21 at 08:44