1

I have thread pool implementation where whenever I try to stop/join the pool there is always one random thread in the pool that will not stop (state == Running) when I call Stop() on the pool.

I cannot see why, I only have one lock, I notify whoever might be blocked waiting for Dequeue with Monitor.PulseAll in Stop. The debugger clearly shows most of them got the message, it is just always 1 out of N that is still running...

Here is a minimal implementation of the pool

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace MultiThreading
{
    public class WorkerHub
    {
        private readonly object _listMutex = new object();
        private readonly Queue<TaskWrapper> _taskQueue;
        private readonly List<Thread> _threads;
        private int _runCondition;
        private readonly Dictionary<string, int> _statistics;

        public WorkerHub(int count = 4)
        {
            _statistics = new Dictionary<string, int>();
            _taskQueue = new Queue<TaskWrapper>();
            _threads = new List<Thread>();
            InitializeThreads(count);
        }

        private bool ShouldRun
        {
            get => Interlocked.CompareExchange(ref _runCondition, 1, 1) == 1;
            set
            {
                if (value)
                    Interlocked.CompareExchange(ref _runCondition, 1, 0);
                else
                    Interlocked.CompareExchange(ref _runCondition, 0, 1);
            }
        }

        private void InitializeThreads(int count)
        {
            Action threadHandler = () =>
            {
                while (ShouldRun)
                {
                    var wrapper = Dequeue();
                    if (wrapper != null)
                    {
                        wrapper.FunctionBinding.Invoke();
                        _statistics[Thread.CurrentThread.Name] += 1;
                    }
                }
            };

            for (var i = 0; i < count; ++i)
            {
                var t = new Thread(() => { threadHandler.Invoke(); });
                t.Name = $"WorkerHub Thread#{i}";
                _statistics[t.Name] = 0;
                _threads.Add(t);
            }
        }


        public Task Enqueue(Action work)
        {
            var tcs = new TaskCompletionSource<bool>();
            var wrapper = new TaskWrapper();

            Action workInvoker = () =>
            {
                try
                {
                    work.Invoke();
                    tcs.TrySetResult(true);
                }
                catch (Exception e)
                {
                    tcs.TrySetException(e);
                }
            };
            Action workCanceler = () => { tcs.TrySetCanceled(); };
            wrapper.FunctionBinding = workInvoker;
            wrapper.CancelBinding = workCanceler;


            lock (_taskQueue)
            {
                _taskQueue.Enqueue(wrapper);
                Monitor.PulseAll(_taskQueue);
            }


            return tcs.Task;
        }

        private TaskWrapper Dequeue()
        {
            lock (_listMutex)
            {
                while (_taskQueue.Count == 0)
                {
                    if (!ShouldRun)
                        return null;
                    Monitor.Wait(_listMutex);
                }

                _taskQueue.TryDequeue(out var wrapper);
                return wrapper;
            }
        }

        public void Stop()
        {
            ShouldRun = false;

            //Wake up whoever is waiting for dequeue
            lock (_listMutex)
            {
                Monitor.PulseAll(_listMutex);
            }

            foreach (var thread in _threads)
            {
                thread.Join();
            }
            var sum = _statistics.Sum(pair => pair.Value) * 1.0;
            foreach (var stat in _statistics)
            {
                Console.WriteLine($"{stat.Key} ran {stat.Value} functions, {stat.Value/sum * 100} percent of the total.");
            }
        }

        public void Start()
        {
            ShouldRun = true;
            foreach (var thread in _threads) thread.Start();
        }
    }
}

With a test run

public static async Task Main(string[] args)
    {
        var hub = new WorkerHub();
        var tasks = Enumerable.Range(0, (int) 100).Select(x => hub.Enqueue(() => Sum(x)))
            .ToArray();
        var sw = new Stopwatch();
        sw.Start();
        hub.Start();
        await Task.WhenAll(tasks);
        hub.Stop();
        sw.Start();
        Console.WriteLine($"Work took: {sw.ElapsedMilliseconds}ms.");
    }

    public static int Sum(int n)
    {
        var sum = 0;
        for (var i = 0; i <= n; ++i) sum += i;
        Console.WriteLine($"Sum of numbers up to {n} is {sum}");
        return sum;
    }

