If you are tighten to .Net 2.0 you can use the following technique:
knowing the fact that whey you enqueue a task to the ThreadPool
it will create a new thread (of course if there're no free ones), you'll wait before doing this until there's a free thread. For this purpose the BlockingCounter
class is used (described below) that once the limit is reached will wait to increment until someone (another thread) decrements it. Then it's entered "closed" state indicating that no new Increments will be done and waits for completion.
Below is the sample that shows a maximum of 4 tasks with the total number of 10.
class Program
{
static int s_numCurrentThreads = 0;
static Random s_rnd = new Random();
static void Main(string[] args)
{
int maxParallelTasks = 4;
int totalTasks = 10;
using (BlockingCounter blockingCounter = new BlockingCounter(maxParallelTasks))
{
for (int i = 1; i <= totalTasks; i++)
{
Console.WriteLine("Submitting task {0}", i);
blockingCounter.WaitableIncrement();
if (!ThreadPool.QueueUserWorkItem((obj) =>
{
try
{
ThreadProc(obj);
}
catch (Exception ex)
{
Console.Error.WriteLine("Task {0} failed: {1}", obj, ex.Message);
}
finally
{
// Exceptions are possible here too,
// but proper error handling is not the goal of this sample
blockingCounter.WaitableDecrement();
}
}, i))
{
blockingCounter.WaitableDecrement();
Console.Error.WriteLine("Failed to submit task {0} for execution.", i);
}
}
Console.WriteLine("Waiting for copmletion...");
blockingCounter.CloseAndWait(30000);
}
Console.WriteLine("Work done!");
Console.ReadKey();
}
static void ThreadProc (object obj)
{
int taskNumber = (int) obj;
int numThreads = Interlocked.Increment(ref s_numCurrentThreads);
Console.WriteLine("Task {0} started. Total: {1}", taskNumber, numThreads);
int sleepTime = s_rnd.Next(0, 5);
Thread.Sleep(sleepTime * 1000);
Console.WriteLine("Task {0} finished.", taskNumber);
Interlocked.Decrement(ref s_numCurrentThreads);
}
It uses the BlockingCounter class that is based on the Marc Gravell's SizeQueue posted here, but without a counter instead of a queue. When you end queueing new threads call Close() method and then wait for it to finish.
public class BlockingCounter : IDisposable
{
private int m_Count;
private object m_counterLock = new object();
private bool m_isClosed = false;
private volatile bool m_isDisposed = false;
private int m_MaxSize = 0;
private ManualResetEvent m_Finished = new ManualResetEvent(false);
public BlockingCounter(int maxSize = 0)
{
if (maxSize < 0)
throw new ArgumentOutOfRangeException("maxSize");
m_MaxSize = maxSize;
}
public void WaitableIncrement(int timeoutMs = Timeout.Infinite)
{
lock (m_counterLock)
{
while (m_MaxSize > 0 && m_Count >= m_MaxSize)
{
CheckClosedOrDisposed();
if (!Monitor.Wait(m_counterLock, timeoutMs))
throw new TimeoutException("Failed to wait for counter to decrement.");
}
CheckClosedOrDisposed();
m_Count++;
if (m_Count == 1)
{
Monitor.PulseAll(m_counterLock);
}
}
}
public void WaitableDecrement(int timeoutMs = Timeout.Infinite)
{
lock (m_counterLock)
{
try
{
while (m_Count == 0)
{
CheckClosedOrDisposed();
if (!Monitor.Wait(m_counterLock, timeoutMs))
throw new TimeoutException("Failed to wait for counter to increment.");
}
CheckDisposed();
m_Count--;
if (m_MaxSize == 0 || m_Count == m_MaxSize - 1)
Monitor.PulseAll(m_counterLock);
}
finally
{
if (m_isClosed && m_Count == 0)
m_Finished.Set();
}
}
}
void CheckClosedOrDisposed()
{
if (m_isClosed)
throw new Exception("The counter is closed");
CheckDisposed();
}
void CheckDisposed()
{
if (m_isDisposed)
throw new ObjectDisposedException("The counter has been disposed.");
}
public void Close()
{
lock (m_counterLock)
{
CheckDisposed();
m_isClosed = true;
Monitor.PulseAll(m_counterLock);
}
}
public bool WaitForFinish(int timeoutMs = Timeout.Infinite)
{
CheckDisposed();
lock (m_counterLock)
{
if (m_Count == 0)
return true;
}
return m_Finished.WaitOne(timeoutMs);
}
public void CloseAndWait (int timeoutMs = Timeout.Infinite)
{
Close();
WaitForFinish(timeoutMs);
}
public void Dispose()
{
if (!m_isDisposed)
{
m_isDisposed = true;
lock (m_counterLock)
{
// Wake up all waiting threads, so that they know the object
// is disposed and there's nothing to wait anymore
Monitor.PulseAll(m_counterLock);
}
m_Finished.Close();
}
}
}
The result will be like that:
Submitting task 1
Submitting task 2
Submitting task 3
Submitting task 4
Submitting task 5
Task 1 started. Total: 1
Task 1 finished.
Task 3 started. Total: 1
Submitting task 6
Task 2 started. Total: 2
Task 3 finished.
Task 6 started. Total: 4
Task 5 started. Total: 3
Task 4 started. Total: 4
Submitting task 7
Task 4 finished.
Submitting task 8
Task 7 started. Total: 4
Task 5 finished.
Submitting task 9
Task 7 finished.
Task 8 started. Total: 4
Task 9 started. Total: 4
Submitting task 10
Task 2 finished.
Waiting for copmletion...
Task 10 started. Total: 4
Task 10 finished.
Task 6 finished.
Task 8 finished.
Task 9 finished.
Work done!