2

I'm trying to get threads to wait for each other before preceding so they stay in sync.

In my actual program I have lots of IObjectObserved objects (on their own threads) sending out events and I want to keep everything in sync so an IObjectListener (on its own thread) can listen to one of these objects 50 times and then subscribe to another in time to catch its 51st event.

I haven't got that far yet, but I think synchronizing threads is the main problem. I'm managed to achieve this with two way signalling using AutoResetEvents. Is there not a better way to do this?

class Program
{
    static EventWaitHandle _ready = new AutoResetEvent(true);
    static EventWaitHandle _go = new AutoResetEvent(false);
    static EventWaitHandle _ready1 = new AutoResetEvent(true);
    static EventWaitHandle _go1 = new AutoResetEvent(false);
    static EventWaitHandle _ready2 = new AutoResetEvent(true);
    static EventWaitHandle _go2 = new AutoResetEvent(false);

    static void Main(string[] args)
    {
        new Thread(Waiter).Start();
        new Thread(Waiter1).Start();
        new Thread(Waiter2).Start();
        for (; ; )
        {
            _ready.WaitOne();
            _ready1.WaitOne();
            _ready2.WaitOne();
            Console.WriteLine("new round");
            _go.Set();
            _go1.Set();
            _go2.Set();
        }
    }

    static void Waiter()
    {
        for (; ; )
        {
            _go.WaitOne();
            Thread.Sleep(1000);
            Console.WriteLine("Waiter run");
            _ready.Set();
        }
    }
    static void Waiter1()
    {
        for (; ; )
        {
            _go1.WaitOne();
            Thread.Sleep(5000);
            Console.WriteLine("water1 run");
            _ready1.Set();
        }
    }
    static void Waiter2()
    {
        for (; ; )
        {
            _go2.WaitOne();
            Thread.Sleep(500);
            Console.WriteLine("water2 run");
            _ready2.Set();
        }
    }
}
marc_s
  • 732,580
  • 175
  • 1,330
  • 1,459
