7

For some reasons I have to stick to .NET 3.5 and I need a functionality of Barrier class from .NET 4. I have a bunch of threads that do some work and I want them to wait for each other until all are done. When all are done I want that they do the job again and again in the similar manner. Encouraged by the thread Difference between Barrier in C# 4.0 and WaitHandle in C# 3.0? I have decided to implement the Barrier functionality with AutoResetEvent and WaitHandle classes. Altough I encounter a problem with my code:

class Program
{
    const int numOfThreads = 3;

    static AutoResetEvent[] barrier = new AutoResetEvent[numOfThreads];
    static Random random = new Random(System.DateTime.Now.Millisecond);

    static void barriers2(object barrierObj)
    {
        AutoResetEvent[] barrierLocal = (AutoResetEvent[])barrierObj;
        string name = Thread.CurrentThread.Name;
        for (int i = 0; i < 10; i++)
        {
            int sleepTime = random.Next(2000, 10000);
            System.Console.Out.WriteLine("Thread {0} at the 'barrier' will sleep for {1}.", name, sleepTime);
            Thread.Sleep(sleepTime);
            System.Console.Out.WriteLine("Thread {0} at the 'barrier' with time {1}.", name, sleepTime);
            int currentId = Convert.ToInt32(name);
            //for(int z = 0; z < numOfThreads; z++)
                barrierLocal[currentId].Set();
            WaitHandle.WaitAll(barrier);
            /*
            for (int k = 0; k < numOfThreads; k++)
            {
                if (k == currentId)
                {
                    continue;
                }
                System.Console.Out.WriteLine("Thread {0} is about to wait for the singla from thread: {1}", name, k);
                barrierLocal[k].WaitOne();
                System.Console.Out.WriteLine("Thread {0} is about to wait for the singla from thread: {1}. done", name, k);
            }
            */
        }
    }

    static void Main(string[] args)
    {
        for (int i = 0; i < numOfThreads; i++)
        {
            barrier[i] = new AutoResetEvent(false);
        }
        for (int i = 0; i < numOfThreads; i++)
        {
            Thread t = new Thread(Program.barriers2);
            t.Name = Convert.ToString(i);
            t.Start(barrier);
        }
    }
}

The output I receive is as follows:

Thread 0 at the 'barrier' will sleep for 7564 Thread 1 at the 'barrier' will sleep for 5123 Thread 2 at the 'barrier' will sleep for 4237 Thread 2 at the 'barrier' with time 4237 Thread 1 at the 'barrier' with time 5123 Thread 0 at the 'barrier' with time 7564 Thread 0 at the 'barrier' will sleep for 8641 Thread 0 at the 'barrier' with time 8641

And that's it. After the last line there is no more output and the app does not terminate. It looks like there is some sort of deadlock. However can not find the issue. Any help welcome.

Thanks!

Community
  • 1
  • 1
Kiter
  • 71
  • 2

3 Answers3

5

That's because you use AutoResetEvent. One of the thread's WaitAll() call is going to complete first. Which automatically causes Reset() on all the AREs. Which prevents the other threads from ever completing their WaitAll() calls.

A ManualResetEvent is required here.

Hans Passant
  • 922,412
  • 146
  • 1,693
  • 2,536
  • Hans, thank you for the answer. That explains the problem but in case of ManualResetEvent class there must some thread that resets the MREs after all threads pass a 'barrier'. Any idea how it could be done in my scenario? – Kiter Jul 31 '11 at 20:47
  • Get some inspiration from reading the Barrier source code with, say, Reflector. I think it uses two sets of MREs (named odd and even) and alternates between them. – Hans Passant Jul 31 '11 at 20:58
  • what do you think about a simple solution like this:
     
    lock (obj)
                    {
                        threadsCount++;
                        if (threadsCount == numOfThreads)
                        {
                            System.Console.WriteLine("All threads are done.");
                            threadsCount = 0;
                            Monitor.PulseAll(obj);
    
                        }
                        else
                        {
                            Monitor.Wait(obj);
                        }
                    }
    – Kiter Aug 01 '11 at 20:13
2

Download the Reactive Extensions backport for .NET 3.5. You will find the Barrier class along with the other useful concurrent data structures and synchronization mechanisms that were released in .NET 4.0.

