4

I am learning about writing concurrent data structures and looking at the ConcurrentStack implementation as a learning exercise. As a starting point I have created a copy of the ConcurrentStack implementation by using IlSpy to decompile it into C#. I have limited myself to investigating and using just the Push and TryPop methods for the time being.

But my implementation is significantly slower than using the original.

My testing uses 4 threads (on a single socket, 4 core CPU) with thread affinity for each thread against a different core. Each thread performs 1,000,000 loops and each loop does three pushes and three pops. Running the testing many times the average time to complete all the threads is...

  • ConcurrentStack => 445ms
  • Clone of Push/TryPop => 670ms

So even though the code, as far as I can tell, is identical between the two the clone is about 50% slower. I run the testing 500 times in a run and take the average over all the runs. So I do not believe the issue is the initial JIT'ing of the code.

Any ideas why a copy of the methods would be so much slower?

C# Implementation

(For the sake of completeness I have provided the C# console app code that can be used to replicate the result. For anyone interesting in seeing if they get the same result as me.)

class Program
{
    static void Main(string[] args)
    {
        int processors = Environment.ProcessorCount;
        Console.WriteLine("Processors: {0}", processors);

        List<Type> runnersT = new List<Type>() { typeof(ThreadRunnerConcurrent), 
                                                 typeof(ThreadRunnerCASStack)};
        int cycles = 500;
        foreach (Type runnerT in runnersT)
        {
            long total = 0;
            for (int i = 0; i < cycles; i++)
            {
                // Create a thread runner per processor
                List<ThreadRunner> runners = new List<ThreadRunner>();
                for (int j = 0; j < processors; j++)
                {
                    ThreadRunner runner = Activator.CreateInstance(runnerT) as ThreadRunner;
                    runner.Processor = j;
                    runners.Add(runner);
                }

                // Start each runner going
                Stopwatch sw = new Stopwatch();
                sw.Start();
                runners.ForEach((r) => r.Start());

                // Wait for all the runners to exit
                runners.ForEach((r) => r.Join());
                runners.ForEach((r) => r.Check());
                sw.Stop();

                total += sw.ElapsedMilliseconds;
            }

            Console.WriteLine("{0} Average: {1}ms", runnerT.Name, (total / cycles));
        }

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

abstract class ThreadRunner
{
    private int _processor;
    private Thread _thread;

    public ThreadRunner()
    {
    }

    public int Processor
    {
        get { return _processor; }
        set { _processor = value; }
    }

    public void Start()
    {
        _thread = new Thread(new ParameterizedThreadStart(Run));
        _thread.Start();
    }

    public void Join()
    {
        _thread.Join();
    }

    public abstract void Check();

    protected abstract void Run(int cycles);

    private void Run(object param)
    {
        SetAffinity();
        Run(1000000);
    }

    private void SetAffinity()
    {
        #pragma warning disable 618
        int osThreadId = AppDomain.GetCurrentThreadId();
        #pragma warning restore 618

        // Set the thread's processor affinity
        ProcessThread thread = Process.GetCurrentProcess().Threads.Cast<ProcessThread>().Where(t => t.Id == osThreadId).Single();
        thread.ProcessorAffinity = new IntPtr(1L << Processor);
    }
}

class ThreadRunnerConcurrent : ThreadRunner
{
    private static ConcurrentStack<int> _stack = new ConcurrentStack<int>();

    protected override void Run(int cycles)
    {
        int ret;
        for (int i = 0; i < cycles; i++)
        {
            _stack.Push(i);
            _stack.Push(i);
            while (!_stack.TryPop(out ret)) ;
            _stack.Push(i);
            while (!_stack.TryPop(out ret)) ;
            while (!_stack.TryPop(out ret)) ;
        }
    }

    public override void Check()
    {
        if (_stack.Count > 0)
            Console.WriteLine("ThreadRunnerConcurrent has entries!");
    }
}

class ThreadRunnerCASStack : ThreadRunner
{
    private static CASStack<int> _stack = new CASStack<int>();

    protected override void Run(int cycles)
    {
        int ret;
        for (int i = 0; i < cycles; i++)
        {
            _stack.Push(i);
            _stack.Push(i);
            while (!_stack.TryPop(out ret)) ;
            _stack.Push(i);
            while (!_stack.TryPop(out ret)) ;
            while (!_stack.TryPop(out ret)) ;
        }
    }

    public override void Check()
    {
        if (_stack.Count > 0)
            Console.WriteLine("ThreadRunnerCASStack has entries!");
    }
}

class CASStack<T>
{
    private class Node
    {
        internal readonly T m_value;
        internal CASStack<T>.Node m_next;
        internal Node(T value)
        {
            this.m_value = value;
            this.m_next = null;
        }
    }

    private volatile CASStack<T>.Node m_head;

    public void Push(T item)
    {
        CASStack<T>.Node node = new CASStack<T>.Node(item);
        node.m_next = this.m_head;

        if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, node, node.m_next) == node.m_next)
            return;

        PushCore(node, node);
    }

    private void PushCore(Node head, Node tail)
    {
        SpinWait spinWait = default(SpinWait);

        do
        {
            spinWait.SpinOnce();
            tail.m_next = this.m_head;
        }
        while (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, head, tail.m_next) != tail.m_next);
    }

    public bool TryPop(out T result)
    {
        CASStack<T>.Node head = this.m_head;

        if (head == null)
        {
            result = default(T);
            return false;
        }

        if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, head.m_next, head) == head)
        {
            result = head.m_value;
            return true;
        }

        return TryPopCore(out result);
    }

    private bool TryPopCore(out T result)
    {
        CASStack<T>.Node node;
        if (TryPopCore(1, out node) == 1)
        {
            result = node.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    private int TryPopCore(int count, out CASStack<T>.Node poppedHead)
    {
        SpinWait spinWait = default(SpinWait);
        int num = 1;
        Random random = new Random(Environment.TickCount & 2147483647);
        CASStack<T>.Node head;
        int num2;
        while (true)
        {
            head = this.m_head;
            if (head == null)
                break;

            CASStack<T>.Node node = head;
            num2 = 1;
            while (num2 < count && node.m_next != null)
            {
                node = node.m_next;
                num2++;
            }

            if (Interlocked.CompareExchange<CASStack<T>.Node>(ref this.m_head, node.m_next, head) == head)
                goto Block_5;

            for (int i = 0; i < num; i++)
                spinWait.SpinOnce();

            num = (spinWait.NextSpinWillYield ? random.Next(1, 8) : (num * 2));
        }
        poppedHead = null;
        return 0;
    Block_5:
        poppedHead = head;
        return num2;
    }
}
#endregion
JasonMArcher
  • 14,195
  • 22
  • 56
  • 52
Phil Wright
  • 22,580
  • 14
  • 83
  • 137
  • 2
    stupid question, but did you compile for Release? – Mitch Wheat Feb 26 '15 at 00:57
  • Good point. That improves things. Now the clone is taking 550ms which is ~30% slower instead of ~50% slower. – Phil Wright Feb 26 '15 at 01:11
  • 1
    I think, IlSpy decompiled code differ from original. If you try to decompile with another tool (for example R#), you will see, that code will differ from ILSpy – oakio Feb 26 '15 at 10:30
  • 1
    Use the [actual](http://referencesource.microsoft.com/) source code for the type, not some tools best guess at what it might be. Decompilers are debugging tools, not ways of flawlessly recreating source code, treat them as such. – Servy Feb 26 '15 at 21:50
  • Thanks for the link. I changed my code to be exactly the same as the original source code and it still has the same slowdown. – Phil Wright Feb 26 '15 at 22:10
  • The code you posted does not compile. `CASStack` does not implement a property `Count`, yet it is referenced via `ThreadRunnerCASStack.Check()`. – antiduh Feb 26 '15 at 22:27

1 Answers1

6

ConcurrentStack<T> has one advantage that your CASStack<T> doesn't have, even though the code for both is identical.

ConcurrentStack<T> has an ngen'd native image installed on your computer that was compiled when you installed your .Net framework install. Your CASStack<T> is being compiled via JIT, and because JIT has to be fast, it does not perform as many optimizations as the AOT compiler in ngen.

I tested your code on my computer. Without ngen'ing your image, I got these results:

Processors: 4
ThreadRunnerConcurrent Average: 764ms
ThreadRunnerCASStack Average: 948ms
Finished

After ngening:

Processors: 4
ThreadRunnerConcurrent Average: 778ms
ThreadRunnerCASStack Average: 742ms
Finished
antiduh
  • 11,853
  • 4
  • 43
  • 66