
I have been using the queued lock code from this answer, and wrote a unit test for it. For reference, the lock code:

public sealed class FifoMutex
{
    private readonly object innerLock = new object();
    private volatile int ticketsCount = 0;
    private volatile int ticketToRide = 1;
    private readonly ThreadLocal<int> reenter = new ThreadLocal<int>();

    public void Enter()
    {
        reenter.Value++;
        if (reenter.Value > 1)
            return;
        int myTicket = Interlocked.Increment(ref ticketsCount);
        Monitor.Enter(innerLock);
        while (true)
        {
            if (myTicket == ticketToRide)
            {
                return;
            }
            else
            {
                Monitor.Wait(innerLock);
            }
        }
    }

    public void Exit()
    {
        if (reenter.Value > 0)
            reenter.Value--;
        if (reenter.Value > 0)
            return;
        Interlocked.Increment(ref ticketToRide);
        Monitor.PulseAll(innerLock);
        Monitor.Exit(innerLock);
    }
}

And my testing code:

[TestClass]
public class FifoMutexTests
{
    public static ConcurrentQueue<string> Queue;

    [TestInitialize]
    public void Setup()
    {
        Queue = new ConcurrentQueue<string>();
    }

    [TestCleanup]
    public void TearDown()
    {
        Queue = null;
    }

    [TestMethod]
    public void TestFifoMutex()
    {
        int noOfThreads = 10;
        int[] threadSleepTimes = new int[noOfThreads];
        string[] threadNames = new string[noOfThreads];
        Random r = new Random();
        for (int i = 0; i < noOfThreads; i++)
        {
            threadSleepTimes[i] = r.Next(0, 250);
            threadNames[i] = "Thread " + i;
        }

        for (int i = 0; i < noOfThreads; i++)
        {
            FifoMutexTestUser user = new FifoMutexTestUser();
            Thread newThread = new Thread(user.DoWork);
            newThread.Name = threadNames[i];
            newThread.Start(threadSleepTimes[i]);
        }
        Thread.Sleep(3000);

        var receivedThreadNamesInOrder = Queue.ToArray();
        Assert.AreEqual(threadNames.Length, receivedThreadNamesInOrder.Length);
        for (int i = 0; i < receivedThreadNamesInOrder.Length; i++)
        {
            Assert.AreEqual(threadNames[i], receivedThreadNamesInOrder[i]);
        }
    }
}

Using this test mutex user:

public class FifoMutexTestUser
{
    private readonly static FifoMutex fifoMutex = new FifoMutex();

    public void DoWork(object sleepTime)
    {
        try
        {
            fifoMutex.Enter();
            Thread.Sleep((int)sleepTime);
            FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
        }
        finally
        {
            fifoMutex.Exit();
        }
    }
}

In essence, I create ten threads; each sleeps for a random period of time and then enqueues its name in the static concurrent queue in the main test class. The threads are built from different instances of the same user class, which has a static FifoMutex field. That scenario resembles my own use case (I have multiple consumer classes receiving messages from different places, and I need my backend to handle them strictly sequentially, but also strictly in the order in which they arrived).

But this test doesn't work. All the threads enqueue their names, but not in the right order. From the final for-loop in the second snippet, I can see that they in fact executed in a random order, which is exactly what the FIFO mutex is meant to prevent.

But here's the thing. One minor adjustment to my testing code, and it all works like a charm.

for (int i = 0; i < noOfThreads; i++)
{
    FifoMutexTestUser user = new FifoMutexTestUser();
    Thread newThread = new Thread(user.DoWork);
    Thread.Sleep(1);
    newThread.Name = threadNames[i];
    newThread.Start(threadSleepTimes[i]);
}

Now I am sleeping for one millisecond, the smallest possible interval, in the loop that starts all the threads (the second loop in the second snippet). If I do this, all the threads enqueue their names in the right order, and my test succeeds 100% of the time.

So I wonder why such a tiny sleep period made a difference. I am not that knowledgeable regarding compilation, but my first guess was that the loop starting all the threads is being compiled or optimised by the compiler, and in that process the order of the threads changes?

Or, the (perhaps likelier) alternative, is my testing code (or the mutex code) at fault?

KeizerHarm
  • I wouldn't rely on a timer to solve this issue. I would add a sequential ID to the message and reorder when necessary on the receive end. There is no guarantee when messages are transmitted they take the same route and same delay. – jdweng Oct 21 '20 at 13:02
  • Have you considered using a built-in library like the [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) to handle message passing and order preservation, instead of building your own from scratch? – Theodor Zoulias Oct 21 '20 at 13:05
  • Thanks for the advice, but I'd rather not get into the specific use case for the mutex. Safe to say a variety of concurrency/messaging libraries are already being used and they work wonderfully individually, but this is one specific point in the flow of the application where I really need to synchronise different consumer threads so they are handled one at a time. – KeizerHarm Oct 21 '20 at 13:08
  • @KeizerHarm Yep, if you have already invested in a custom concurrency/messaging mechanism that is tailored to your needs, it makes sense to maintain it and build on top of it, instead of switching to some generic library. – Theodor Zoulias Oct 21 '20 at 17:47

1 Answer


It seems (if I understood the issue correctly) that you assume the threads will actually start (that is, DoWork will begin executing) and grab the mutex in the same order in which you called Thread.Start on them. However, this is not necessarily the case.

Say you have 10 threads (with "ids" from 1 to 10), and you call Thread.Start on them in order. That does not mean they will actually start in that order. You call Start on thread 1, then you call Start on thread 2, and it's possible that DoWork of thread 2 (not 1) executes first. You can observe this by changing your test code this way:

public class FifoMutexTestUser {
    private readonly int _id;
    public FifoMutexTestUser(int id) {
        _id = id;
    }
    private readonly static FifoMutex fifoMutex = new FifoMutex();

    public void DoWork(object sleepTime)
    {
        Console.WriteLine("Thread started: " + _id);
        try
        {
            fifoMutex.Enter();
            Thread.Sleep((int)sleepTime);
            FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
        }
        finally
        {
            fifoMutex.Exit();
        }
    }
}

And then passing the loop variable to it (the variable is correlated with the threadNames entries on which you perform the assert):

for (int i = 0; i < noOfThreads; i++)
{
    FifoMutexTestUser user = new FifoMutexTestUser(i);
    Thread newThread = new Thread(user.DoWork);
    newThread.Name = threadNames[i];
    newThread.Start(threadSleepTimes[i]);
}

And you can see something like this (results may vary of course):

Thread started: 9
Thread started: 1
Thread started: 0
Thread started: 2
Thread started: 3
Thread started: 4
Thread started: 5
Thread started: 6
Thread started: 7
Thread started: 8

So in this run, the thread you called Thread.Start on last actually started first. Moreover, even if a thread starts first (meaning its DoWork begins executing first), that doesn't mean it will grab the mutex first. Threads execute in parallel, and the code outside fifoMutex.Enter and fifoMutex.Exit (as well as the code inside those methods before the mutex is acquired and after it is released) is not protected by any synchronization construct, so any of the threads can grab the mutex first.

Adding a delay sometimes (not always) gives the previous thread in the loop a head start, so it has a better chance of grabbing the mutex first. If you are lucky and the threads attempt to grab the mutex in the right order, your FifoMutex ensures they then unblock in that order. But the order in which they attempt to grab the mutex is undetermined in your test code.
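
A minimal sketch of one way to pin down the arrival order in a test, under stated assumptions: the calling thread holds the mutex as a gate, starts one worker at a time, and waits until that worker reports ThreadState.WaitSleepJoin before starting the next. With the FifoMutex from the question, a worker can only block inside Enter after it has taken its ticket, so the tickets should be handed out in start order. FifoOrderCheck and Run are hypothetical names, and the documentation describes ThreadState as a debugging aid rather than a synchronization tool, so treat this as an illustration, not a guaranteed recipe.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

// FifoMutex condensed from the question, repeated so the sketch compiles on its own.
public sealed class FifoMutex
{
    private readonly object innerLock = new object();
    private volatile int ticketsCount = 0;
    private volatile int ticketToRide = 1;
    private readonly ThreadLocal<int> reenter = new ThreadLocal<int>();

    public void Enter()
    {
        reenter.Value++;
        if (reenter.Value > 1) return;
        int myTicket = Interlocked.Increment(ref ticketsCount);
        Monitor.Enter(innerLock);
        while (myTicket != ticketToRide) Monitor.Wait(innerLock);
    }

    public void Exit()
    {
        if (reenter.Value > 0) reenter.Value--;
        if (reenter.Value > 0) return;
        Interlocked.Increment(ref ticketToRide);
        Monitor.PulseAll(innerLock);
        Monitor.Exit(innerLock);
    }
}

// Hypothetical helper, not part of the original post.
public static class FifoOrderCheck
{
    // Returns the worker ids in the order in which they got the mutex.
    public static int[] Run(int workers)
    {
        var mutex = new FifoMutex();
        var order = new ConcurrentQueue<int>();
        var threads = new Thread[workers];

        mutex.Enter(); // Gate: while the caller holds the mutex, every worker must queue.
        try
        {
            for (int i = 0; i < workers; i++)
            {
                int id = i; // Copy the loop variable for the closure.
                var worker = new Thread(() =>
                {
                    mutex.Enter();
                    try { order.Enqueue(id); }
                    finally { mutex.Exit(); }
                });
                threads[i] = worker;
                worker.Start();

                // Assumption: the worker can only block inside Enter, after taking its ticket.
                bool blocked = SpinWait.SpinUntil(
                    () => (worker.ThreadState & ThreadState.WaitSleepJoin) != 0,
                    TimeSpan.FromSeconds(5));
                if (!blocked)
                    throw new TimeoutException("Worker " + id + " never blocked on the mutex.");
            }
        }
        finally
        {
            mutex.Exit(); // Open the gate; workers should now run in ticket order.
        }

        foreach (var thread in threads) thread.Join();
        return order.ToArray();
    }
}
```

Calling FifoOrderCheck.Run(10) and comparing the result with Enumerable.Range(0, 10) would then check the unblocking order without relying on sleeps. The test still depends on the internals of Enter (the ticket being taken before the first blocking call), so it would need revisiting if the mutex implementation changes.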

Evk
  • Aah, you're right... I suppose there's no way to 100% reliably test the fifomutex then, unless I use really long waiting periods. – KeizerHarm Oct 26 '20 at 12:54
  • @KeizerHarm Yes, it's quite non-trivial to test, at least nothing comes to mind right now. However, for your requirements ("I have multiple consumer classes receiving messages from different places, and I need my backend to handle them strictly sequentially, but also strictly in the order in which they arrived"), why not just put them into a (concurrent) queue? Then another thread will process them in the order in which they were enqueued. To see in the enqueuing thread when its message has been processed, include a signaling event (ManualResetEvent) with the message and wait on it. The queue processor will set it when done. – Evk Oct 26 '20 at 13:17
  • Will look into that. Thanks for the advice! – KeizerHarm Oct 26 '20 at 13:19
  • @KeizerHarm I can include some sample code for this queue approach if that's not clear from vague description above. – Evk Oct 26 '20 at 13:19
  • Thanks but I will sort that out; I think I understand it. My use case has gotten more complicated in the meantime anyway. – KeizerHarm Oct 26 '20 at 13:21
  • Will get you your bounty in 22 hours, when I am allowed to :D – KeizerHarm Oct 26 '20 at 13:21
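
The queue approach Evk describes in the comments above could look roughly like the following. This is a minimal sketch, not Evk's code: SequentialProcessor, WorkItem, Process and the handler delegate are hypothetical names, and it uses BlockingCollection<T> (which wraps a ConcurrentQueue<T> by default) and ManualResetEventSlim in place of the ManualResetEvent mentioned in the comment.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: one worker thread handles messages in the order they were enqueued.
public sealed class SequentialProcessor : IDisposable
{
    private sealed class WorkItem
    {
        public string Message;
        public Exception Error;
        // Never disposed in this sketch: Dispose must not race with Set, and no
        // kernel wait handle is allocated unless the WaitHandle property is used.
        public readonly ManualResetEventSlim Done = new ManualResetEventSlim(false);
    }

    private readonly BlockingCollection<WorkItem> queue = new BlockingCollection<WorkItem>();
    private readonly Action<string> handler;
    private readonly Thread worker;

    public SequentialProcessor(Action<string> handler)
    {
        this.handler = handler;
        worker = new Thread(ProcessLoop) { IsBackground = true };
        worker.Start();
    }

    // Called from any consumer thread; blocks until the worker has handled the message.
    public void Process(string message)
    {
        var item = new WorkItem { Message = message };
        queue.Add(item);
        item.Done.Wait();
        if (item.Error != null)
            throw new InvalidOperationException("Handler failed.", item.Error);
    }

    private void ProcessLoop()
    {
        // GetConsumingEnumerable blocks while the queue is empty and
        // ends once CompleteAdding has been called and the queue is drained.
        foreach (var item in queue.GetConsumingEnumerable())
        {
            try { handler(item.Message); }
            catch (Exception ex) { item.Error = ex; }
            finally { item.Done.Set(); }
        }
    }

    public void Dispose()
    {
        queue.CompleteAdding();
        worker.Join();
        queue.Dispose();
    }
}
```

A consumer would call something like processor.Process("message") and continue once the call returns. The same caveat as in the answer applies: "arrival order" here means the order in which the Add calls complete, so two threads that call Process at nearly the same moment can still be enqueued in either order.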