14

I have some code of the following form:

static async Task DoSomething(int n) 
{
  ...
}

static void RunThreads(int totalThreads, int throttle) 
{
  var tasks = new List<Task>();
  for (var n = 0; n < totalThreads; n++)
  {
    var task = DoSomething(n);
    tasks.Add(task);
  }
  Task.WhenAll(tasks).Wait(); // all threads must complete
}

Trouble is, if I don't throttle the threads, things start falling apart. Now I want to launch at most throttle threads at a time, and only start a new thread when an old one has completed. I've tried a few approaches and none so far has worked. Problems I have encountered include:

  • The tasks collection must be fully populated with all tasks, whether active or awaiting execution, otherwise the final .Wait() call only looks at the threads that it started with.
  • Chaining the execution seems to require use of Task.Run() or the like. But I need a reference to each task from the outset, and instantiating a task seems to kick it off automatically, which is what I don't want.

How can I do this?

Sriram Sakthivel
Shaul Behr
  • Maybe add `throttle` tasks to the list at a time and run them in batches. – Clive DM Aug 17 '15 at 09:42
  • @CliveDM I don't have a clear picture of what you mean. Perhaps post an answer with some sample code? – Shaul Behr Aug 17 '15 at 09:43
  • http://blog.danskingdom.com/tag/c-task-thread-throttle-limit-maximum-simultaneous-concurrent-parallel/ – Niklas Peter Mar 16 '18 at 07:18
  • Related: [How to limit the amount of concurrent async I/O operations?](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations) – Theodor Zoulias Apr 01 '20 at 11:45

9 Answers

28

First, abstract away from threads. Especially since your operation is asynchronous, you shouldn't be thinking about "threads" at all. In the asynchronous world, you have tasks, and you can have a huge number of tasks compared to threads.

Throttling asynchronous code can be done using SemaphoreSlim:

static async Task DoSomething(int n);

static void RunConcurrently(int total, int throttle) 
{
  var mutex = new SemaphoreSlim(throttle);
  var tasks = Enumerable.Range(0, total).Select(async item =>
  {
    await mutex.WaitAsync();
    try { await DoSomething(item); }
    finally { mutex.Release(); }
  });
  Task.WhenAll(tasks).Wait();
}
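
A minimal sketch of the same SemaphoreSlim idea that awaits instead of blocking on .Wait(), so the effect of the throttle can be observed. The class name ThrottleSketch, the running/peak counters and the Task.Delay body are assumptions made for illustration; they stand in for the real DoSomething, which the question does not show.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottleSketch
{
    static readonly object Gate = new object();
    static int running; // operations currently in flight (illustrative counter)
    static int peak;    // highest value of `running` seen so far

    // Hypothetical stand-in for the real DoSomething.
    static async Task DoSomething(int n)
    {
        lock (Gate) { running++; if (running > peak) peak = running; }
        await Task.Delay(20); // simulated asynchronous work
        lock (Gate) { running--; }
    }

    // Starts `total` operations, at most `throttle` at a time,
    // and returns the peak concurrency that was observed.
    public static async Task<int> RunConcurrentlyAsync(int total, int throttle)
    {
        using var semaphore = new SemaphoreSlim(throttle);
        var tasks = Enumerable.Range(0, total).Select(async item =>
        {
            await semaphore.WaitAsync();        // wait for a free slot
            try { await DoSomething(item); }
            finally { semaphore.Release(); }    // free the slot even on failure
        }).ToList();                            // materialize so every task exists up front

        await Task.WhenAll(tasks);
        lock (Gate) { return peak; }
    }
}
```

Calling ToList() creates all the wrapper tasks immediately, which matches the requirement in the question that every task be known from the outset; each wrapper simply sits at WaitAsync until a slot frees up. With this setup, the value returned by RunConcurrentlyAsync(20, 3) should never exceed 3.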
Stephen Cleary
17

The simplest option IMO is to use TPL Dataflow. You just create an ActionBlock, limit it to the desired degree of parallelism and start posting items into it. It makes sure that only a certain number of tasks run at the same time, and when a task completes, it starts executing the next item:

async Task RunAsync(int totalThreads, int throttle) 
{
    var block = new ActionBlock<int>(
        DoSomething,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = throttle });

    for (var n = 0; n < totalThreads; n++)
    {
        block.Post(n);
    }

    block.Complete();
    await block.Completion;
}
i3arnon
8

If I understand correctly, you want to start a limited number of tasks, given by the throttle parameter, and wait for them to finish before starting the next ones.

To wait for all started tasks to complete before starting new tasks, use the following implementation.

static async Task RunThreads(int totalThreads, int throttle)
{
    var tasks = new List<Task>();
    for (var n = 0; n < totalThreads; n++)
    {
        var task = DoSomething(n);
        tasks.Add(task);

        if (tasks.Count == throttle)
        {
            await Task.WhenAll(tasks);
            tasks.Clear();
        }
    }
    await Task.WhenAll(tasks); // wait for remaining
}

To start a new task as soon as a running one completes, you can use the following code:

static async Task RunThreads(int totalThreads, int throttle)
{
    var tasks = new List<Task>();
    for (var n = 0; n < totalThreads; n++)
    {
        var task = DoSomething(n);
        tasks.Add(task);

        if (tasks.Count == throttle)
        {
            var completed = await Task.WhenAny(tasks);
            tasks.Remove(completed);
        }
    }
    await Task.WhenAll(tasks); // all threads must complete
}
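
A hedged variation on the WhenAny loop above: tasks removed from the list in that version are never awaited again, so an exception thrown by one of them would go unobserved. The sketch below keeps a second list of every task started and awaits all of them at the end. The names WhenAnyThrottle, RunThrottledAsync and the work delegate are made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class WhenAnyThrottle
{
    // Runs work(0) .. work(total - 1) with at most `throttle` operations in flight.
    // Returns every task that was started, so callers keep a reference to each one.
    public static async Task<List<Task>> RunThrottledAsync(int total, int throttle, Func<int, Task> work)
    {
        if (throttle < 1) throw new ArgumentOutOfRangeException(nameof(throttle));

        var all = new List<Task>();      // every task ever started
        var inFlight = new List<Task>(); // tasks not yet known to have completed

        for (var n = 0; n < total; n++)
        {
            if (inFlight.Count >= throttle)
            {
                // Wait for any one task to finish, then free its slot.
                inFlight.Remove(await Task.WhenAny(inFlight));
            }

            var task = work(n);
            all.Add(task);
            inFlight.Add(task);
        }

        // Awaiting the full list surfaces failures from tasks that were
        // already removed from `inFlight` earlier in the loop.
        await Task.WhenAll(all);
        return all;
    }
}
```

A call such as `await WhenAnyThrottle.RunThrottledAsync(totalThreads, throttle, DoSomething)` would be the equivalent of RunThreads. Note that Task.WhenAny itself does not throw when the finished task is faulted, which is why the final WhenAll over the complete list matters.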
Sriram Sakthivel
7

Stephen Toub gives the following example of throttling in his document The Task-based Asynchronous Pattern.

const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}
Robert Hegner
7

Microsoft's Reactive Extensions (Rx), NuGet package "Rx-Main" (published as "System.Reactive" in later versions), has this problem sorted very nicely.

Just do this:

static void RunThreads(int totalThreads, int throttle) 
{
    Observable
        .Range(0, totalThreads)
        .Select(n => Observable.FromAsync(() => DoSomething(n)))
        .Merge(throttle)
        .Wait();
}

Job done.

Enigmativity
6

.NET 6 introduces Parallel.ForEachAsync. You could rewrite your code like this:

static async ValueTask DoSomething(int n)
{
    ...
}

static Task RunThreads(int totalThreads, int throttle)
    => Parallel.ForEachAsync(Enumerable.Range(0, totalThreads), new ParallelOptions() { MaxDegreeOfParallelism = throttle }, (i, _) => DoSomething(i));

Notes:

  • I had to change the return type of your DoSomething function from Task to ValueTask.
  • You probably want to avoid the .Wait() call, so I made the RunThreads method return a Task that the caller can await.
  • It is not obvious from your example why you need access to the individual tasks. This code does not give you access to the tasks, but might still be helpful in many cases.
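
If changing the return type of DoSomething is not an option, a Task-returning method can still be passed by wrapping the call in an async lambda, which the compiler can convert to the ValueTask-returning delegate that Parallel.ForEachAsync expects. This is a small sketch under that assumption; ForEachAsyncSketch, RunThrottledAsync and the body parameter are illustrative names, and it requires .NET 6 or later.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ForEachAsyncSketch
{
    // Runs body(0) .. body(total - 1) with at most `throttle` invocations at a time.
    // `body` keeps its Task return type; only the lambda adapts it to ValueTask.
    public static Task RunThrottledAsync(int total, int throttle, Func<int, Task> body)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = throttle };

        return Parallel.ForEachAsync(
            Enumerable.Range(0, total),
            options,
            async (n, cancellationToken) => await body(n)); // async lambda yields a ValueTask
    }
}
```

Usage would look like `await ForEachAsyncSketch.RunThrottledAsync(totalThreads, throttle, DoSomething);`. An alternative that avoids the extra async state machine is `(n, _) => new ValueTask(body(n))`, since ValueTask has a constructor that accepts a Task.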
Robert Hegner
2

Here are some extension method variations that build on Sriram Sakthivel's answer.

In the usage example, calls to DoSomething are being wrapped in an explicitly cast closure to allow passing arguments.

public static async Task RunMyThrottledTasks()
{
    var myArgsSource = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    await myArgsSource
        .Select(a => (Func<Task<object>>)(() => DoSomething(a)))
        .Throttle(2);
}

public static async Task<object> DoSomething(int arg)
{
    // Await some async calls that need arg..
    // ..then return result async Task..
    return new object();
}

public static async Task<IEnumerable<T>> Throttle<T>(this IEnumerable<Func<Task<T>>> toRun, int throttleTo)
{
    var running = new List<Task<T>>(throttleTo);
    var completed = new List<Task<T>>();
    foreach(var taskToRun in toRun)
    {
        running.Add(taskToRun());
        if(running.Count == throttleTo)
        {
            var comTask = await Task.WhenAny(running);
            running.Remove(comTask);
            completed.Add(comTask);
        }
    }
    // Wait for the tasks still in flight so that no result is dropped.
    await Task.WhenAll(running);
    completed.AddRange(running);
    return completed.Select(t => t.Result);
}

public static async Task Throttle(this IEnumerable<Func<Task>> toRun, int throttleTo)
{
    var running = new List<Task>(throttleTo);
    foreach(var taskToRun in toRun)
    {
        running.Add(taskToRun());
        if(running.Count == throttleTo)
        {
            var comTask = await Task.WhenAny(running);
            running.Remove(comTask);
        }
    }
    // Wait for the tasks that are still in flight before returning.
    await Task.WhenAll(running);
}
Patrick Loyd
-1

What you need is a custom task scheduler. You can derive a class from System.Threading.Tasks.TaskScheduler and implement its two major functions, GetScheduledTasks() and QueueTask(), along with other functions, to gain complete control over throttling tasks. Here is a well-documented example:

https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0

amarnath chatterjee
  • There is no need to implement a `TaskScheduler` by hand. You could just use a [`ConcurrentExclusiveSchedulerPair.ConcurrentScheduler`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.concurrentexclusiveschedulerpair.concurrentscheduler). But the bigger problem is that `TaskScheduler`s [are completely unable](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await/57702536#57702536) to throttle asynchronous operations. – Theodor Zoulias Jun 12 '21 at 18:26
-1

You can actually emulate the Parallel.ForEachAsync method introduced as part of .NET 6 with the following code:

public static Task ForEachAsync<T>(IEnumerable<T> source, int maxDegreeOfParallelism, Func<T, Task> body) {
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
Grigory Zhadko
Nayanava