40

I'm sorry for the redundant question. I've found many solutions to my problem, but none of them are very well explained. I'm hoping it will be made clear here.

My C# application's main thread spawns 1..n background workers using the ThreadPool. I want the original thread to block until all of the workers have completed. I have researched ManualResetEvent in particular, but I'm not clear on its use.

In pseudocode:

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

If necessary, I will know the number of workers that are about to be queued beforehand.

Jesse Hallam
  • Hi, take a look at a similar post here: http://stackoverflow.com/questions/358721/be-notified-when-all-background-threadpool-threads-are-finished – Valentin V Feb 12 '09 at 05:40

10 Answers

56

Try this. The function takes a sequence of Action delegates, queues a ThreadPool work item for each one, and waits for every action to complete before returning.

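using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class Spawner
{
    public static void SpawnAndWait(IEnumerable<Action> actions)
    {
        var list = actions.ToList();              // enumerate the input only once
        if (list.Count == 0) return;              // WaitAll rejects an empty array
        var handles = new ManualResetEvent[list.Count];
        for (var i = 0; i < list.Count; i++)
        {
            handles[i] = new ManualResetEvent(false);
            var currentAction = list[i];          // per-iteration copies captured by the lambda
            var currentHandle = handles[i];
            // Set the handle even if the action throws, so WaitAll cannot hang.
            Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
            ThreadPool.QueueUserWorkItem(x => wrappedAction());
        }

        // Blocks until every handle is set (WaitAll accepts at most 64 handles).
        WaitHandle.WaitAll(handles);
    }
}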
JaredPar
  • WaitHandle.WaitAll fails if the number of handles is larger than the system permits. On my Win2k3 server that number is 64, so I get an exception when I try to spawn more than 64 items... – Eran Kampf Mar 09 '09 at 15:43
  • @Eran, try writing a SpawnAndWaitHelper that essentially contains the code above. Have SpawnAndWait divide the enumerable into chunks of 64 items and call the helper for each chunk. – JaredPar Mar 09 '09 at 16:02
  • ah... http://stackoverflow.com/questions/1045980/is-there-a-better-way-to-wait-for-queued-threads/1074770#1074770 – bohdan_trotsenko Jul 02 '09 at 14:41
  • There is a serious error in this code. Since the wrapper action is evaluated lazily, it is possible that the first thread executing the wrapper action gets the 2nd, 3rd, etc. handle instead of the first. – Sriwantha Attanayake Sep 03 '11 at 22:07
  • I don't understand how the serious error would happen. As I understand it, the currentAction value has been captured when wrappedAction is assigned and is saved in that wrappedAction. It's evaluated lazily, but the value of wrappedAction/currentAction won't change anyway. – ChrisTorng Dec 17 '15 at 03:55
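A minimal sketch of JaredPar's chunking suggestion follows, assuming it is acceptable to wait for one batch before queueing the next. `ChunkedSpawner` and its private helper are illustrative names, not code from the answer; the batch size of 64 comes from the WaitAll limit Eran ran into.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ChunkedSpawner
{
    // WaitHandle.WaitAll accepts at most 64 handles per call.
    private const int MaxHandlesPerWait = 64;

    public static void SpawnAndWait(IEnumerable<Action> actions)
    {
        var list = actions.ToList();
        // Process the work in batches so no single WaitAll call exceeds the limit.
        for (int start = 0; start < list.Count; start += MaxHandlesPerWait)
        {
            int count = Math.Min(MaxHandlesPerWait, list.Count - start);
            SpawnAndWaitChunk(list.GetRange(start, count));
        }
    }

    private static void SpawnAndWaitChunk(List<Action> chunk)
    {
        var handles = new ManualResetEvent[chunk.Count];
        for (int i = 0; i < chunk.Count; i++)
        {
            var action = chunk[i];                 // copy for the closure
            var handle = new ManualResetEvent(false);
            handles[i] = handle;
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try { action(); }
                finally { handle.Set(); }          // signal even if the action throws
            });
        }
        WaitHandle.WaitAll(handles);               // blocks until this batch is done
        foreach (var h in handles) h.Dispose();
    }
}
```

The trade-off is that batches run one after another, so a single slow item holds up the start of the next batch. Note also that WaitAll with multiple handles is not supported on an STA thread.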
31

Here's a different approach, based on encapsulation, so your code could be as simple as:

    Forker p = new Forker();
    foreach (var obj in collection)
    {
        var tmp = obj;
        p.Fork(delegate { DoSomeWork(tmp); });
    }
    p.Join();

The Forker class is given below (I got bored on the train ;-p). Again, this avoids OS objects, but wraps things up quite neatly (IMO):

using System;
using System.Threading;

/// <summary>Event arguments representing the completion of a parallel action.</summary>
public class ParallelEventArgs : EventArgs
{
    private readonly object state;
    private readonly Exception exception;
    internal ParallelEventArgs(object state, Exception exception)
    {
        this.state = state;
        this.exception = exception;
    }

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
    public object State { get { return state; } }

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
    public Exception Exception { get { return exception; } }
}

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
public sealed class Forker
{
    int running;
    private readonly object joinLock = new object(), eventLock = new object();

    /// <summary>Raised when all operations have completed.</summary>
    public event EventHandler AllComplete
    {
        add { lock (eventLock) { allComplete += value; } }
        remove { lock (eventLock) { allComplete -= value; } }
    }
    private EventHandler allComplete;
    /// <summary>Raised when each operation completes.</summary>
    public event EventHandler<ParallelEventArgs> ItemComplete
    {
        add { lock (eventLock) { itemComplete += value; } }
        remove { lock (eventLock) { itemComplete -= value; } }
    }
    private EventHandler<ParallelEventArgs> itemComplete;

    private void OnItemComplete(object state, Exception exception)
    {
        EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
        if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
        if (Interlocked.Decrement(ref running) == 0)
        {
            EventHandler allHandler = allComplete; // don't need to lock
            if (allHandler != null) allHandler(this, EventArgs.Empty);
            lock (joinLock)
            {
                Monitor.PulseAll(joinLock);
            }
        }
    }

    /// <summary>Adds a callback to invoke when each operation completes.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        ItemComplete += handler;
        return this;
    }

    /// <summary>Adds a callback to invoke when all operations are complete.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnAllComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        AllComplete += handler;
        return this;
    }

    /// <summary>Waits for all operations to complete.</summary>
    public void Join()
    {
        Join(-1);
    }

    /// <summary>Waits (with timeout) for all operations to complete.</summary>
    /// <returns>Whether all operations had completed before the timeout.</returns>
    public bool Join(int millisecondsTimeout)
    {
        lock (joinLock)
        {
            if (CountRunning() == 0) return true;
            Thread.SpinWait(1); // try our luck...
            return (CountRunning() == 0) ||
                Monitor.Wait(joinLock, millisecondsTimeout);
        }
    }

    /// <summary>Indicates the number of incomplete operations.</summary>
    /// <returns>The number of incomplete operations.</returns>
    public int CountRunning()
    {
        return Interlocked.CompareExchange(ref running, 0, 0);
    }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action) { return Fork(action, null); }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action, object state)
    {
        if (action == null) throw new ArgumentNullException("action");
        Interlocked.Increment(ref running);
        ThreadPool.QueueUserWorkItem(delegate
        {
            Exception exception = null;
            try { action(); }
            catch (Exception ex) { exception = ex;}
            OnItemComplete(state, exception);
        });
        return this;
    }
}
Marc Gravell
  • (HI MARC! Remember this post??) Out of curiosity, why is var tmp = obj necessary? I implemented it by just passing my object in and I got crazy results. Changing it over to using var ended up working. I'm clearly not understanding something! Thanks, and see if you can remember after a mere two years :) – DanTheMan Mar 18 '11 at 21:52
  • @user The answer to that is a bit complicated, but in short, it's because C# fails to quietly do exactly what you meant without you even realising. It's generally quite good at doing this unambiguously at all the right places, but not in this case. – Roman Starkov Sep 09 '11 at 14:20
  • You need to understand that the code `delegate { DoSomeWork(tmp); }` *captures* the variable `tmp`. Each call to this code captures a *different* variable each time around the loop, because `tmp`'s scope is confined to the body of the loop. However, the `foreach` variable is the *same* variable each time around the loop, so all calls to `delegate { DoSomeWork(tmp); }` capture the same thing. This really doesn't need to be like this; limiting the scope of the foreach variable would have made lots of code "just work" without people even realising the trickiness of the situation. – Roman Starkov Sep 09 '11 at 14:33
  • Thanks a lot!! I used this for https://sourceforge.net/projects/icompress/ – Yogee Oct 17 '16 at 21:01
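A small sketch of the capture behaviour Roman describes, using a `for` loop (the class and method names are made up for illustration). Since C# 5, the `foreach` iteration variable is scoped to each iteration, so the `var tmp = obj;` copy is only required with older compilers; a `for` loop variable is still a single shared variable, which is why the example uses one.

```csharp
using System;
using System.Collections.Generic;

public static class CaptureDemo
{
    // Every lambda captures the same loop variable i, which is 3 once the loop ends.
    public static List<int> CaptureLoopVariable()
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
            funcs.Add(() => i);
        return Invoke(funcs);
    }

    // Each iteration declares a fresh tmp, so each lambda sees its own value.
    public static List<int> CaptureCopy()
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            int tmp = i;
            funcs.Add(() => tmp);
        }
        return Invoke(funcs);
    }

    // Runs the lambdas only after the loop has finished, like a queued work item would.
    private static List<int> Invoke(List<Func<int>> funcs)
    {
        var results = new List<int>();
        foreach (var f in funcs) results.Add(f());
        return results;
    }
}
```

`CaptureLoopVariable()` returns 3, 3, 3 because all three lambdas read the single `i` after the loop has completed; `CaptureCopy()` returns 0, 1, 2.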
14

First, how long do the workers run? Pool threads should generally be used for short-lived tasks; if they are going to run for a while, consider manual threads.
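If the workers do run for a long time, a rough sketch of the manual-thread alternative is to start one `Thread` per item and `Join` each one. `ThreadRunner.RunAll` is a hypothetical helper, and `work` stands in for `DoSomeWork`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ThreadRunner
{
    // Starts one dedicated (non-pool) thread per item and blocks until all finish.
    public static void RunAll<T>(IEnumerable<T> items, Action<T> work)
    {
        var threads = new List<Thread>();
        foreach (var item in items)
        {
            var current = item;                     // copy for the closure
            var t = new Thread(() => work(current));
            t.IsBackground = true;                  // don't keep the process alive on shutdown
            threads.Add(t);
            t.Start();
        }
        // Join returns once that thread has terminated; joining them in order is
        // fine because we only continue after every thread has finished.
        foreach (var t in threads)
            t.Join();
    }
}
```

Creating a thread per item is only reasonable for a modest number of items; for many short items the pool remains the better fit.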

Re the problem: do you actually need to block the main thread? Can you use a callback instead? If so, something like:

int running = 1; // start at 1 to prevent multiple callbacks if
          // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

This is a fairly lightweight (no OS primitives) way of tracking the workers.

If you need to block, you can do the same using a Monitor (again, avoiding an OS object):

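    object syncLock = new object();
    int running = 1; // start at 1 so the count can't reach 0 while still queueing
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
    lock (syncLock) {
        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            Interlocked.Increment(ref running);
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        // Release the initial count; if every worker has already finished,
        // the count is now 0 and there is nothing left to wait for.
        if (Interlocked.Decrement(ref running) != 0) {
            Monitor.Wait(syncLock); // released by the last worker's Pulse
        }
    }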
    Console.WriteLine("all done");
JSBձոգչ
Marc Gravell
  • Your code will wait forever if one of the delegates throws an exception. – JaredPar Feb 12 '09 at 05:20
  • If one of those delegates throws an exception, I'm going to lose the whole process, so that is fairly arbitrary... I'm assuming it won't throw, but I'll make it explicit ;-p – Marc Gravell Feb 12 '09 at 05:25
  • The workers will be processing expensive operations including reading and writing files and performing SQL selects and inserts involving Binary/Image columns. It's unlikely they'll live long enough to require explicit threads, but performance could be gained by letting them execute in parallel. – Jesse Hallam Feb 12 '09 at 05:27
  • +1. To handle exceptions in the worker, you could do try { DoSomeWork(tmp); } finally { endOfThread(); } – Chaowlert Chaisrichalermpol Feb 12 '09 at 05:41
  • @Marc, whether or not a ThreadPool exception kills the process is not a certainty. It changed between versions 1.0 and 2.0 of the CLR (I believe it's also configurable). Truthfully, I can't remember which version does which anymore. I just assume the worst with threads :) – JaredPar Feb 12 '09 at 05:43
  • Found the documentation for the exception change: http://msdn.microsoft.com/en-us/library/ms228965.aspx – JaredPar Feb 12 '09 at 05:46
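Putting Chaowlert's try/finally suggestion into the callback version gives roughly the following sketch. `CompletionTracker`, `Run` and the `onAllDone` parameter are names introduced here for illustration; the counting scheme (start at 1, increment per item, one extra decrement after queueing) is the one from the answer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class CompletionTracker
{
    // Queues work for each item and invokes onAllDone exactly once, on whichever
    // thread performs the final decrement. Does not block the caller.
    public static void Run<T>(IEnumerable<T> items, Action<T> work, Action onAllDone)
    {
        int running = 1; // start at 1 so the count cannot reach 0 while still queueing

        Action endOfThread = () =>
        {
            if (Interlocked.Decrement(ref running) == 0)
                onAllDone();
        };

        foreach (var item in items)
        {
            var tmp = item; // avoid the capture issue on older compilers
            Interlocked.Increment(ref running);
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try { work(tmp); }
                finally { endOfThread(); } // release the count even if work throws
            });
        }

        endOfThread(); // release the initial count of 1
    }
}
```

`onAllDone` usually runs on a pool thread. The finally only guarantees the count is released; as the documentation JaredPar links explains, an unhandled exception on a pool thread still terminates the process under CLR 2.0 and later.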
9

I have been using the new Parallel task library in CTP here:

       Parallel.ForEach(collection, o =>
            {
                DoSomeWork(o);
            });
  • Good suggestion! Also easier when it comes to handling exceptions. See: http://msdn.microsoft.com/en-us/library/dd991486.aspx – Joop Sep 23 '11 at 06:42
  • Take special caution as this uses the ThreadPool and it is not possible to force it to use dedicated (unmanaged) threads. Even using the underlying TaskFactory with LongRunning option only provides a hint at the scheduler, but is not a guarantee for a dedicated thread. – eduncan911 Aug 07 '12 at 16:34
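To illustrate Joop's point about exceptions: Parallel.ForEach blocks until the loop has finished and then rethrows whatever the iterations threw, wrapped in an AggregateException. A sketch, with `ParallelWork.RunAndCollectErrors` as a made-up helper:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelWork
{
    // Runs work for every item in parallel and returns the messages of any failures.
    public static List<string> RunAndCollectErrors(IEnumerable<int> items, Action<int> work)
    {
        var errors = new List<string>();
        try
        {
            // Blocks the calling thread until the loop is finished.
            Parallel.ForEach(items, work);
        }
        catch (AggregateException ae)
        {
            // Each exception thrown by an iteration appears in InnerExceptions.
            foreach (var inner in ae.InnerExceptions)
                errors.Add(inner.Message);
        }
        return errors;
    }
}
```

Once an iteration throws, Parallel.ForEach stops starting new iterations, so some items may never run; the caller only sees the exceptions from iterations that did.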
3

Here is a solution using the CountdownEvent class.

var complete = new CountdownEvent(1);
foreach (var o in collection)
{
  var capture = o;
  ThreadPool.QueueUserWorkItem((state) =>
    {
      try
      {
        DoSomething(capture);
      }
      finally
      {
        complete.Signal();
      }
    }, null);
}
complete.Signal();
complete.Wait();

Of course, if you have access to the CountdownEvent class then you have the whole TPL to work with. The Parallel class takes care of the waiting for you.

Parallel.ForEach(collection, o =>
  {
    DoSomething(o);
  });
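As another TPL route, assuming .NET 4.5 or later for `Task.Run`, you can start one task per item and block on `Task.WaitAll`, which is not subject to the 64-handle limit of `WaitHandle.WaitAll`. `TaskRunner.RunAll` is an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TaskRunner
{
    // Starts one task per item on the thread pool and blocks until all complete.
    // If any task faults, Task.WaitAll throws an AggregateException.
    public static void RunAll<T>(IEnumerable<T> items, Action<T> work)
    {
        Task[] tasks = items.Select(item => Task.Run(() => work(item))).ToArray();
        Task.WaitAll(tasks);
    }
}
```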
Brian Gideon
1

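Using the .NET 4.0 Barrier class (note that each worker blocks its pool thread in SignalAndWait until every participant, including the main thread, has arrived):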

        Barrier sync = new Barrier(1);

        foreach(var o in collection)
        {
            WaitCallback worker = (state) => 
            {
                // do work
                sync.SignalAndWait();
            };

            sync.AddParticipant();
            ThreadPool.QueueUserWorkItem(worker, o);
        }

        sync.SignalAndWait();
Joseph Kingry
1

I think you were on the right track with ManualResetEvent. This link has a code sample that closely matches what you're trying to do. The key is to use WaitHandle.WaitAll and pass it an array of wait events. Each thread needs to set one of these wait events.

    // Simultaneously calculate the terms.
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateBase));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateFirstTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateSecondTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateThirdTerm));

    // Wait for all of the terms to be calculated.
    WaitHandle.WaitAll(autoEvents);

    // Reset the wait handle for the next calculation.
    manualEvent.Reset();

Edit:

Make sure that your worker thread's code path sets its event (e.g. autoEvents[1].Set();). Once they are all signaled, WaitAll will return.

void CalculateSecondTerm(object stateInfo)
{
    double preCalc = randomGenerator.NextDouble();
    manualEvent.WaitOne();
    secondTerm = preCalc * baseNumber * 
        randomGenerator.NextDouble();
    autoEvents[1].Set();
}
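Since the question asks about ManualResetEvent specifically, a common variation needs only one event: each worker decrements a shared counter, and the worker that brings it to zero sets the event. This relies on knowing the item count up front, which the question says is possible. `SingleEventWaiter.RunAll` is a hypothetical helper, not part of the MSDN sample.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class SingleEventWaiter
{
    public static void RunAll<T>(IList<T> items, Action<T> work)
    {
        if (items.Count == 0) return;               // nothing queued, nothing to wait for

        int remaining = items.Count;                // number of workers known up front
        using (var allDone = new ManualResetEvent(false))
        {
            foreach (var item in items)
            {
                var tmp = item;                     // copy for the closure
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { work(tmp); }
                    finally
                    {
                        // The worker that brings the count to zero releases the waiter.
                        if (Interlocked.Decrement(ref remaining) == 0)
                            allDone.Set();
                    }
                });
            }
            allDone.WaitOne();                      // blocks until the last worker calls Set
        }
    }
}
```

Because there is only one handle, there is no 64-item limit, and WaitOne (unlike WaitAll with several handles) also works on an STA thread.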
James
1

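Try using CountdownEvent:

// code before the threads start

CountdownEvent countdown = new CountdownEvent(collection.Length); // one count per item (collection is an array here)

foreach (var o in collection)
{
    var tmp = o; // copy for the closure (required before C# 5)
    ThreadPool.QueueUserWorkItem(delegate
    {
        try
        {
            // do something with tmp
            Console.WriteLine("Thread Done!");
        }
        finally
        {
            countdown.Signal(); // always signal, even if the work throws
        }
    });
}
countdown.Wait(); // blocks until every item has signaled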

Console.WriteLine("Job Done!");

// resume the code here

countdown.Wait() blocks until every work item has called Signal(), i.e. until all of them have finished.

nikoo28
1

I've found a good solution here:

http://msdn.microsoft.com/en-us/magazine/cc163914.aspx

It may come in handy for others with the same issue.

Gordon Thompson
-2

There is no built-in method to wait for all threads in the thread pool to complete. By comparing the number of available worker threads with the maximum, we can approximate it:

{
        bool working = true;
        ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads);
        // Busy-waits until every worker thread is reported as available again.
        // Caveat: items that are queued but not yet started, and pool work from
        // other code, are not distinguished, so this can report completion too early.
        while (working)
        {
            ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads);
            //Console.WriteLine($"{workerThreads} , {maxWorkerThreads}");
            if (workerThreads == maxWorkerThreads)
            { working = false; }
        }
        // when all threads are completed, 'working' will be false
    }
    void xyz(object o)
    {
        Console.WriteLine("");
    }
Jeffrey