Am I missing something fundamental? Please note this is not production code (phew) but stuff I am just missing around with so you might find more than 1 issue :)

arynaq
  • 6,710
  • 9
  • 44
  • 74
  • You should really be avoiding really low level mechanisms for dealing with threads whenever possible. You should be favoring tasks and higher level locking mechanisms over interlocked, explicit threads, etc. It will help you greatly reduce the complexity of the code, and the opportunities for errors as a result. It doesn't look like you're doing anything that would *require* to use those lower level mechanisms either. – Servy Oct 05 '18 at 13:43
  • You are right I wouldn't dare do this in production. But I am new to the language and want to learn it's constructs. – arynaq Oct 05 '18 at 13:44
  • Then learn to do things properly, using the proper tools for the proper job, rather than trying to use the improper tools for the job, and picking up back habits. – Servy Oct 05 '18 at 13:46
  • 1
    Well there is something to be said about knowing how your tools work. At least to me personally that is important and I have to see nuts and bolts to know how the house is built. Maybe I should stick with cpp. – arynaq Oct 05 '18 at 13:47
  • If you want to know how any given tool was built then look at its source code. It's all public. – Servy Oct 05 '18 at 13:48
  • 2
    Brr, this is a truly awful deadlock to have to debug. Every new C# feature to make threading look simple added 5 new hard-to-debug problems. Making the Main method async along with using Task caused the problem. You can't predict what thread an async method is going to run on. As it so happens, in this program there is a very subtle race. The task continuation in Main, after the await call, can run on one of the "worker hub" threads, right after it completed. Easy to see when you set a breakpoint on hub.Stop() and then look a Debug > Windows > Threads. Hard to fix. – Hans Passant Oct 05 '18 at 16:10

1 Answers1

1

I wasn't able to repro your MCVE at first because I ran it in a non-async Main()...

If you view the 'Threads' debug window at the call to hub.Stop(); you should see that execution has switched to one of your worker threads. This is why one worker thread does not respond.

I think its related to the problem described here.

Switching Enqueue(Action work) to use TaskCreationOptions.RunContinuationsAsynchronously should fix it:

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

[Edit]

Probably a better way to avoid the problem is to swap out the direct thread management to use tasks (this isn't a proper drop-in replacement for your current code, just want to show the idea):

public class TaskWorkerHub
{
    ConcurrentQueue<Action> workQueue = new ConcurrentQueue<Action>();
    int concurrentTasks;
    CancellationTokenSource cancelSource;
    List<Task> workers = new List<Task>();

    private async Task Worker(CancellationToken cancelToken)
    {
        while (workQueue.TryDequeue(out var workTuple))
        {
            await Task.Run(workTuple, cancelToken);
        }
    }

    public TaskWorkerHub(int concurrentTasks = 4)
    {
        this.concurrentTasks = concurrentTasks;
    }

    public void Enqueue(Action work) => workQueue.Enqueue(work);

    public void Start()
    {
        cancelSource  = new CancellationTokenSource();

        for (int i = 0; i < concurrentTasks; i++)
        {        
            workers.Add(Worker(cancelSource.Token));
        }
    }

    public void Stop() => cancelSource.Cancel();

    public Task WaitAsync() => Task.WhenAll(workers);    
}
Peter Wishart
  • 11,600
  • 1
  • 26
  • 45
  • This fixes it, although I do not know why, I will have to read up on this. It seems weird to me that the runtime would hijack one of my threads to run work on. – arynaq Oct 08 '18 at 08:12
  • Also there is the issue of the managed threads (.net core) being run on whatever core the runtime feels like, when I check the perf of the builtin threadpool I see all of my cores spinning up, while mine are only assigned to half of those. Which makes performance take a dive. – arynaq Oct 08 '18 at 08:44
  • Yes, its very non-obvious behavior. Using `RunContinuationsAsynchronously` will schedule more work to the `ThreadPool` resulting in more managed threads being used. – Peter Wishart Oct 08 '18 at 11:09