Nathan Cooper
  • 6,262
  • 4
  • 36
  • 75
  • 1
    I suggest you read 'Threaing in C#': http://www.albahari.com/threading/ – Kris Vandermotten Feb 23 '14 at 18:11
  • Why are you trying to do this? What's the problem you're trying to solve? Or is this just an academic exercise? – Enigmativity Feb 23 '14 at 23:11
  • Just making a note of links i need to investigate: [one](http://joeduffyblog.com/2005/05/11/scheduling-coroutines-with-grains/) [two](http://csharperimage.jeremylikness.com/2010/03/sequential-asynchronous-workflows-in.html) – Nathan Cooper Feb 24 '14 at 16:51
  • Windows itself dos support fibers ... Here is an MSDN article that goes into detail on how to use them from C#: http://msdn.microsoft.com/en-us/magazine/cc164086.aspx – Goz Feb 23 '14 at 18:09

3 Answers3

2

You could simplify things a bit.

  • You could use one CountdownEvent instead of waiting for 3 handles to be signaled. cde.Wait will block until it has been signaled 3 times.
  • You could also use a SemaphoreSlim to release multiple threads, instead of using 3 different handles. sem.Release(3) will unblock up to 3 threads blocked on sem.Wait().


static CountdownEvent cde = new CountdownEvent(3);
static SemaphoreSlim sem = new SemaphoreSlim(3);

static void X()
{
    new Thread(Waiter).Start();
    new Thread(Waiter).Start();
    new Thread(Waiter).Start();
    for (; ; )
    {
        cde.Wait();
        Debug.WriteLine("new round");

        cde.Reset(3);
        sem.Release(3);
    }
}

static void Waiter()
{
    for (; ; )
    {
        sem.Wait();
        Thread.Sleep(1000);
        Debug.WriteLine("Waiter run");
        cde.Signal();
    }
}

Note that now all threads can reuse the same code.


Edit:

As stated in the comments, one thread might steal another thread's round. If you don't want this to happen, a Barrier will do the job:

   private static Barrier _barrier;

   private static void SynchronizeThreeThreads()
   {
       _barrier = new Barrier(3, b =>
                                Debug.WriteLine("new round"));

       new Thread(Waiter).Start();
       new Thread(Waiter).Start();
       new Thread(Waiter).Start();

       //let the threads run for 5s
       Thread.Sleep(5000);
   }
   static void Waiter()
   {
       while(true)
       {
           Debug.WriteLine("Thread {0} done.", Thread.CurrentThread.ManagedThreadId);
           _barrier.SignalAndWait();
       }
   }

You add 3 participants to the barrier. The first and second participants to reach the barrier (i.e., to execute _barrier.SignalAndWait()) will block until the remaining participants reaches it too. When all participants have reached the barrier, they will all be released and go for another round.

Notice that I passed a lambda to the barrier constructor - that's the "post-phase action", an action that will be executed after all participants have reached the barrier, and before releasing them.

dcastro
  • 66,540
  • 21
  • 145
  • 155
  • Oh dear. Actually, using this code and removing the thread.sleep, after the sem is released all the its possible he second loop of one thread may pinch another threads spot. 3 threads will always run but it's non-deterministic. – Nathan Cooper Feb 27 '14 at 11:57
  • @NathanCooper That's true. If you don't want that to happen, try a `Barrier` instead. See my updated answer. – dcastro Feb 27 '14 at 13:17
  • "Enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases." Barrier. That sounds like a perfect fit. Thanks very much. – Nathan Cooper Feb 27 '14 at 13:32
1

You really should not be blocking threads for cooperative execution, where possible, especially if there're multiple threads involved.

Below is an implementation of your original logic (loops preserved) without blocking code, using async/await and custom awaiters. Custom awaiters can be very handy when implement coroutines like that.

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        new Program().RunAsync().Wait();
    }

    async Task RunAsync()
    {
        var ready1 = new CoroutineEvent(initialState: true);
        var go1 = new CoroutineEvent(initialState: false);

        var ready2 = new CoroutineEvent(initialState: true);
        var go2 = new CoroutineEvent(initialState: false);

        var ready3 = new CoroutineEvent(initialState: true);
        var go3 = new CoroutineEvent(initialState: false);

        var waiter1 = Waiter(1, go1, ready1);
        var waiter2 = Waiter(2, go2, ready2);
        var waiter3 = Waiter(3, go3, ready3);

        while (true)
        {
            await ready1.WaitAsync();
            ready1.Reset();

            await ready2.WaitAsync();
            ready2.Reset();

            await ready3.WaitAsync();
            ready2.Reset();

            Console.WriteLine("new round");

            go1.Set();
            go2.Set();
            go3.Set();
        }
    }

    async Task Waiter(int n, CoroutineEvent go, CoroutineEvent ready)
    {
        while (true)
        {
            await go.WaitAsync();
            go.Reset();

            await Task.Delay(500).ConfigureAwait(false);
            Console.WriteLine("Waiter #" + n + " + run, thread: " + 
                Thread.CurrentThread.ManagedThreadId);

            ready.Set();
        }
    }

    public class CoroutineEvent
    {
        volatile bool _signalled;
        readonly Awaiter _awaiter;

        public CoroutineEvent(bool initialState = true)
        {
            _signalled = initialState;
            _awaiter = new Awaiter(this);
        }

        public bool IsSignalled { get { return _signalled; } }

        public void Reset()
        {
            _signalled = false;
        }

        public void Set()
        {
            var wasSignalled = _signalled;
            _signalled = true;
            if (!wasSignalled)
                _awaiter.Continue();
        }

        public Awaiter WaitAsync()
        {
            return _awaiter;
        }

        public class Awaiter: System.Runtime.CompilerServices.INotifyCompletion
        {
            volatile Action _continuation;
            readonly CoroutineEvent _owner;

            internal Awaiter(CoroutineEvent owner)
            {
                _owner = owner;
            }

            static void ScheduleContinuation(Action continuation)
            {
                ThreadPool.QueueUserWorkItem((state) => ((Action)state)(), continuation);
            }

            public void Continue()
            {
                lock (this)
                {
                    var continuation = _continuation;
                    _continuation = null;
                    if (continuation != null)
                        ScheduleContinuation(continuation);
                }
            }

            // custom Awaiter methods

            public Awaiter GetAwaiter()
            {
                return this;
            }

            public bool IsCompleted
            {
                get
                {
                    lock (this)
                        return _owner.IsSignalled;
                }
            }

            public void GetResult()
            {
            }

            // INotifyCompletion

            public void OnCompleted(Action continuation)
            {
                lock (this)
                {
                    if (_continuation != null)
                        throw new InvalidOperationException();

                    if (_owner.IsSignalled)
                        ScheduleContinuation(continuation);
                    else
                        _continuation = continuation;
                }
            }
        }
    }
}
noseratio
  • 59,932
  • 34
  • 208
  • 486
