43

I have just started to look at the new "System.Threading.Tasks" goodness in .NET 4.0, and would like to know if there is any built-in support for limiting the number of concurrent tasks that run at once, or if this should be handled manually.

E.g.: if I need to call a calculation method 100 times, is there a way to set up 100 Tasks but have only 5 execute simultaneously? The answer may just be to create 5 tasks, call Task.WaitAny, and create a new Task as each previous one finishes. I just want to make sure I am not missing a trick if there is a better way to do this.

Basically, is there a built-in way to do this:

Dim taskArray() = {New Task(Function() DoComputation1()),
                   New Task(Function() DoComputation2()),
                   ...
                   New Task(Function() DoComputation100())}

Dim maxConcurrentThreads As Integer = 5
RunAllTasks(taskArray, maxConcurrentThreads)
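
For reference, the Task.WaitAny fallback described above could look roughly like the following C# sketch (DoComputation(int) is a stand-in for the real calculation methods; it assumes the usual System.Collections.Generic and System.Threading.Tasks imports):

var pending = new List<Task>();
int next = 0;
const int total = 100, maxConcurrent = 5;

while (next < total || pending.Count > 0)
{
    // Top up to the concurrency limit.
    while (pending.Count < maxConcurrent && next < total)
    {
        int n = next++;
        pending.Add(Task.Factory.StartNew(() => DoComputation(n)));
    }

    // Wait for any one task to finish, remove it, and loop around.
    int finished = Task.WaitAny(pending.ToArray());
    pending.RemoveAt(finished);
}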

Thanks for any help.

James
  • Could you elaborate on why you need to limit it to 5? Note that the task scheduler won't start all 100 at the same time; it uses the thread pool internally (or the thread pool uses the task system), so it will limit the number of concurrent tasks to something small. That limit might change and could be related to the number of cores in your system, but knowing why you want to limit to a specific number might give some good answers. – Lasse V. Karlsen May 24 '10 at 18:49
  • The calculation actually calls a webservice as part of its operation. This is overwhelming the webservice. 5 is just an example. – James May 24 '10 at 20:24
  • How about Parallel? http://stackoverflow.com/questions/5009181/parallel-foreach-vs-task-factory-startnew – faester Oct 01 '13 at 12:08

8 Answers

45

I know this is almost a year old, but I have found a much easier way to achieve this, so I thought I would share:

Dim actionsArray() As Action =
    New Action() {
        New Action(Sub() DoComputation1()),
        New Action(Sub() DoComputation2()),
        ...
        New Action(Sub() DoComputation100())
    }

System.Threading.Tasks.Parallel.Invoke(New Tasks.ParallelOptions() With {.MaxDegreeOfParallelism = 5}, actionsArray)

Voila!

James
28

I know this is an old thread, but I just wanted to share my solution to this problem: use semaphores.

(This is in C#)

private void RunAllActions(IEnumerable<Action> actions, int maxConcurrency)
{
    using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        foreach(Action action in actions)
        {
            Task.Factory.StartNew(() =>
            {
                concurrencySemaphore.Wait();
                try
                {
                    action();
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });
        }
    }
}
Arrow_Raider
  • Thanks Arrow_Raider. This is a much better solution. I implemented this, but used a "continuation task" to handle the semaphore release. – James Aug 19 '10 at 03:53
  • I am getting the error "The semaphore has been disposed." while executing the code. – Abdul Khaliq Dec 30 '10 at 15:02
  • I took @James' idea to the next level. I called Release in a continuation, and I called Dispose in a continuation on a parent task. – Rabbi Mar 10 '11 at 00:10
  • Won't this spawn lots of tasks? The concurrencySemaphore.Wait() is inside the new task lambda block. – Alan Christensen Sep 12 '11 at 22:03
  • @Abdul - you are getting that error because there's nothing stopping Dispose being called on the concurrencySemaphore once the last action has been started. You can solve this by blocking before Dispose while all the tasks are executing. Alternatively, Rabbi's suggestion of having a parent task that calls Dispose in a continuation will do the trick (a sketch of that pattern follows these comments). – Iain Nov 23 '12 at 17:13
  • I've used semaphores a few times because they seemed clean and elegant, but the performance reduction has been rather awful compared to pretty much any other method of limiting concurrent tasks. – Dinerdo Feb 21 '19 at 15:22
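
A minimal sketch of the pattern the comments describe (throttle before starting each task, release the semaphore in a continuation, and wait for all continuations before the semaphore is disposed); the Action collection and the concurrency limit are placeholders:

private static void RunAllActionsThrottled(IEnumerable<Action> actions, int maxConcurrency)
{
    using (var concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        var completions = new List<Task>();

        foreach (Action action in actions)
        {
            // Throttle on the calling thread so no more than maxConcurrency
            // actions are queued or running at any one time.
            concurrencySemaphore.Wait();

            var task = Task.Factory.StartNew(action);

            // Release the slot in a continuation, and keep the continuation
            // so we can wait on it before the semaphore goes out of scope.
            completions.Add(task.ContinueWith(t => concurrencySemaphore.Release()));
        }

        // Only dispose the semaphore after every continuation has run.
        Task.WaitAll(completions.ToArray());
    }
}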
7

A solution might be to take a look at the pre-made code from Microsoft here.

The description goes like this: "Provides a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool.", and as far as I've been able to test it seems to do the trick, in the same manner as the MaxDegreeOfParallelism property in ParallelOptions.
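
Assuming the sample being referred to is the LimitedConcurrencyLevelTaskScheduler class from Microsoft's parallel programming samples (its documentation comment matches the quote above; DoComputation stands in for the actual work), usage might look roughly like this:

// Assumes LimitedConcurrencyLevelTaskScheduler from the Microsoft sample has
// been copied into the project; the constructor argument is the concurrency cap.
var scheduler = new LimitedConcurrencyLevelTaskScheduler(5);
var factory = new TaskFactory(scheduler);

for (int i = 0; i < 100; i++)
{
    // Tasks created through this factory are queued to the limited scheduler,
    // so at most 5 execute at the same time.
    factory.StartNew(() => DoComputation());
}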

persistent
6

C# equivalent of the sample provided by James:

Action[] actionsArray = new Action[] {
    new Action(() => DoComputation1()),
    new Action(() => DoComputation2()),
    //...
    new Action(() => DoComputation100())
};

System.Threading.Tasks.Parallel.Invoke(
    new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 5 },
    actionsArray);
Nigrimmist
4

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 5 threads in parallel.

var listOfActions = new List<Action>();
for (int i = 0; i < 100; i++)
{
    // Note that we create the Action here, but do not start it.
    listOfActions.Add(() => DoSomething());
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 5};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

Since you are using Tasks here though, there is no built-in function. However, you can use the one that I provide on my blog.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                throttler.Wait(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            Task.WaitAll(postTaskTasks.ToArray(), cancellationToken);
        }
    }

Then, to create your list of Tasks and call the function to run them with, say, a maximum of 5 at a time, you could do this:

var listOfTasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => DoSomething()));
}
Tasks.StartAndWaitAllThrottled(listOfTasks, 5);
deadlydog
  • Great! Just one question: in your case there is no task result. Suppose that every task returns an object, and you want to return a list of objects from your `StartAndWaitAllThrottled` method. How would you modify the current code? – Lorenzo Aug 24 '17 at 23:31
3

Short answer: If what you want is to limit the number of worker tasks so that they don't saturate your web service, then I think your approach is fine.

Long answer: The new System.Threading.Tasks engine in .NET 4.0 runs on top of the .NET ThreadPool. There is only ever one ThreadPool per process, and it defaults to a maximum of 250 worker threads. Therefore, if you set the ThreadPool's maximum number of threads to a more modest number using the ThreadPool.SetMaxThreads(...) API, you may be able to reduce the number of concurrently executing threads, and thus tasks.
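
As a rough illustration of that API (the limit of 5 is just a placeholder, and the caveats below still apply):

// Read the current limits so the I/O completion thread limit is preserved.
int workerThreads, completionPortThreads;
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);

// Cap worker threads at 5; SetMaxThreads returns false if the request is
// rejected (for example, if it is below the pool's minimum).
ThreadPool.SetMaxThreads(5, completionPortThreads);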

HOWEVER, note that you may well not be alone in utilizing the ThreadPool, as many other classes you use may also queue items to it. Therefore, there's a good chance you could end up crippling the rest of your app by doing this. Also note that because the ThreadPool employs an algorithm to optimize its use of a given machine's underlying cores, limiting the number of threads the ThreadPool can use to an arbitrarily low number can result in some pretty catastrophic performance issues.

Again, if you only want a small number of worker tasks/threads performing some work, then creating only a small number of tasks (rather than hundreds) is the best approach.

Rich Turner
0

It does not look like it, although you could create a subclass of TaskScheduler that implements such behavior.
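
A hedged sketch of that idea follows, loosely modeled on Microsoft's LimitedConcurrencyLevelTaskScheduler sample but simplified so that tasks are never inlined; the class name is made up, and it assumes the usual System, System.Collections.Generic, System.Threading and System.Threading.Tasks imports:

public class ThrottlingTaskScheduler : TaskScheduler
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
    private readonly int _maxDegreeOfParallelism;
    private int _runningOrQueued;

    public ThrottlingTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    public override int MaximumConcurrencyLevel
    {
        get { return _maxDegreeOfParallelism; }
    }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            // Only spin up another worker if we are below the limit.
            if (_runningOrQueued < _maxDegreeOfParallelism)
            {
                ++_runningOrQueued;
                ThreadPool.QueueUserWorkItem(_ => ProcessPendingTasks());
            }
        }
    }

    private void ProcessPendingTasks()
    {
        while (true)
        {
            Task next;
            lock (_tasks)
            {
                if (_tasks.Count == 0)
                {
                    --_runningOrQueued;
                    return;
                }
                next = _tasks.First.Value;
                _tasks.RemoveFirst();
            }
            // TryExecuteTask runs the task on the current (ThreadPool) thread.
            TryExecuteTask(next);
        }
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Keep the sketch simple: never inline, always go through the queue.
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (_tasks)
            return new List<Task>(_tasks);
    }
}

Tasks started through a TaskFactory constructed with this scheduler, or via task.Start(scheduler), would then never have more than maxDegreeOfParallelism of them running at once.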

Justin Ethier
-3

If your program uses web services, the number of simultaneous connections will be limited by the ServicePointManager.DefaultConnectionLimit property. If you want 5 simultaneous connections, it is not enough to use Arrow_Raider's solution. You should also increase ServicePointManager.DefaultConnectionLimit, because it is only 2 by default.
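
For example, raising the limit before any requests are issued (5 here mirrors the concurrency limit used throughout this thread):

// Raise the per-host HTTP connection limit so 5 requests can actually run in parallel.
System.Net.ServicePointManager.DefaultConnectionLimit = 5;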

Sergey Glotov
  • This question doesn't really have anything to do with HTTP requests but is more general. I think that this answer would have been better suited to a question that is specific to HTTP requests. – Michael J. Gray Oct 28 '12 at 17:35