
I need to keep the maximum number of running tasks fixed, and I'm trying to handle the problem like this. I'm using ConcurrentQueue<T> and ConcurrentBag<T> because they are thread-safe collections, which is what's recommended.

But the number of running tasks keeps growing. How can I prevent this?

        public int AmountOfMaxTasks { get; init; } // Maximum number of tasks allowed to run at once.
        public ConcurrentQueue<Task> Tasks { get; init; }
        public ConcurrentBag<Task> ListOfTasks { get; init; }
        public void Scanner()
        {

            while (condition)
            {
                Task task = new(() =>
                {
                    //do some stuff
                    TaskFinished();
                });
                Tasks.Enqueue(task);
                ListOfTasks.Add(task);
                var vrCounter = GetMissingTaskAmount();
                for (int i = 0; i < vrCounter; i++)
                {
                    if (Tasks.TryDequeue(out var vrTempTask) && vrTempTask != null)
                    {
                        vrTempTask.Start();
                    }
                }
            }
            Scanner();
        }
        private void TaskFinished()
        {
            if (GetMissingTaskAmount() == 0)
                return;
            if (Tasks.TryDequeue(out var vrTempTask) && vrTempTask != null)
            {
                vrTempTask.Start();
            }
        }
        private int GetMissingTaskAmount()
        {
            var vrCount = this.ListOfTasks.Count(p => p.Status == TaskStatus.Running); // TaskStatus.Running == (TaskStatus)3
            if (vrCount < this.AmountOfMaxTasks)
                return this.AmountOfMaxTasks - vrCount;
            return 0;
        }
Recep Gunes
  • "I need to keep amount of running tasks fixed." - Why? Most people want the number of tasks to decrease. – Thomas Weller Jan 11 '22 at 10:07
  • @ThomasWeller Actually I meant to say the maximum number of running threads. I'm sorry about that; I've edited the question. – Recep Gunes Jan 11 '22 at 10:11
  • Possible duplicate of https://stackoverflow.com/questions/36564596/how-to-limit-the-maximum-number-of-parallel-tasks-in-c-sharp – Matthew Watson Jan 11 '22 at 10:32
  • Does this answer your question? [How to limit the Maximum number of parallel tasks in c#](https://stackoverflow.com/questions/36564596/how-to-limit-the-maximum-number-of-parallel-tasks-in-c-sharp) – Liam Jan 11 '22 at 10:55
  • @TheodorZoulias I use Queue because I want to process my tasks in first-in, first-out order. Also, I use ConcurrentBag to get the status of all tasks from different classes. – Recep Gunes Jan 11 '22 at 12:08

1 Answer


The simplest way to do this, if you have all the data available via an IEnumerable<T>, is to use Parallel.ForEach().

For example:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Encapsulate the data.
public sealed class WorkItem
{
    public WorkItem(int data)
    {
        Data = data;
    }

    public int Data { get; }
}

static class Program
{
    static void Main()
    {
        const int MAX_TASKS = 4;
        var options = new ParallelOptions() { MaxDegreeOfParallelism = MAX_TASKS };
        Parallel.ForEach(workItems(), options, process);
    }

    static void process(WorkItem workItem)
    {
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing item: {workItem.Data}");
        Thread.Sleep(250); // Simulate load.
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} has processed item: {workItem.Data}");
    }

    static IEnumerable<WorkItem> workItems()
    {
        const int TOTAL_WORK_ITEMS = 100;

        for (int i = 0; i < TOTAL_WORK_ITEMS; i++)
            yield return new WorkItem(i);
    }
}
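
If MaxDegreeOfParallelism isn't set, Parallel.ForEach() lets the scheduler decide how many operations to run at once, so setting it (as above) is what actually caps the number of work items being processed concurrently.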

However, if the data to be processed is being created on the fly (meaning you can't use Parallel.ForEach()), it's generally best handled using something like the Dataflow library.

There's a steep learning curve for that, but I think it's worth it. However, a somewhat simpler approach is to use BlockingCollection<T> to manage a queue that cannot expand indefinitely.

(NOTE: This approach is not a good fit if the processing delegate can throw an exception, because the failing worker will stop processing work items. In that case, you'd have to use something like a CancellationToken to cancel all the tasks in the event that one of them throws; a sketch of that idea follows the sample below.)

Try running this sample console application for an example:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class WorkItem
    {
        public WorkItem(int data)
        {
            Data = data;
        }

        public int Data { get; }
    }

    static class Program
    {
        static void Main()
        {
            const int MAX_TASKS        =   4;
            const int MAX_WORK_ITEMS   =  10;
            const int TOTAL_WORK_ITEMS = 100;

            var workItems = new BlockingCollection<WorkItem>(MAX_WORK_ITEMS);

            // Create the tasks for processing the work items.

            var tasks = new Task[MAX_TASKS];

            for (int i = 0; i < MAX_TASKS; ++i)
                tasks[i] = Task.Run(() => Process(workItems));

            // Add the work items until there are no more.

            for (int i = 0; i < TOTAL_WORK_ITEMS; ++i)
            {
                var workItem = new WorkItem(i);
                Console.WriteLine($"Adding work item {workItem.Data}");
                workItems.Add(workItem);
                Console.WriteLine($"Added work item {workItem.Data}");
            }

            // Signal that there are no more items, so that the processing tasks exit.

            Console.WriteLine("Signalling end of source data.");
            workItems.CompleteAdding();

            Console.WriteLine("Waiting for tasks to complete");
            Task.WaitAll(tasks);
            Console.WriteLine("Finished waiting for all tasks to complete.");

            Console.ReadLine();
        }

        static void Process(BlockingCollection<WorkItem> workItems)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is starting");

            foreach (var workItem in workItems.GetConsumingEnumerable())
            {
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing item: {workItem.Data}");
                Thread.Sleep(250); // Simulate load.
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} has processed item: {workItem.Data}");
            }

            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is exiting");
        }
    }
}
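
To illustrate the cancellation idea from the note above, here's a minimal sketch of a replacement Process method that stops all workers when one of them throws. The cts parameter is an assumed CancellationTokenSource, created in Main and passed to every worker, e.g. Task.Run(() => Process(workItems, cts)):

        static void Process(BlockingCollection<WorkItem> workItems, CancellationTokenSource cts)
        {
            try
            {
                // Passing the token makes the loop throw as soon as any worker calls Cancel().
                foreach (var workItem in workItems.GetConsumingEnumerable(cts.Token))
                {
                    // ... process workItem; this is the code that might throw ...
                }
            }
            catch (OperationCanceledException)
            {
                // Another worker failed and cancelled the token; just exit.
            }
            catch
            {
                cts.Cancel(); // Tell the other workers to stop consuming items.
                throw;        // Let Task.WaitAll() surface the original exception (wrapped in an AggregateException).
            }
        }

In a real program the producer side would also need to observe the same token (for example workItems.Add(workItem, cts.Token)) so that adding items doesn't block forever once the workers have stopped.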

As noted by Theodor, if the processing can throw an exception then you must handle that explicitly. For this reason, it's better to use TPL Dataflow in that case (although of course Parallel.ForEach() is far simpler when you can use it).
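
For reference, a minimal sketch of what the ActionBlock approach might look like (this assumes the System.Threading.Tasks.Dataflow NuGet package; the WorkItem class, limits and simulated load are just the same illustrative values as in the samples above):

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

namespace DataflowDemo
{
    public sealed class WorkItem
    {
        public WorkItem(int data) => Data = data;
        public int Data { get; }
    }

    static class Program
    {
        static async Task Main()
        {
            var block = new ActionBlock<WorkItem>(workItem =>
            {
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing item: {workItem.Data}");
                Thread.Sleep(250); // Simulate load.
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,  // At most 4 work items are processed at a time.
                BoundedCapacity        = 10  // The internal queue never holds more than 10 pending items.
            });

            for (int i = 0; i < 100; i++)
                await block.SendAsync(new WorkItem(i)); // Waits whenever the queue is full.

            block.Complete();       // Signal that no more items will be posted.
            await block.Completion; // Completes when all items are processed; also propagates any exception from the delegate.
        }
    }
}

The BoundedCapacity option gives the same back-pressure as the BlockingCollection<WorkItem> above, and awaiting block.Completion is where an exception thrown by the processing delegate would surface.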

Matthew Watson
  • A `Parallel.For` seems like a much simpler solution to this problem? – Liam Jan 11 '22 at 10:59
  • The problem with the `Task.WaitAll(tasks);` approach, where the tasks are workers consuming a common `BlockingCollection`, is that it behaves poorly in case of failures. In case one worker fails, the rest of the workers will keep working, but the degree of parallelism will be reduced. If all workers except one fail, the last standing worker will slowly process all remaining items alone, until the exceptions are finally surfaced. – Theodor Zoulias Jan 11 '22 at 11:09
  • @Liam I'm adding an example for `Parallel.ForEach()` – Matthew Watson Jan 11 '22 at 11:09
  • @TheodorZoulias This is true, and `DataFlow` is better for this kind of thing. If using this approach you need to handle exceptions by setting a cancellation token or some other mechanism. – Matthew Watson Jan 11 '22 at 11:11
  • Yeap, the TPL Dataflow is essentially a refined version of this pattern. It's much more productive IMHO to spend a couple of hours reading the documentation and experimenting with an [`ActionBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1), than trying to reinvent its functionality from scratch. – Theodor Zoulias Jan 11 '22 at 11:15