0

Firstly, thanks your help guys. I thought I would post my final solution here for completeness.

I used the Barrier approach, with some changes to alternately run (collections of) threads. This code swaps execution between the one type threads and the two type threads.

static void Main(string[] args)
{
    SynchronizeThreeThreads();
    Console.ReadKey();
}


private static Barrier oneBarrier;
private static Barrier twoBarrier;
private static EventWaitHandle TwoDone = new AutoResetEvent(false);
private static EventWaitHandle OneDone = new AutoResetEvent(false);

private static void SynchronizeThreeThreads()
    {
        //Barrier hand off to each other (other barrier's threads do work between set and waitone)
        //Except last time when twoBarrier does not wait for OneDone
        int runCount = 2;
        int count = 0;
        oneBarrier = new Barrier(0, (b) => { Console.WriteLine("one done"); OneDone.Set(); TwoDone.WaitOne(); Console.WriteLine("one starting"); });
        twoBarrier = new Barrier(0, (b) => { Console.WriteLine("two done"); TwoDone.Set(); count++; if (count != runCount) { OneDone.WaitOne(); Console.WriteLine("two starting"); } });

        //Create tasks sorted into two groups
        List<Task> oneTasks = new List<Task>() { new Task(() => One(runCount)), new Task(() => One(runCount)), new Task(() => One(runCount)) };
        List<Task> twoTasks = new List<Task>() { new Task(() => Two(runCount)), new Task(() => TwoAlt(runCount)) };
        oneBarrier.AddParticipants(oneTasks.Count);
        twoBarrier.AddParticipants(twoTasks.Count);

        //Start Tasks. Ensure oneBarrier does work before twoBarrier
        oneTasks.ForEach(task => task.Start());
        OneDone.WaitOne();
        twoTasks.ForEach(task => task.Start());

        //Wait for all Tasks to finish
        oneTasks.ForEach(task => task.Wait());
        twoTasks.ForEach(task => task.Wait());

        Console.WriteLine("done");

    }

    static void One(int runCount)
    {
        for (int i = 0; i <= runCount; i++)
        {

            Thread.Sleep(100);
            Console.WriteLine("One " + i.ToString());
            oneBarrier.SignalAndWait();
        }
    }
    static void Two(int runCount)
    {
        for (int i = 0; i <= runCount; i++)
        {

            Thread.Sleep(500);
            Console.WriteLine("Two " + i.ToString());
            twoBarrier.SignalAndWait();
        }
    }
    static void TwoAlt(int runCount)
    {
        for (int i = 0; i <= runCount; i++)
        {

            Thread.Sleep(10);
            Console.WriteLine("TwoAlt " + i.ToString());
            twoBarrier.SignalAndWait();
        }
    }
Nathan Cooper
  • 6,262
  • 4
  • 36
  • 75