2

I have an endpoint in a API that process data and uses a lock. As there are many user using the system, sometimes the requests are not processed in order and may throw a timeout if there are many requests waiting. We already tried two different approaches, the first one is this class:

public class QueuedFunctions<T>
    {
        private readonly object _internalSyncronizer = new object();
        private readonly ConcurrentQueue<Func<T>> _funcQueue = new ConcurrentQueue<Func<T>>();

        public T Execute(Func<T> func)
        {
            _funcQueue.Enqueue(func);
            Console.WriteLine("Queuing: " + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine(Monitor.IsEntered(_internalSyncronizer));
            lock (_internalSyncronizer)
            {
                Console.WriteLine("Locked: " + Thread.CurrentThread.ManagedThreadId);
                Func<T> nextFunc;
                if (_funcQueue.TryDequeue(out nextFunc))
                {
                    return nextFunc();
                }
                else
                {
                    throw new Exception("Something is wrong. How come there is nothing in the queue?");
                }
            }
        }
    }

I tested this with the following unit test:

var threadsQuantity = 50;

            for (int i = 0; i < threadsQuantity; i++)
            {
                var t = new Thread(async () =>
                {
                    Console.WriteLine("Started Thread: " + Thread.CurrentThread.ManagedThreadId);
                    await queuedActions.Execute(() =>
                    {
                        Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);
                        return Task.FromResult<object>(null);
                    });
                });

                t.Start();
            }

And the output is the following

Started Thread: 16
Started Thread: 13
Started Thread: 15
Started Thread: 17
Started Thread: 14
Started Thread: 18
Started Thread: 19
Started Thread: 20
Started Thread: 21
Queuing: 21
Locked: 21
Queuing: 13
Queuing: 18
Queuing: 15
Queuing: 16
Queuing: 17
Queuing: 14
Queuing: 19
Queuing: 20
Started Thread: 22
Queuing: 22
Thread: 21
Locked: 13
Thread: 13
Locked: 15
Started Thread: 23
Thread: 15
Queuing: 23
Locked: 23
Thread: 23
Locked: 18
Thread: 18
Locked: 17
Thread: 17
Locked: 16
Thread: 16
Locked: 14
Thread: 14
Locked: 19
Thread: 19
Locked: 20
Thread: 20
Locked: 22
Thread: 22
Started Thread: 24
Queuing: 24
Locked: 24
Thread: 24
Started Thread: 25
Queuing: 25
Locked: 25
Thread: 25
Started Thread: 26
Queuing: 26
Locked: 26
Thread: 26
Started Thread: 27
Queuing: 27
Locked: 27
Thread: 27
Started Thread: 28
Queuing: 28
Locked: 28
Thread: 28
Started Thread: 29
Queuing: 29
Locked: 29
Thread: 29
Started Thread: 30
Queuing: 30
Locked: 30
Thread: 30
Started Thread: 31
Queuing: 31
Locked: 31
Thread: 31
Started Thread: 32
Queuing: 32
Locked: 32
Thread: 32
Started Thread: 33
Queuing: 33
Locked: 33
Thread: 33
Started Thread: 34
Queuing: 34
Locked: 34
Thread: 34
Started Thread: 35
Queuing: 35
Locked: 35
Thread: 35
Started Thread: 36
Queuing: 36
Locked: 36
Thread: 36
Started Thread: 37
Queuing: 37
Locked: 37
Thread: 37
Started Thread: 38
Queuing: 38
Locked: 38
Thread: 38
Started Thread: 39
Queuing: 39
Locked: 39
Thread: 39
Started Thread: 40
Queuing: 40
Locked: 40
Thread: 40
Started Thread: 41
Queuing: 41
Locked: 41
Thread: 41
Started Thread: 42
Queuing: 42
Locked: 42
Thread: 42
Started Thread: 43
Queuing: 43
Locked: 43
Thread: 43
Started Thread: 44
Queuing: 44
Locked: 44
Thread: 44
Started Thread: 45
Queuing: 45
Locked: 45
Thread: 45
Started Thread: 46
Queuing: 46
Locked: 46
Thread: 46
Started Thread: 47
Queuing: 47
Locked: 47
Thread: 47
Started Thread: 48
Queuing: 48
Locked: 48
Thread: 48
Started Thread: 49
Queuing: 49
Locked: 49
Thread: 49
Started Thread: 50
Queuing: 50
Locked: 50
Thread: 50
Started Thread: 51
Queuing: 51
Locked: 51
Thread: 51
Started Thread: 52
Queuing: 52
Locked: 52
Thread: 52
Started Thread: 53
Queuing: 53
Locked: 53
Thread: 53
Started Thread: 54
Queuing: 54
Locked: 54
Thread: 54
Started Thread: 55
Queuing: 55
Locked: 55
Thread: 55
Started Thread: 56
Queuing: 56
Locked: 56
Thread: 56
Started Thread: 57
Queuing: 57
Locked: 57
Thread: 57
Started Thread: 58
Queuing: 58
Locked: 58
Thread: 58
Started Thread: 59
Queuing: 59
Locked: 59
Thread: 59
Started Thread: 60
Queuing: 60
Locked: 60
Thread: 60
Started Thread: 61
Queuing: 61
Locked: 61
Thread: 61
Started Thread: 62
Queuing: 62
Locked: 62
Thread: 62

