35

Let's say I have 100 tasks that each take about 10 seconds. I want to run only 10 at a time: whenever one of those 10 finishes, the next task should start, until all of them are done.

Until now I have always used ThreadPool.QueueUserWorkItem() for this kind of work, but I've read that it is bad practice and that I should use Tasks instead.

My problem is that I haven't found a good example for my scenario anywhere, so could you get me started on how to achieve this with Tasks?

maddo7
  • Where did you read that using the `ThreadPool` was bad practice? – Jonathon Reinhart Dec 28 '12 at 20:01
  • I would suggest reading some articles or previous Stack Overflow posts; there are plenty of coded examples that others have tried, with answers provided, e.g. http://stackoverflow.com/questions/6192898/thread-start-versus-threadpool-queueuserworkitem. Do a Google search like I did: C# Stackoverflow ThreadPool.QueueUserWorkItem() – MethodMan Dec 28 '12 at 20:03
  • Do you want a method that blocks until all of the tasks are done, or do you want a method that returns a `Task` when all of the tasks are done? – Servy Dec 28 '12 at 20:06
  • It should block, just like ThreadPool does but with tasks. Some guys here on Stackoverflow told me on a code sample that Threadpool would be bad practice – maddo7 Dec 28 '12 at 21:42

5 Answers

39
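// At most 10 work items may hold a slot at the same time.
SemaphoreSlim maxThread = new SemaphoreSlim(10);

for (int i = 0; i < 115; i++)
{
    // Blocks the calling thread until one of the 10 slots is free.
    maxThread.Wait();
    Task.Factory.StartNew(() =>
        {
            // Your work goes here
        }
        , TaskCreationOptions.LongRunning)
    // Free the slot once the work item has finished.
    .ContinueWith( (task) => maxThread.Release() );
}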
L.B
  • Why do you specify `TaskCreationOptions.LongRunning`? – BornToCode Nov 09 '16 at 16:27
  • I would recommend using the Semaphore class instead of SemaphoreSlim if your task is intended to be a long running task. – Ariel Moraes Feb 03 '17 at 20:29
  • @MarcGravell I may be missing something but I only see a main thread running the loop and the workers(10) running at the same time.. The blocked one is the main thread(1) – L.B Aug 04 '17 at 22:14
  • @L.B. I apologise; I misread the location of the wait - my bad - I read in haste, and err'd – Marc Gravell Aug 04 '17 at 22:28
  • @L.B How can I add a new task to the queue whenever one of the 10 running tasks completes? For example, with a limit of 10 set by SemaphoreSlim or MaxDegreeOfParallelism, I don't want to create 100 tasks up front and then throttle them to 10 at a time. I only want to create a new task when one of the 10 running tasks completes, and this process should continue indefinitely. – virender Apr 20 '18 at 06:49
  • @virender My answer does exactly that. Just replace *for* loop with an infinite loop – L.B Apr 25 '18 at 17:01
  • This will not wait for last `10` tasks in worst case and at least `1` task in best case. – Ashish Negi Jun 03 '18 at 13:50
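
One way to address the last comment is to keep a reference to every task and block on them after the loop. The following is only a minimal sketch, assuming the work is synchronous and that blocking the caller is acceptable (as the asker requested). `ThrottledRunner`, `RunAll` and the `Action<int>` parameter are hypothetical names for illustration, not part of the answer above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    // Runs work(0) .. work(count - 1), at most maxConcurrent at a time,
    // and returns only after every item has finished.
    public static void RunAll(int count, int maxConcurrent, Action<int> work)
    {
        using (var slots = new SemaphoreSlim(maxConcurrent))
        {
            var tasks = new List<Task>(count);
            for (int i = 0; i < count; i++)
            {
                int index = i;   // copy, so the lambda does not capture the loop variable
                slots.Wait();    // blocks until one of the slots is free
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    try { work(index); }
                    finally { slots.Release(); } // free the slot even if work throws
                }, TaskCreationOptions.LongRunning));
            }

            // Without this call the method could return while the last tasks are still running.
            Task.WaitAll(tasks.ToArray());
        }
    }
}
```

A call such as `ThrottledRunner.RunAll(100, 10, i => DoSomething(i))` (where `DoSomething` stands in for the real work) should then return only once all 100 items are done.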
19

TPL Dataflow is great for doing things like this. You can create a 100% async version of Parallel.Invoke pretty easily:

async Task ProcessTenAtOnce<T>(IEnumerable<T> items, Func<T, Task> func)
{
    ExecutionDataflowBlockOptions edfbo = new ExecutionDataflowBlockOptions
    {
         MaxDegreeOfParallelism = 10
    };

    ActionBlock<T> ab = new ActionBlock<T>(func, edfbo);

    foreach (T item in items)
    {
         await ab.SendAsync(item);
    }

    ab.Complete();
    await ab.Completion;
}
Cory Nelson
  • the TPL dataflow library is actually super cool and i've already found a use for it, thanks for pointing it out – Mike Corcoran Feb 20 '17 at 14:39
  • that's a great approach, is it possible to make it return though? - I'm thinking in using it to call a service which takes a while to answer. – Gabriel Espinoza Jul 13 '17 at 20:50
  • @GabrielEspinoza this is only a tiny bit of what TPL Dataflow can do. You might be able to use a `TransformBlock` for what you want. – Cory Nelson Jul 13 '17 at 20:54
  • The nuget for TPL Dataflow has now been unlisted. It has been replaced with [System.Threading.Tasks.Dataflow](https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/) – Gyum Fox Jul 11 '18 at 11:47
  • Could you update this demo to show where a user should add their commands to execute on each item, and how to pass in parameters for the items? – FoxDeploy May 01 '19 at 18:00
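
In response to the last comment, here is a rough usage sketch. The per-item work goes into the delegate passed as `func`, and each element of `items` arrives as that delegate's parameter. The names `DataflowDemo`, `ProcessAsync` and `RunDemoAsync`, the `maxParallel` parameter and the example.com URLs are assumptions made for illustration; on older frameworks the `System.Threading.Tasks.Dataflow` NuGet package is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    // Same shape as the method in the answer, with the limit passed in as a parameter.
    public static async Task ProcessAsync<T>(IEnumerable<T> items, int maxParallel, Func<T, Task> func)
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel };
        var block = new ActionBlock<T>(func, options);

        foreach (T item in items)
        {
            await block.SendAsync(item);
        }

        block.Complete();        // no more input will be posted
        await block.Completion;  // completes when every item has been processed
    }

    public static async Task RunDemoAsync()
    {
        // Hypothetical input: any sequence of per-item parameters works here.
        IEnumerable<string> urls = Enumerable.Range(1, 100).Select(n => $"https://example.com/item/{n}");

        await ProcessAsync(urls, 10, async url =>
        {
            // The commands to run for each item go here; url is the current item.
            await Task.Delay(100); // stand-in for the real asynchronous work
            Console.WriteLine($"Processed {url}");
        });
    }
}
```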
9

You have several options. You can use Parallel.Invoke for starters:

public void DoWork(IEnumerable<Action> actions)
{
    Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = 10 }
        , actions.ToArray());
}

Here is an alternate option that will work much harder to have exactly 10 tasks running (although the number of threads in the thread pool processing those tasks may be different) and that returns a Task indicating when it finishes, rather than blocking until done.

public Task DoWork(IList<Action> actions)
{
    List<Task> tasks = new List<Task>();
    int numWorkers = 10;
    int batchSize = (int)Math.Ceiling(actions.Count / (double)numWorkers);
    // Use the rounded-up batch size so that at most numWorkers batches are created.
    foreach (var batch in actions.Batch(batchSize))
    {
        tasks.Add(Task.Factory.StartNew(() =>
        {
            foreach (var action in batch)
            {
                action();
            }
        }));
    }

    return Task.WhenAll(tasks);
}

If you don't have MoreLinq, for the Batch function, here's my simpler implementation:

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
{
    List<T> buffer = new List<T>(batchSize);

    foreach (T item in source)
    {
        buffer.Add(item);

        if (buffer.Count >= batchSize)
        {
            yield return buffer;
            buffer = new List<T>(batchSize);
        }
    }
    // Yield the final, partially filled batch, but never an empty one.
    if (buffer.Count > 0)
    {
        yield return buffer;
    }
}
Servy
  • `Now I want to only run 10 at a time`, *MaxDegreeOfParallelism* is only an upper bound. – L.B Dec 28 '12 at 20:07
  • @L.B Well, technically, if the number of units of work isn't divisible by 10 then *exactly* ten isn't possible. – Servy Dec 28 '12 at 20:08
  • No even with 100 works, it may end up running fewer Tasks. It depends on many parameters such as # of CPUs – L.B Dec 28 '12 at 20:10
  • @L.B I'm aware of that. I was stating that even if you tried to be closer to exactly 10 you can't always be perfect, you can only be...closer. – Servy Dec 28 '12 at 20:12
  • @L.B Does the additional version I added satisfy you? – Servy Dec 28 '12 at 20:15
  • I would do it differently. So I posted it as an answer – L.B Dec 28 '12 at 20:21
6

You can create a method like this:

public static async Task RunLimitedNumberAtATime<T>(int numberOfTasksConcurrent, 
    IEnumerable<T> inputList, Func<T, Task> asyncFunc)
{
    Queue<T> inputQueue = new Queue<T>(inputList);
    List<Task> runningTasks = new List<Task>(numberOfTasksConcurrent);
    for (int i = 0; i < numberOfTasksConcurrent && inputQueue.Count > 0; i++)
        runningTasks.Add(asyncFunc(inputQueue.Dequeue()));

    while (inputQueue.Count > 0)
    {
        Task task = await Task.WhenAny(runningTasks);
        runningTasks.Remove(task);
        runningTasks.Add(asyncFunc(inputQueue.Dequeue()));
    }

    await Task.WhenAll(runningTasks);
}

And then you can call any async method n times with a limit like this:

Task task = RunLimitedNumberAtATime(10,
    Enumerable.Range(1, 100),
    async x =>
    {
        Console.WriteLine($"Starting task {x}");
        await Task.Delay(100);
        Console.WriteLine($"Finishing task {x}");
    });

Or, if you want to run long-running non-async methods, you can do it this way:

Task task = RunLimitedNumberAtATime(10,
    Enumerable.Range(1, 100),
    x => Task.Factory.StartNew(() => {
        Console.WriteLine($"Starting task {x}");
        System.Threading.Thread.Sleep(100);
        Console.WriteLine($"Finishing task {x}");
    }, TaskCreationOptions.LongRunning));

Maybe there is a similar method somewhere in the framework, but I haven't found it yet.
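
For what it's worth, .NET 6 and later do include something along these lines: `Parallel.ForEachAsync`, which accepts a `ParallelOptions` with `MaxDegreeOfParallelism`. A minimal sketch, assuming .NET 6 or newer; the class name `ForEachAsyncDemo` and the `Task.Delay` stand-in for real work are illustrative only.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Processes the numbers 1..itemCount, at most maxParallel at a time,
    // and returns how many items finished.
    public static async Task<int> RunAsync(int itemCount, int maxParallel)
    {
        int finished = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        await Parallel.ForEachAsync(Enumerable.Range(1, itemCount), options, async (x, cancellationToken) =>
        {
            await Task.Delay(100, cancellationToken); // stand-in for the real asynchronous work
            Interlocked.Increment(ref finished);
        });

        return finished;
    }
}
```

Compared with the hand-written queue above, this leaves the scheduling to the framework, at the cost of requiring a newer runtime.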

5

I would go with the simplest solution I can think of, which I think is the TPL:

string[] urls={};
Parallel.ForEach(urls, new ParallelOptions() { MaxDegreeOfParallelism = 2}, url =>
{
   //Download the content or do whatever you want with each URL
});
Samer Aburabie
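
To see how `Parallel.ForEach` behaves for the scenario in the question, the sketch below records the highest number of work items observed running at once. It assumes synchronous work simulated with `Thread.Sleep`; `ParallelForEachDemo` and `Run` are hypothetical names. Note that `MaxDegreeOfParallelism` is only an upper bound, so the observed peak may be lower than the limit.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachDemo
{
    // Runs itemCount simulated work items and returns the highest number
    // that were observed running at the same time.
    public static int Run(int itemCount, int maxParallel)
    {
        int running = 0, peak = 0;
        object gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        // Parallel.ForEach blocks the caller until every item has been processed.
        Parallel.ForEach(Enumerable.Range(1, itemCount), options, item =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            Thread.Sleep(50); // stand-in for the real work
            lock (gate) { running--; }
        });

        return peak;
    }
}
```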