3

What is the best way to implement a queue for threads, so that I only ever have a maximum number of threads running and, if that many are already running, the code waits for a free slot before continuing?

Pseudo-code-ish example of what I mean; I'm sure this can be done in a better way...

(Please check the additional requirements below)

private static int _MaxThreads = 10;
private static int _CurrentThreads = 0;

public static void Main(string[] args)
{
    List<object> listWithLotsOfItems = FillWithManyThings();

    while (listWithLotsOfItems.Count > 0)
    {
        // get next item that needs to be worked on
        var item = listWithLotsOfItems[0];
        listWithLotsOfItems.RemoveAt(0);

        // IMPORTANT!, more items can be added as we go.
        listWithLotsOfItems.AddRange(AddMoreItemsToBeProcessed());

        // wait for free thread slot
        while (_CurrentThreads >= _MaxThreads)
            Thread.Sleep(100);

        Interlocked.Increment(ref _CurrentThreads); // risk of letting more than one thread through here...
        Thread t = new Thread(new ParameterizedThreadStart(WorkerThread));
        t.Start(item); // pass the item as the thread's parameter
    }
}

public static void WorkerThread(object bigHeavyObject)
{
    // do heavy work here
    Interlocked.Decrement(ref _CurrentThreads);
}

I looked at Semaphore, but that seems to need to run inside the threads rather than outside, before the thread is created. In this example the Semaphore is used inside the thread, after it is created, to halt it; in my case there could be over 100k work items before the job is done, so I would rather not create a thread until a slot is available. (link to semaphore example)

In the real application, data can be added to the list of items as the program progresses, so `Parallel.ForEach` won't really work either (I'm doing this in a script component in an SSIS package to send data to a very slow WCF service).

SSIS has .Net 4.0

JensB
  • Have you tried using Semaphore and SemaphoreSlim ? https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim%28v=vs.110%29.aspx – Benji_9989 Feb 09 '15 at 15:40
  • @Benji_9989 Yes, please look at my added comment. – JensB Feb 09 '15 at 15:42
  • Prefer to use a construct like [`Parallel.ForEach`](https://msdn.microsoft.com/en-us/library/system.threading.tasks.parallel.foreach%28v=vs.110%29.aspx) with the `MaxDegreeOfParallelism` option rather than manually creating and managing threads, if at all possible. – Damien_The_Unbeliever Feb 09 '15 at 15:46
  • Hmm, In the real application of this my listWithLotsOfItems is actually getting new items added as I go along, not sure if Foreach would like that very much.. Thinking that Parallel.Invoke might be better if multiple calls to it will limit between themselves. Wish there was a `Parallel.While()` – JensB Feb 09 '15 at 15:53
  • What version of .NET do you have access to within your SSIS environment? – Paul Turner Feb 09 '15 at 16:21

3 Answers

3

So, let me first of all say that what you're trying to do is only going to give you a small enhancement to performance in a very specific arrangement. It can be a lot of work to try and tune at the thread-allocation level, so be sure you have a very good reason before proceeding.

Now, first of all, if you want to simply queue up the work, you can put it on the .NET thread pool. It will only allocate threads up to the maximum configured and any work that doesn't fit onto those (if all the threads are busy) will be queued up until a thread becomes available.

The simplest way to do this is to call:

Task.Factory.StartNew(() => { /* Your code */});

This creates a TPL task and schedules it to run on the default task scheduler, which should in turn allocate the task to the thread-pool.

If you need to wait for these tasks to complete before proceeding, you can add them to a collection and then use Task.WaitAll(...):

var tasks = new List<Task>();

tasks.Add(Task.Factory.StartNew(() => { /* Your code */}));

// Before leaving the script.
Task.WaitAll(tasks.ToArray());

However, if you need to go deeper and control the scheduling of these tasks, you can look at creating a custom task scheduler that supports limited concurrency. This MSDN article goes into more details about it and suggests a possible implementation, but it isn't a trivial task.
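If a custom scheduler feels like overkill, a lighter-weight alternative is to gate task creation with a `SemaphoreSlim`, which is available in .NET 4.0. This is a sketch rather than part of the MSDN article's implementation; the names `throttle`, `MaxConcurrency`, and `Process` are illustrative:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    private const int MaxConcurrency = 10; // illustrative limit

    public static void Run(Queue<object> workItems)
    {
        var throttle = new SemaphoreSlim(MaxConcurrency);
        var tasks = new List<Task>();

        while (workItems.Count > 0)
        {
            object item = workItems.Dequeue();

            // Blocks here until a running task releases a slot, so no
            // task is started before a slot is actually free.
            throttle.Wait();

            tasks.Add(Task.Factory.StartNew(state =>
            {
                try
                {
                    Process(state); // do the heavy work
                }
                finally
                {
                    throttle.Release(); // free the slot even if Process throws
                }
            }, item));
        }

        Task.WaitAll(tasks.ToArray());
    }

    private static void Process(object item)
    {
        // placeholder for the real work
    }
}
```

Because the `Wait()` happens in the loop, before `StartNew`, at most `MaxConcurrency` work items are ever in flight, and new items can still be enqueued while the loop runs.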

Paul Turner
2

The easiest way to do this is with the overload of Parallel.ForEach() which allows you to select MaxDegreeOfParallelism.

Here's a sample program:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            List<int> items = Enumerable.Range(1, 100).ToList();

            Parallel.ForEach(items, new ParallelOptions {MaxDegreeOfParallelism = 5}, process);
        }

        private static void process(int item)
        {
            Console.WriteLine("Processing " + item);
            Thread.Sleep(2000);
        }
    }
}

If you run this, you'll see that it processes 5 elements very quickly and then there's a delay (caused by Thread.Sleep(2000)) before the next block of elements is processed. This is because in this sample code no more than 5 threads are allowed to execute at once.

Note that if MaxDegreeOfParallelism exceeds the threadpool's minimum thread value, then it may take a while for all the threads to be started.

The reason for this is that Parallel.ForEach() uses threadpool threads - and there is a certain number of threads that the threadpool keeps available by default. When creating threads beyond this limit, a delay is introduced between the creation of each new threadpool thread.

You can set the minimum number of threadpool threads to a higher value using ThreadPool.SetMinThreads(), but I do NOT recommend this.

However, if you do want to do so, here's an example which sets the minimum thread count to 20:

int workerThreads, ioThreads;
ThreadPool.GetMinThreads(out workerThreads, out ioThreads);
ThreadPool.SetMinThreads(20, ioThreads);

If you do that and then run the previous code with MaxDegreeOfParallelism = 20 you'll see that there's no longer any delay when the initial threads are created.

Matthew Watson
  • How would this handle the `items` list getting more items during execution? I could wrap the whole Parallel.ForEach in a while loop, and in the worst-case scenario it would just loop over and start anew... maybe that's the best way.. – JensB Feb 09 '15 at 16:12
  • Your last statement is incorrect. `Parallel.Foreach` will happily request more threads than processors. However there is still a "trick" about it, `Parallel.Foreach` will only use very few threads the first time it goes through the body, each loop it adds more threads till you hit the max. The last paragraph should say something along the lines of "Note that the MaxDegreeOfParallelism setting is a MAXIMUM value; `Parallel.Foreach` may use less threads than what you specified in the max value at the start during its 'warm up' phase". – Scott Chamberlain Feb 09 '15 at 16:13
  • I put a breakpoint inside the loop, look at the number of worker threads. [1st hit](http://i.stack.imgur.com/hknEI.png) Main thread + 2 worker threads (3 threads running), [2nd hit](http://i.stack.imgur.com/VjNg4.png) Main thread + 3 worker threads (4 threads running), [3rd hit](http://i.stack.imgur.com/Hurb5.png) Main thread + 4 worker threads (5 threads running). After that point it stays at 5 threads. – Scott Chamberlain Feb 09 '15 at 16:22
  • You still mention " if your processor has less cores that you request" in your answer, `Parallel.ForEach` does not care about the number of cores you have. Your statement will just confuse readers. – Scott Chamberlain Feb 09 '15 at 16:24
  • @ScottChamberlain I've added the missing information about the threadpool "minimum thread" value, which is what causes the delay with the threads starting. – Matthew Watson Feb 09 '15 at 16:30
0

Have you considered using a wait handle? See this

Also, you can use `Parallel.ForEach` to manage the thread creation for you.
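A minimal sketch of the wait-handle idea (illustrative, not from the linked page): a counting `Semaphore` derives from `WaitHandle`, so `WaitOne` can block *before* each thread is created, which is what the question asks for:

```csharp
using System;
using System.Threading;

public static class WaitHandleThrottle
{
    public static void Main()
    {
        // A counting Semaphore is a WaitHandle: WaitOne blocks until a slot is free.
        var slots = new Semaphore(3, 3); // illustrative limit of 3 concurrent threads

        for (int i = 0; i < 10; i++)
        {
            slots.WaitOne(); // blocks here, before the thread is created

            int id = i;
            var t = new Thread(() =>
            {
                try
                {
                    Console.WriteLine("Working on item " + id);
                    Thread.Sleep(500); // simulate heavy work
                }
                finally
                {
                    slots.Release(); // free the slot for the next item
                }
            });
            t.Start();
        }
    }
}
```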

Hope it helps ;)

Ninglin