As you can see, the code queued the thread 21, locked the thread 21, then queued the thread 13, locked the thread 13, queued the thread 18 and then locked the thread 15, which was the next thread to process.

Moreover, we also tested with another class that we found on this forums:

   public sealed class QueuedLock
{
    private class SyncObject : IDisposable
    {
        private Action m_action = null;
        private static readonly object locker = new object();

        public SyncObject(Action action)
        {
            m_action = action;
        }

        public void Dispose()
        {
            lock (locker)
            {
                var action = m_action;
                m_action = null;
                action?.Invoke();
            }
        }
    }

    private static readonly object m_innerLock = new Object();
    private volatile int m_ticketsCount = 0;
    private volatile int m_ticketToRide = 1;

    public bool Enter()
    {
        if (Monitor.IsEntered(m_innerLock))
        {
            return false;
        }

        var myTicket = Interlocked.Increment(ref m_ticketsCount);
        Monitor.Enter(m_innerLock);
        Console.WriteLine("Locked: " + Thread.CurrentThread.ManagedThreadId);
        while (true)
        {
            if (myTicket == m_ticketToRide)
            {
                return true;
            }

            Monitor.Wait(m_innerLock);
        }
    }

    public void Exit()
    {
        Interlocked.Increment(ref m_ticketToRide);
        Monitor.PulseAll(m_innerLock);
        Monitor.Exit(m_innerLock);
    }

    public IDisposable GetLock()
    {
        if (Enter())
        {
            return new SyncObject(Exit);
        }

        return new SyncObject(null);
    }
}

The test method:

static QueuedLock queuedLock = new QueuedLock();
        [TestMethod]
        public void QueuedLockTest_ShouldRunInOrder()
        {
            var threadsQuantity = 50;

            for (int i = 0; i < threadsQuantity; i++)
            {
                var t = new Thread(() =>
                {
                    Console.WriteLine("Started Thread: " + Thread.CurrentThread.ManagedThreadId);
                    try
                    {
                        queuedLock.Enter();
                        Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);
                    }
                    finally
                    {
                        queuedLock.Exit();
                    }
                });

                t.Start();
            }
        }

And the output isn't ordered neither:

    Started Thread: 15
Started Thread: 14
Started Thread: 13
Started Thread: 16
Started Thread: 17
Started Thread: 18
Locked: 18
Started Thread: 19
Thread: 18
Locked: 15
Thread: 15
Locked: 14
Locked: 13
Locked: 19
Thread: 19
Locked: 16
Thread: 14
Locked: 17
Thread: 13
Thread: 16
Thread: 17
Started Thread: 20
Locked: 20
Thread: 20
Started Thread: 21
Locked: 21
Thread: 21
Started Thread: 22
Locked: 22
Thread: 22
Started Thread: 23
Locked: 23
Thread: 23
Started Thread: 24
Locked: 24
Thread: 24
Started Thread: 25
Locked: 25
Thread: 25
Started Thread: 26
Locked: 26
Thread: 26
Started Thread: 27
Locked: 27
Thread: 27
Started Thread: 28
Locked: 28
Thread: 28
Started Thread: 29
Locked: 29
Thread: 29
Started Thread: 30
Locked: 30
Thread: 30
Started Thread: 31
Locked: 31
Thread: 31
Started Thread: 32
Locked: 32
Thread: 32
Started Thread: 33
Locked: 33
Thread: 33
Started Thread: 34
Locked: 34
Thread: 34
Started Thread: 35
Locked: 35
Thread: 35
Started Thread: 36
Locked: 36
Thread: 36
Started Thread: 37
Locked: 37
Thread: 37
Started Thread: 38
Locked: 38
Thread: 38
Started Thread: 39
Locked: 39
Thread: 39
Started Thread: 40
Locked: 40
Thread: 40
Started Thread: 41
Locked: 41
Thread: 41
Started Thread: 42
Locked: 42
Thread: 42
Started Thread: 43
Locked: 43
Thread: 43
Started Thread: 44
Locked: 44
Thread: 44
Started Thread: 45
Locked: 45
Thread: 45
Started Thread: 46
Locked: 46
Thread: 46
Started Thread: 47
Locked: 47
Thread: 47
Started Thread: 48
Locked: 48
Thread: 48
Started Thread: 49
Locked: 49
Thread: 49
Started Thread: 50
Locked: 50
Thread: 50
Started Thread: 51
Locked: 51
Thread: 51
Started Thread: 52
Locked: 52
Thread: 52
Started Thread: 53
Locked: 53
Thread: 53
Started Thread: 54
Locked: 54
Thread: 54
Started Thread: 55
Locked: 55
Thread: 55
Started Thread: 56
Locked: 56
Thread: 56
Started Thread: 57
Locked: 57
Thread: 57
Started Thread: 58
Locked: 58
Thread: 58
Started Thread: 59
Locked: 59
Thread: 59
Started Thread: 60
Locked: 60
Thread: 60
Started Thread: 61
Locked: 61
Thread: 61
Started Thread: 62
Locked: 62
Thread: 62