Brian Gideon
  • 47,849
  • 13
  • 107
  • 150
  • hi, thanks for the answer. Is it possible to get just the Barrier class from this backport or I always have to reference the whole package? Thanks. – Kiter Aug 01 '11 at 09:14
  • the thing is that, it's for a purpose of a large project and can't force many others working on it to install the extenstion + on the production environment. – Kiter Aug 01 '11 at 13:57
  • 1
    You do not have to install the RX framework on the target machines. In fact, you don't have to install it on your development machines if you don't want to. Just grab the System.Threading.dll library that comes with it and reference it like you would for any other 3rd party library. That's what I do. – Brian Gideon Aug 01 '11 at 15:07
2

Here is my implementation I use for my XNA game. Barrier was not available when I wrote this, and I am still stuck with .Net 3.5. It requires three sets of ManualResetEvents, and a counter array to keep phase.

using System;
using System.Threading;

namespace Colin.Threading
{
    /// <summary>
    /// Threading primitive for "barrier" sync, where N threads must stop at certain points 
    /// and wait for all their bretheren before continuing.
    /// </summary>
    public sealed class NThreadGate
    {
        public int mNumThreads;
        private ManualResetEvent[] mEventsA;
        private ManualResetEvent[] mEventsB;
        private ManualResetEvent[] mEventsC;
        private ManualResetEvent[] mEventsBootStrap;
        private Object mLockObject;
        private int[] mCounter;
        private int mCurrentThreadIndex = 0;

        public NThreadGate(int numThreads)
        {
            this.mNumThreads = numThreads;

            this.mEventsA = new ManualResetEvent[this.mNumThreads];
            this.mEventsB = new ManualResetEvent[this.mNumThreads];
            this.mEventsC = new ManualResetEvent[this.mNumThreads];
            this.mEventsBootStrap = new ManualResetEvent[this.mNumThreads];
            this.mCounter = new int[this.mNumThreads];
            this.mLockObject = new Object();

            for (int i = 0; i < this.mNumThreads; i++)
            {
                this.mEventsA[i] = new ManualResetEvent(false);
                this.mEventsB[i] = new ManualResetEvent(false);
                this.mEventsC[i] = new ManualResetEvent(false);
                this.mEventsBootStrap[i] = new ManualResetEvent(false);
                this.mCounter[i] = 0;
            }
        }

        /// <summary>
        /// Adds a new thread to the gate system.
        /// </summary>
        /// <returns>Returns a thread ID for this thread, to be used later when waiting.</returns>
        public int AddThread()
        {
            lock (this.mLockObject)
            {
                this.mEventsBootStrap[this.mCurrentThreadIndex].Set();
                this.mCurrentThreadIndex++;
                return this.mCurrentThreadIndex - 1;
            }
        }

        /// <summary>
        /// Stop here and wait for all the other threads in the NThreadGate. When all the threads have arrived at this call, they
        /// will unblock and continue.
        /// </summary>
        /// <param name="myThreadID">The thread ID of the caller</param>
        public void WaitForOtherThreads(int myThreadID)
        {
            // Make sure all the threads are ready.
            WaitHandle.WaitAll(this.mEventsBootStrap);

            // Rotate between three phases.
            int phase = this.mCounter[myThreadID];
            if (phase == 0)        // Flip
            {
                this.mEventsA[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsA);
                this.mEventsC[myThreadID].Reset();
            }
            else if (phase == 1)    // Flop
            {
                this.mEventsB[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsB);
                this.mEventsA[myThreadID].Reset();
            }
            else    // Floop
            {
                this.mEventsC[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsC);
                this.mEventsB[myThreadID].Reset();
                this.mCounter[myThreadID] = 0;
                return;
            }

            this.mCounter[myThreadID]++;
        }
    }
}

Setting up the thread gate:

private void SetupThreads()
{
    // Make an NThreadGate for N threads.
    this.mMyThreadGate = new NThreadGate(Environment.ProcessorCount);

    // Make some threads...
    // e.g. new Thread(new ThreadStart(this.DoWork);
}

Thread worker method:

private void DoWork()
{
    int localThreadID = this.mMyThreadGate.AddThread();

    while (this.WeAreStillRunning)
    {
        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);

        // Synchronized work here...

        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);

        // Synchronized work here...

        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);
    }
}