1

I have this code:

var Options = new ParallelOptions
{
  MaxDegreeOfParallelism = Environment.ProcessorCount * 10,
  CancellationToken = CTS.Token
};

while (!CTS.IsCancellationRequested)
{

  var TasksZ = new[]
  {
      "Task A",
      "Task B",
      "Task C"
  };

  await Parallel.ForEachAsync(TasksZ, Options, async (Comando, Token) =>
  {
     await MyFunction(Comando)
     await Task.Delay(1000, Token);
});

Now, Task A, B and C start together and the cycle finish when ALL tasks are completed. Let's suppose that Task A and B finish in 10 seconds, but Task C in 2 minutes. In this case, A nd B have to wait 2 minutes too to start again. How can i make this independent? I mean, every task for it's own thread AND considering that var TasksZ is load dynamically and can change during the execution, by adding or removing other tasks.

Also, for stop/pause each individual task, i need a separate TaskCompletionSource for everyone, but MyFunction is an Interface in common with the main app & every DLL, i need to declare every TCS separated in the DLL(s) or just one in the common Interface?

Edit:
My idea is (using this this code from Microsoft) to have an app that run separated DLL, using the same interface but everyone have his job to do and can't wait each other. They mainly have this sequence of work: read a file -> handle an online POST request -> save a file -> communicate with the main app, the returned JSON, via custom class -> repeat. There are no other code that i can show you for let you understand, because now 90% is same as the link above, the other 10% is just the POST request with a JSON return in a custom class and load/save file. For be 101% clear, suppose the example before, the situation should be this:

AM 12:00:00 = start all
AM 12:00:10 = task_A end // 10s
AM 12:00:10 = task_B end // 10s
AM 12:00:20 = task_A end // 10s
AM 12:00:20 = task_B end // 10s
AM 12:00:30 = task_A end // 10s
AM 12:00:30 = task_B end // 10s
...
AM 12:01:50 = task_A end // 10s
AM 12:01:50 = task_B end // 10s
AM 12:02:00 = task_C end // 2 minutes
AM 12:02:10 = task_A end // 10s
AM 12:02:10 = task_B end // 10s
...

(This because i don't need live data for task_3, so it can POST every 2 minutes or so, but for task_1 and task_2 i need to have it live)

About the cores, the important is that the PC will not freeze or have 100% CPU. The server where i run this is a Dual Core, so MaxDegreeOfParallelism = Environment.ProcessorCount * 10 was just for not stress too much the server.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
Tyler
  • 416
  • 4
  • 11
  • You should consider using a producer/consumer queue pattern with background processing that will take any available tasks from the queue and process them independently of the rest. – David L Jan 13 '23 at 17:29
  • @DavidL please can you be more specific? I am using C# since few months, and i don't know all the potential that can offer, thanks. – Tyler Jan 13 '23 at 17:31
  • so you want `task A` to complete before you start next `task A` and similarly `B`, but you don't want the next `task A` to wait for previously running `task C` ? Just like how you added current output, can you add _expected_ output to the question? – YK1 Jan 13 '23 at 22:19
  • *"The server where I run this is a Dual Core, so `MaxDegreeOfParallelism = Environment.ProcessorCount * 10` was just for not stress too much the server."* -- This sentence confuses me. The `Environment.ProcessorCount * 10` on a dual core evaluates to `20`, and your commandos are 3, so the `MaxDegreeOfParallelism` being larger than 3 will have no throttling effect. – Theodor Zoulias Jan 13 '23 at 22:55

4 Answers4

2

As I mentioned in my comment above, you can create your own wrapper around a queue that manages background processors of your queue and re-queues the tasks as they complete.

In addition, you mentioned the need to dynamically add or remove tasks at will, which the below implementation will handle.

And finally, it takes an external CancellationToken so that you can either call stop on the processor itself, or cancel the parent CancellationTokenSource.

public class QueueProcessor
{
    // could be replaced with a ref-count solution to ensure 
    // all duplicated tasks are removed
    private readonly HashSet<string> _tasksToRemove = new();
    private readonly ConcurrentQueue<string> _taskQueue;
    private Task[] _processors;
    private Func<string, CancellationToken, Task> _processorCallback;
    private CancellationTokenSource _cts;
    
    public QueueProcessor(
        string[] tasks, 
        Func<string, CancellationToken, Task> processorCallback)
    {   
        _taskQueue = new(tasks);
        _processorCallback = processorCallback;
    }

    public async Task StartAsync(int numberOfProcessorThreads,
        CancellationToken cancellationToken = default)
    {
        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _processors = new Task[numberOfProcessorThreads];
        
        for (int i = 0; i < _processors.Length; i++)
        {
            _processors[i] = Task.Run(async () => await ProcessQueueAsync());
        }

        await Task.WhenAll(_processors);
    }
    
    public void Stop()
    {
        _cts.Cancel();
        _cts.Dispose();
    }
    
    public void RemoveTask(string task)
    {
        lock (_tasksToRemove)
        {
            _tasksToRemove.Add(task);
        }
    }
    
    public void AddTask(string task) => _taskQueue.Enqueue(task);
    
    private async Task ProcessQueueAsync()
    {
        while (!_cts.IsCancellationRequested)
        {
            if (_taskQueue.TryDequeue(out var task))
            {
                if (ShouldTaskBeRemoved(task))
                {
                    continue;
                }
                
                await _processorCallback(task, _cts.Token);
                
                if (!ShouldTaskBeRemoved(task))
                {
                    _taskQueue.Enqueue(task);
                }
            }
            else
            {
                // sleep for a bit before checking for more work
                await Task.Delay(1000, _cts.Token); 
            }
        }
    }
    
    private bool ShouldTaskBeRemoved(string task)
    {
        lock (_tasksToRemove)
        {
            if (_tasksToRemove.Contains(task))
            {
                Console.WriteLine($"Task {task} requested for removal");
                _tasksToRemove.Remove(task);
                return true;
            }
        }
        
        return false;
    }
}

You can test the above with the following:

public async Task MyFunction(string command, CancellationToken cancellationToken)
{
    await Task.Delay(50);
    
    if (!cancellationToken.IsCancellationRequested)
    {
        
        Console.WriteLine($"Execute command: {command}");
    }
    else
    {
        Console.WriteLine($"Terminating command: {command}");
    }
}


var cts = new CancellationTokenSource();
var processor = new QueueProcessor(
    new string[] { "Task1", "Task2", "Task3" }, 
    MyFunction);

var task = processor.StartAsync(2, cts.Token);

await Task.Delay(100);

processor.RemoveTask("Task1");

await Task.Delay(500);

cts.Cancel();

await runningProcessorTask;

This results in the following output:

Execute command: Task2
Execute command: Task1
Execute command: Task3
Execute command: Task2
Task Task1 requested for removal
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task3
Execute command: Task2
Execute command: Task2
Execute command: Task3
Execute command: Task2
Execute command: Task3
Terminating command: Task2
Terminating command: Task3

If you would prefer to use a Channel<T> backed version that handles waiting for additional work gracefully without a manual Task.Delay, the following version exposes the same public api without the internal ConcurrentQueue<T>.

public class QueueProcessor
{
    // could be replaced with a ref-count solution to ensure all duplicated tasks are removed
    private readonly HashSet<string> _tasksToRemove = new();
    private readonly System.Threading.Channels.Channel<string> _taskQueue;
    private Task[] _processors;
    private Func<string, CancellationToken, Task> _processorCallback;
    private CancellationTokenSource _cts;
    
    public QueueProcessor(string[] tasks, Func<string, CancellationToken, Task> processorCallback)
    {
        _taskQueue = Channel.CreateUnbounded<string>();
        _processorCallback = processorCallback;
        
        for (int i = 0; i < tasks.Length; i++)
        {
            _taskQueue.Writer.WriteAsync(tasks[i]);
        }
    }

    public async Task StartAsync(int numberOfProcessorThreads, CancellationToken cancellationToken = default)
    {
        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _processors = new Task[numberOfProcessorThreads];
        
        for (int i = 0; i < _processors.Length; i++)
        {
            _processors[i] = Task.Run(async () => await ProcessQueueAsync());
        }

        await Task.WhenAll(_processors);
    }

    public void Stop()
    {
        _taskQueue.Writer.TryComplete();
        _cts.Cancel();
        _cts.Dispose();
    }   
    
    public void RemoveTask(string task)
    {
        lock (_tasksToRemove)
        {
            _tasksToRemove.Add(task);
        }
    }
    
    public ValueTask AddTask(string task) => _taskQueue.Writer.WriteAsync(task);
    
    private async Task ProcessQueueAsync()
    {
        while (!_cts.IsCancellationRequested && await _taskQueue.Reader.WaitToReadAsync(_cts.Token))
        {
            if (_taskQueue.Reader.TryRead(out var task))
            {
                if (ShouldTaskBeRemoved(task))
                {
                    continue;
                }
                
                await _processorCallback(task, _cts.Token);
                
                if (!ShouldTaskBeRemoved(task))
                {
                    await _taskQueue.Writer.WriteAsync(task);
                }
            }
        }
    }
    
    private bool ShouldTaskBeRemoved(string task)
    {
        lock (_tasksToRemove)
        {
            if (_tasksToRemove.Contains(task))
            {
                Console.WriteLine($"Task {task} requested for removal");
                _tasksToRemove.Remove(task);
                return true;
            }
        }
        
        return false;
    }
}
David L
  • 32,885
  • 8
  • 62
  • 93
  • Thanks David. The class `QueueProcessor`, but the way i call the commands are a bit different. I have added more information to let you understand better. Currently i cant add the Interface call inside the `var processor = new QueueProcessor(...` – Tyler Jan 13 '23 at 21:35
  • @Tyler I don't understand how you are delineating responsibility based on your update. Perhaps you need different deployables for each kind of work being done, relying on external queues. – David L Jan 13 '23 at 21:41
  • 1
    Sorry, i missclick and i deleted part of the comment, you can see is missing something. Anyway, i was saying that the class works good, but i had some difficulty to read the returned class in the interface. But now i solved. Tested now with some simple commands, but now i will implement the real one to see if all work and i will let you know. (and also accept the answer here), just give me some hours, thanks! – Tyler Jan 13 '23 at 21:58
  • 1
    You could consider replacing the `ConcurrentQueue`+`Task.Delay` with a `Channel`, in order to avoid unproductive low-frequency iterations. – Theodor Zoulias Jan 14 '23 at 05:21
  • @TheodorZoulias I think that would be an excellent improvement, although I used a queue to more explicitly show the OP how this pattern might work, given their lack of familiarly with C#. – David L Jan 14 '23 at 15:21
  • David thanks again for the clear code, i have learnt a lot. Just some more things to make it perfect: if i call `cts.Cancel()` all process will end without start again, so i can use this command for a "Stop all" button. But, for a "Start all" button, if i call again `processor.StartAsync(n)` it won't start. So i tried to define globally `QueueProcessor processor;` but same thing, won't start again. Also, there is a way to avoid `.StartAsync(n)` where "n" is the number of tasks and make it "general"? Because my idea is to add plugins to the app without close it, so a static number isn't good. – Tyler Jan 14 '23 at 16:47
  • @TheodorZoulias can you explain it? I care about performance and clean/productive code. Thank you – Tyler Jan 14 '23 at 16:50
  • 1
    @Tyler I apologize, that was my fault. The CancellationToken should have been passed in on `StartAsync`, and if you cancel the CancellationTokenSource, you should create a new one every time you call StartAsync since it will be in a cancelled state. In addition, if you would prefer that I update the example to use `Channel`, I'd be happy to do so. – David L Jan 14 '23 at 19:32
  • 1
    @Tyler I added a second QueueProcessor implementation that uses `Channel` instead. It is functionally the same, but to TheodorZoulias' point, if you spend a lot of time waiting for work to be available (in instances where you have more background processors than tasks for example), `Channel` is more efficient. – David L Jan 14 '23 at 19:40
  • 1
    The pattern `while (!_cts.IsCancellationRequested && await _taskQueue.Reader...` creates a race condition, where the cancellation might either complete the task successfully or as canceled, depending on the winner of the race. A better pattern IMHO is `await foreach (string task in _taskQueue.Reader.ReadAllAsync(_cts.Token))`, that reacts consistently to a cancellation of the token. – Theodor Zoulias Jan 14 '23 at 21:29
  • @TheodorZoulias that's good feedback and I'll have to give that some thought. The issue with ReadAllAsync is that it just calls WaitToReadAsync internally, and then enumerates all items which isn't the intent of the this pattern, since you only want to receive a single instance. I can't reproduce a race condition since the the preliminary check doesn't negate the check of the token in WaitToReadAsync. The goal should be to prevent _any_ additional attempts to dequeue work if the `_cts.Token` is cancelled. – David L Jan 14 '23 at 22:42
  • The `ReadAllAsync` yields the items one at a time. Internally for performance reasons it calls `WaitToReadAsync` only when a `TryRead` has failed to read an item previously. This is an implementation detail. The consuming behavior is the same. – Theodor Zoulias Jan 14 '23 at 23:08
  • @TheodorZoulias I'm not seeing that here: https://github.com/dotnet/runtime/blob/67354a2fb4308650eb1d7db3277c862b8464c895/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.netcoreapp.cs#L20. In addition, you'd still need some sort of looping conditional per background task, preferably to check the token. I'm probably missing it, but I don't see how that is an improvement or any safer. – David L Jan 15 '23 at 00:39
  • We could discuss this a lot, but it's probably not that important. In general using the [`IsCancellationRequested`](https://stackoverflow.com/questions/15038529/use-of-iscancellationrequested-property) is a red flag for me. More often than not I see it misused. – Theodor Zoulias Jan 15 '23 at 13:14
  • Thanks for the addition David, i will test it right now – Tyler Jan 16 '23 at 18:00
2

I don't think that the Parallel.ForEachAsync is a suitable tool for solving your problem. My suggestion is to store the tasks in a dictionary that has the string commandos as keys, and (Task, CancellationTokenSource) tuples as values. Each time you add a commando in the dictionary, you start a Task associated with a CancellationTokenSource, after awaiting any previous Task that was stored previously for the same commando, in order to prevent concurrent executions of the same commando. For limiting the concurrency of all commandos, you can use a SemaphoreSlim. For limiting the parallelism (number of threads actively running code at any given moment) you can use a limited concurrency TaskScheduler. Here is a demo:

const int maximumConcurrency = 10;
const int maximumParallelism = 2;
Dictionary<string, (Task, CancellationTokenSource)> commandos = new();
SemaphoreSlim semaphore = new(maximumConcurrency, maximumConcurrency);
TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair(
    TaskScheduler.Default, maximumParallelism).ConcurrentScheduler;

StartCommando("Task A");
StartCommando("Task B");
StartCommando("Task C");

void StartCommando(string commando)
{
    Task existingTask = null;
    CancellationTokenSource existingCts = null;
    if (commandos.TryGetValue(commando, out var entry))
    {
        (existingTask, existingCts) = entry;
        existingCts.Cancel();
    }
    CancellationTokenSource cts = new();
    CancellationToken token = cts.Token;
    Task task = Task.Factory.StartNew(async () =>
    {
        if (existingTask is not null) try { await existingTask; } catch { }
        while (true)
        {
            await semaphore.WaitAsync(token);
            try
            {
                await MyFunction(commando, token);
            }
            finally { semaphore.Release(); }
        }
    }, token, TaskCreationOptions.DenyChildAttach, scheduler).Unwrap();
    commandos[commando] = (task, cts);
    existingCts?.Dispose();
}

void StopCommando(string commando)
{
    if (commandos.TryGetValue(commando, out var entry))
    {
        (_, CancellationTokenSource cts) = entry;
        cts.Cancel();
    }
}

Task DisposeAllCommandos()
{
    List<Task> tasks = new(commandos.Count);
    foreach (var (commando, entry) in commandos)
    {
        (Task task, CancellationTokenSource cts) = entry;
        cts.Cancel();
        commandos.Remove(commando);
        cts.Dispose();
        tasks.Add(task);
    }
    return Task.WhenAll(tasks);
}

Online demo.

It is important that all the awaits are not configured with ConfigureAwait(false). Enforcing the maximumParallelism policy depends on staying always in the realm of our preferred scheduler, so capturing the TaskScheduler.Current at the await points and continuing on that same scheduler is the desirable behavior. Which is also the default behavior of await.

The StartCommando, StopCommando and DisposeAllCommandos methods are intended to be called sequentially, not in parallel. In case you want to control the execution of the commandos from multiple threads in parallel, you'll have to synchronize these calls with a lock.

The DisposeAllCommandos is intended to be used before terminating the application. For a clean termination, the returned Task should be awaited. No more commandos should be started after calling this method.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • 1
    Thanks for the short yet very useful code. I have tested it right now and is good under any circumstances. In this way i can start a new `StartCommando("dll interface name");` every time the app detect a new dll in the folder without restart it. The function `DisposeAllCommandos` (by the way, commandos remember me an old videogame) is a nice extra because make me feel less worried about leaving the server with the app running for months without think "what if my app is having now 9999 active threads? let me check". I have discovered also `SemaphoreSlim`, a good add to my path of learn. – Tyler Jan 14 '23 at 17:14
  • @Tyler yep, the "commandos" is a funny name, but it's better IMHO than "tasks", that creates confusion with the built-in `Task` type. – Theodor Zoulias Jan 14 '23 at 21:29
  • @Tyler btw in order to keep the example simple I cut some corners. Ideally the `StopCommando` would result in removing the relevant commando from the dictionary, after the completion of the `Task`. But then all access to the dictionary should be synchronized, because the `Task` might complete asynchronously on another thread. So in the example I just let the canceled task stay in the dictionary. – Theodor Zoulias Jan 14 '23 at 21:40
  • Well, in this case should be enough to add `commandos.Remove(commando);` inside the `if` of the void `StopCommando`, right? Another thing: `maximumParallelism` is the max of Tasks running in parallel at the same time, that's correct? What if i remove it from the `scheduler`? It is a good way for have undefined numbers of tasks in parallel or will cause some kind of overflow? Or maybe is better to define it with `maximumParallelism = MyListOfTasks.Count -1`? (in this case, `maximumConcurrency` should be increased too or 10 is enough for 3 or 30 tasks? Thank you – Tyler Jan 14 '23 at 22:04
  • 1
    @Tyler if you remove the `Task` from the dictionary while it is still running, then you'll have no way to prevent concurrency for the same commando. Whether this is a problem depends on your scenario. `maximumParallelism` is not the number of `Task`s. It's something more subtle. A `Task` might run on [no thread](https://blog.stephencleary.com/2013/11/there-is-no-thread.html) for large parts of its lifetime. The `maximumParallelism` ignores these timespans. It restricts the parallelism only for the timespans that a thread is actively running code. – Theodor Zoulias Jan 14 '23 at 22:14
  • 1
    @Tyler so if your machine is dual core, and you want to avoid saturating its CPU (consuming it 100%), the `maximumParallelism` should be `1`. You can run lots of asynchronous `Task`s concurrently on a single core. – Theodor Zoulias Jan 14 '23 at 22:29
  • Yes, that was my fault. Instead of using `await Task.Delay(TimeSpan.FromSeconds(5), token);`, in every task, i was using `Thread.Sleep(TimeSpan.FromSeconds(5));`. That's why with `maximumParallelism = 1` every Task was waiting for the others. Now is all ok, except for one little thing: In your online example you call `DisposeAllCommandos().GetAwaiter().GetResult();` but when i run this line, the GUI freeze until all tasks are completed, but calling only `DisposeAllCommandos()` without `.GetAwaiter().GetResult();` they dispose without freeze the GUI. Is that normal? – Tyler Jan 16 '23 at 19:08
  • @Tyler to avoid freezing the UI you can do `await DisposeAllCommandos();`. I don't recommend [firing-and-forgetting](https://stackoverflow.com/questions/61316504/proper-way-to-start-and-async-fire-and-forget-call/61320933#61320933) tasks. – Theodor Zoulias Jan 16 '23 at 19:21
  • No, wait. Calling ONLY `DisposeAllCommandos();` does NOT freeze the GUI. The freeze happen when i call `DisposeAllCommandos().GetAwaiter().GetResult();`. (maybe GetAwaiter() is the problem?) – Tyler Jan 16 '23 at 19:29
  • 1
    @Tyler the `.GetAwaiter().GetResult()` will definitely block the current thread. That's the purpose of using it: to block the current thread. Blocking the main thread of a console application is not a problem, but blocking the UI thread of a GUI application is certainly a problem. So don't use `.GetAwaiter().GetResult()`. Use `await` instead. `await` waits the completion of the task without blocking the current thread. – Theodor Zoulias Jan 17 '23 at 02:24
-1

Let me take a stab at identifying your actual root problem: you have I/O bound operations (network access, file I/O, database queries, etc) running at the same time as CPU bound operations (whatever processing you have on the former), and because of the way you wrote your code (that you don't show), you have I/O bound operations waiting for CPU bound ones to even start.

I'm guessing that because by reductio ad absurdum if everything was CPU bound then your CPU cores would be equally used no matter the order of operations, and for I/O bound operations the total time they'd take is equally independent of the order, they just have to get woken up when something finally finishes.

If I'm right, then the actual solution is to split your calls between two thread pools, one for CPU bound operations (that max at the number of available cores) and one for I/O bound operations (that max at some reasonable default, the maximum number of I/O connections that can be in flight at the same time). You can then schedule each operation to its own thread pool and await them as you normally would and they'd never step on each others' toes.

Blindy
  • 65,249
  • 10
  • 91
  • 131
  • Just in case it's not clear, the I/O bound thread pool will be almost completely unused, because the actual work is done by other processes or servers and your application would just sit and wait and eventually collect the results. This pattern is not overusing your CPU cores. – Blindy Jan 13 '23 at 20:39
  • You can use the Partitioner class to split the tasks into different partitions, and then use Parallel.ForEach with a different ParallelOptions object for each partition. Regarding the dynamic nature of TasksZ, you can use the Partitioner.Create method to create a partitioner based on the current state of TasksZ. Regarding the stopping/pausing of individual tasks, you can use a separate CancellationTokenSource for each task, and pass the corresponding CancellationToken to MyFunction. – asha Jan 13 '23 at 21:05
  • Blindy thanks for the reply, i have added some more information, so you will "have" the complete code and a better idea of what i'm looking for. @asha can you make an example of this Partitioner? Where all Tasks run in loop separately without wait each others. Thank you – Tyler Jan 13 '23 at 21:38
-2

You can use Parallel.Invoke() method to execute multiple processes at the same time.

var TasksZ = new[]
{
    () => MyFunction("Task A"),
    () => MyFunction("Task B"),
    () => MyFunction("Task C")
};


Parallel.Invoke(Options, TasksZ);


void MyFunction(string comando)
{
    Console.WriteLine(comando);
}
Bayram Eren
  • 422
  • 4
  • 6
  • Thanks for the answer, but i "don't know" how many and what commands i will have. In this case, `TasksZ` is static in the code, i can't add and remove "commands" from outside the app while is running – Tyler Jan 13 '23 at 18:15