Can you help me to make it work? I can't spot the error in those functions, and also it seems to work for other people.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
dragnash
  • 97
  • 4
  • So you want all invocations of the `func` to be serialized? No concurrency is allowed? – Theodor Zoulias Oct 27 '21 at 00:03
  • Is this helpful? [Task sequencing and re-entracy](https://stackoverflow.com/questions/21424084/task-sequencing-and-re-entracy) – Theodor Zoulias Oct 27 '21 at 00:04
  • Hi @TheodorZoulias, thanks for answer! I need all the requests to that endpoint to be processed in order of entrance (FIFO), and using a lock – dragnash Oct 27 '21 at 03:20
  • 2
    This: `new Thread(async () =>` doesn't seem right. You shouldn't use async methods in the Thread object like this, it will likely not behave as you expect. Only the initial part up until the first `await` will run on that thread, the rest will likely run on the ThreadPool or at least be handled by a SynchronizationContext. – Lasse V. Karlsen Oct 27 '21 at 07:44

1 Answers1

1

It seems that your tests do not generate usable info for proving or disproving the correct order of execution. The Thread.CurrentThread.ManagedThreadId is just an ID, and not a reliable indication of order. Also writing in the Console while the program is running may affect the order of execution, because all access to the static Console class is synchronized.

The test below uses the custom QueuedLock class (modified) to ensure the order of execution, and reports FIFO behavior consistently. I've added a Thread.Sleep(20) before starting each thread, to avoid having two threads requesting a ticket at roughly the same time. There is also a Thread.Sleep(40) inside the ThreadStart delegate, to ensure that all threads will get blocked before entering the QueuedLock (apart from the first thread of each batch).

public static void Main()
{
    QueuedLock queuedLock = new QueuedLock();
    foreach (var batch in Enumerable.Range(1, 10))
    {
        var sequence = Enumerable.Range(1, 10);
        var tickets = new List<int>();
        var threads = sequence.Select(item =>
        {
            Thread.Sleep(20);
            var thread = new Thread(() =>
            {
                queuedLock.Enter();
                try { tickets.Add(item); Thread.Sleep(40); }
                finally { queuedLock.Exit(); }
            });
            thread.Start();
            return thread;
        }).ToArray();
        foreach (var thread in threads) thread.Join();
        Console.WriteLine($"Batch #{batch}, tickets: {String.Join(", ", tickets)} - "
            + (tickets.SequenceEqual(sequence) ? "OK" : "NOT FIFO!"));
    }
}

public class QueuedLock
{
    private readonly object _locker = new object();
    private int _ticketsCount = 0;
    private int _ticketToRide = 1;

    public void Enter()
    {
        if (Monitor.IsEntered(_locker)) throw new InvalidOperationException();
        int myTicket = Interlocked.Increment(ref _ticketsCount);
        Monitor.Enter(_locker);
        while (myTicket != _ticketToRide) Monitor.Wait(_locker);
    }

    public void Exit()
    {
        if (!Monitor.IsEntered(_locker)) throw new InvalidOperationException();
        _ticketToRide++;
        Monitor.PulseAll(_locker);
        Monitor.Exit(_locker);
    }
}

Output:

Batch #1, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #2, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #3, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #4, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #5, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #6, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #7, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #8, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #9, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #10, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK

Try it on Fiddle.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Thank you very much for replying! I tested it in the console and logged everything ok! Another question is if the QueuedLock queuedLock = new QueuedLock(); object needs to be static running in an API to be shared between all the controllers – dragnash Oct 27 '21 at 12:41
  • 1
    @GinoColaiacovo you could use the `QueuedLock` the same way you would use a locker object used by the `lock` statement. If the mutual exclusion policy that you want to enforce is application-wide, it makes sense to declare a single `static` instance of the `QueuedLock` class. – Theodor Zoulias Oct 27 '21 at 12:54