-2

I have an issue with race conditions. They are marked in the code example below with `// POSSIBLE RACE` comments. This design is something that I came up with myself, but it has race issues and I am not sure how to overcome them. Perhaps using semaphores is the wrong choice.

Scenario: A producer should keep producing jobs while there are jobs in the DB queue AND consumers are still processing jobs. Once the consumers have finished processing all jobs, the producer should release all consumers, and then the producer and the consumers should exit.

How do I solve the issue below such that I can have a pool of consumers and one producer, where producer signals to consumers when to check queue for more items if they have run out?

Should I be using a different pattern? Should I be using Semaphore, Mutex, or some other kind of locking mechanism?

Thank you for your help! I have been trying to solve this issue for quite some time.

Fiddle: https://dotnetfiddle.net/Widget/SeNqQx

/// <summary>
/// Starts one <see cref="Consumer"/> per logical processor on the thread pool, plus one
/// background work item that keeps adding jobs to the shared queue and signals idle
/// consumers through a pair of semaphores.
/// </summary>
public class Producer
{
    // Number of consumers to start; also used as the initial and maximum count of consumerSemaphore.
    readonly int processorCount = Environment.ProcessorCount;
    // Every consumer created by StartTask; the producer loop inspects their IsRunning flags.
    readonly List<Consumer> consumers = new List<Consumer>();
    // Queue shared by the producer loop and all consumers.
    ConcurrentQueue<Job> jobs;
    // Monitor taken by the producer loop and by Consumer.TryGetNextJob.
    readonly object queueLock = new object();
    // Released by a consumer after it has taken a job; waited on by the producer loop.
    readonly Semaphore producerSemaphore;
    // Released by the producer loop to wake consumers; waited on by idle consumers.
    readonly Semaphore consumerSemaphore;

    /// <summary>
    /// Creates both semaphores with their initial count equal to their maximum count.
    /// </summary>
    public Producer()
    {
        // NOTE(review): the constructor arguments are (initialCount, maximumCount). Both
        // semaphores start full, so the first WaitOne() calls return without any matching
        // Release(), and a Release() while the count is still at its maximum throws
        // SemaphoreFullException.
        producerSemaphore = new Semaphore(1, 1);
        consumerSemaphore = new Semaphore(processorCount, processorCount);
    }

    /// <summary>
    /// Loads the initial jobs, queues <c>processorCount</c> consumers and the producer loop
    /// on the thread pool, then blocks until <c>resetEvent</c> is set.
    /// </summary>
    public void StartTask()
    {
        jobs = GetJobs();
        using (var resetEvent = new ManualResetEvent(false))
        {
            for (var i = 0; i < processorCount; i++)
            {
                var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
                consumers.Add(consumer);
                // processorCount is passed by value here; see the note on QueueConsumer.
                QueueConsumer(consumer, processorCount, resetEvent);
            }

            AddJobsToQueueWhenAvailable(resetEvent);
            resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
        }
    }

    /// <summary>
    /// Builds a new queue holding five new <see cref="Job"/> instances.
    /// </summary>
    /// <returns>A freshly created queue with five jobs.</returns>
    private ConcurrentQueue<Job> GetJobs(){
        var q = new ConcurrentQueue<Job>();
        for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
        return q;
    }

    /// <summary>
    /// Queues a thread-pool work item that runs <paramref name="consumer"/> to completion,
    /// logs any exception to the console, and sets <paramref name="resetEvent"/> when the
    /// decremented counter reaches zero.
    /// </summary>
    /// <param name="consumer">The consumer whose <c>StartJob</c> is executed.</param>
    /// <param name="numberOfThreadsRunning">Counter that the work item decrements when it finishes.</param>
    /// <param name="resetEvent">Event set when the counter reaches zero.</param>
    // NOTE(review): numberOfThreadsRunning is a by-value parameter, so each call (and the
    // lambda created in it) owns a separate copy. Every Interlocked.Decrement below therefore
    // yields processorCount - 1, which is 0 only when processorCount is 1; otherwise
    // resetEvent is never set by this method.
    private void QueueConsumer(Consumer consumer, int numberOfThreadsRunning, ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                consumer.StartJob();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception occurred " + ex);
            }
            finally
            {

                // Safely decrement the counter
                if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
                {
                        resetEvent.Set();
                }
            }
        });
    }

    /// <summary>
    /// Queues the producer loop on the thread pool. Each iteration takes <c>queueLock</c>,
    /// exits if no consumer reports <c>IsRunning</c>, otherwise enqueues a new batch of jobs
    /// and signals idle consumers, then sleeps for 500 ms.
    /// </summary>
    /// <param name="resetEvent">Not used by the body of this method.</param>
    private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            while (true) // TODO - replace with cancellation token
            {
                // lock queue - so that no workers will steal another workers item
                lock (queueLock)
                {
                    // check that at least 1 worker is still active
                    if (consumers.TrueForAll(w => !w.IsRunning))
                    {
                        // all jobs complete - release all locks if 0 workers active
                        consumerSemaphore.Release(processorCount);
                        return;
                    }

                    // poll for new items that have been added to the queue
                    var newJobs = GetJobs();

                    // for each item:
                    foreach (var job in newJobs)
                    {
                        // add item to queue
                        jobs.Enqueue(job);

                        // If we have any workers halted, let them know there are new items!
                        if (consumers.Any(w => !w.IsRunning))
                        {
                            // POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
                            // signal worker to continue via semaphore
                            consumerSemaphore.Release(1);
                            // wait until worker thread wakes up and takes item before unlocking queue
                            // (queueLock is still held by this thread while it waits here)
                            producerSemaphore.WaitOne();
                        }
                    }
                } // unlock queue

                // sleep for a bit
                Thread.Sleep(500); // TODO - replace with cancellation token
            }
        });
    }
}

/// <summary>
/// Worker that repeatedly takes jobs from a shared queue and, when the queue is empty,
/// blocks on a semaphore until the producer signals it.
/// </summary>
public class Consumer
{
    // Read by the producer to decide whether this worker is idle.
    // NOTE(review): never assigned in the constructor, so it starts as false (the bool
    // default); it is set to true only after a successful dequeue that follows a semaphore wait.
    public bool IsRunning;
    // Queue shared with the producer and the other consumers.
    ConcurrentQueue<Job> jobs;
    // Monitor shared with the producer; guards the first dequeue attempt below.
    private object queueLock;
    // Released by this consumer after it has taken a job following a wake-up.
    private Semaphore producerSemaphore;
    // Waited on by this consumer when the queue is empty.
    private Semaphore consumerSemaphore;

    /// <summary>
    /// Stores the shared queue, lock object and semaphores handed in by the producer.
    /// </summary>
    /// <param name="jobs">Queue to take jobs from.</param>
    /// <param name="queueLock">Lock object shared with the producer.</param>
    /// <param name="producerSemaphore">Semaphore this consumer releases after taking a job post-wait.</param>
    /// <param name="consumerSemaphore">Semaphore this consumer waits on when the queue is empty.</param>
    public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
    {
        this.jobs = jobs;
        this.queueLock = queueLock;
        this.producerSemaphore = producerSemaphore;
        this.consumerSemaphore = consumerSemaphore;
    }

    /// <summary>
    /// Loops until <c>TryGetNextJob</c> returns false; the job-processing body is a placeholder.
    /// </summary>
    public void StartJob() {
        while(TryGetNextJob(out var job)) {
            // do stuff with job
        }
    }

    /// <summary>
    /// Tries to dequeue a job under <c>queueLock</c>; if the queue is empty, marks this
    /// consumer as not running, waits on <c>consumerSemaphore</c>, and tries once more.
    /// </summary>
    /// <param name="nextJob">The dequeued job when the method returns true.</param>
    /// <returns>True if a job was dequeued; false if the queue was still empty after the wait.</returns>
    private bool TryGetNextJob(out Job nextJob)
    {
        // lock to prevent producer from producing items before we've had a chance to wait
        lock (queueLock)
        {
            // This path returns without touching IsRunning.
            if (jobs.TryDequeue(out nextJob))
                return true; // we have an item - let's process it

            // worker halted
            IsRunning = false;
        }

        // wait for signal from producer (the lock has already been released at this point)
        consumerSemaphore.WaitOne();

        // once received signal, there should be a new item in the queue - if there is no item, it means all children are finished
        var itemDequeued = jobs.TryDequeue(out nextJob);
        if (!itemDequeued)
        {
            return false; // looks like it's time to exit
        }

        // another item for us to process
        IsRunning = true;
        // let producer know it's safe to release queueLock
        producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)

        return true;
    }

}

/// <summary>
/// Marker type for a unit of work; it declares no members of its own.
/// </summary>
public class Job
{
}
ChickenFeet
  • 2,653
  • 22
  • 26
  • Multithreading is hard, to learn it you have to rather find a tutorial or something better (a book). Ideally you want to understand the [difference](https://stackoverflow.com/q/2332765/1997232) and know [basics](http://www.albahari.com/threading/) before attempting. Not attempting and then asking. See also [mcve]. – Sinatr Apr 14 '20 at 09:56
  • 2
    This question strikes me as a bit of a fishing expedition... However, as a toolkit for producer/consumer scenarios, [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) is very handy, and will allow you to rise above low-level synchronization primitives and focus more on the workflow itself. You shouldn't need to think about locking, mutexes & semaphores to get this done. – spender Apr 14 '20 at 10:05
  • I know the difference between Mutex and Semaphore and I've done a fair bit of multithreading. But I haven't had to coordinate two threads with reliance on one another before. I am starting to think this issue doesn't have a solution and perhaps I should try a completely different approach. The `Release()` and `WaitOne()` combined need to be an atomic operation for this to work. – ChickenFeet Apr 14 '20 at 10:06
  • I think maybe I should change the code so that the consumer doesn't need to know about the producer and the consumer will exit if it finishes work. Producer can then just spin up new threads for any new jobs. This will remove the synchronisation aspect. – ChickenFeet Apr 14 '20 at 10:09
  • Your design has additional issues. What happens when the producer tells a busy consumer to shut down? You only want to shut down idle consumers. How does the producer know the consumers are finished processing? – Jim Rogers Apr 14 '20 at 16:17
  • Your design mixes the concepts of a producer and a task master. The Producer-Consumer model does not establish the producer as a task master. – Jim Rogers Apr 14 '20 at 16:20
  • I have solved the issue. I'll post up a solution when I can. @JimRogers the producer first checks to make sure all consumers are not running (by checking `IsRunning`) before shutting them down. This flag is set in a lock so there shouldn't be any race conditions there. – ChickenFeet Apr 15 '20 at 04:05
  • @JimRogers you're right about the design pattern. This isn't really a typical producer/consumer pattern. I think that's why there is some confusion about this question. This is a design I've made up, I am not sure if anything else exists like it. I'll call it the Chicken Feet Pattern :) – ChickenFeet Apr 15 '20 at 04:38
  • @ChickenFeet You might be interested in some concurrent design patterns described in https://sworthodoxy.blogspot.com/2015/05/shared-resource-design-patterns.html – Jim Rogers Apr 15 '20 at 16:57
  • @JimRogers, thanks! I'll check it out once I find the time. – ChickenFeet Apr 16 '20 at 07:18

2 Answers2

3

I would recommend taking a look at BlockingCollection. Any number of consumer threads may call Take: if there is an item, it will be returned; if not, the calling thread will block until one is available. It also supports setting a bound on the capacity, which makes the adding thread block if the capacity is exceeded.

This should remove the need for semaphores and reset events and make the code much simpler overall. See Blocking Collection and the Producer-Consumer Problem for a more complete description.

JonasH
  • 28,608
  • 2
  • 10
  • 23
  • 2
    Came here to say this. The BlockingCollection already solves your problems, plus other problems you haven't thought of yet. Even if you don't want to use that class for some reason, it's worth studying how it works and why it was designed the way it was. – John Wu Apr 14 '20 at 22:50
0

Thanks for the help. I will certainly look into BlockingCollection.

So I actually wasn't far off what I wanted. I just needed to read a bit more on Semaphores (initialise with correct initial count) for the code to work correctly, as well as a few other bits and pieces. Search for EDIT to see what has changed. Working solution:

/// <summary>
/// Starts one <see cref="Consumer"/> per logical processor on the thread pool, plus one
/// background producer loop that keeps adding jobs to the shared queue, wakes idle
/// consumers one at a time, and releases every consumer once none of them is running.
/// </summary>
public class Producer
{
    // Number of consumers to start; also the maximum count of consumerSemaphore.
    readonly int processorCount = Environment.ProcessorCount;
    // Live consumers; guarded by queueLock because failed consumers remove themselves.
    readonly List<Consumer> consumers = new List<Consumer>();
    // Queue shared by the producer loop and all consumers.
    ConcurrentQueue<Job> jobs;
    // Monitor shared with every consumer; also guards the consumers list.
    readonly object queueLock = new object();
    // Released by a consumer once it has taken the job it was woken for.
    readonly Semaphore producerSemaphore;
    // Released by the producer loop to wake idle consumers.
    readonly Semaphore consumerSemaphore;
    // Count of consumer work items that have not finished yet.
    int numberOfThreadsRunning;

    /// <summary>
    /// Creates both semaphores empty so that every WaitOne() must be paired with a Release().
    /// </summary>
    public Producer()
    {
        producerSemaphore = new Semaphore(0, 1); // EDIT - MUST START WITH 0 INITIALLY
        consumerSemaphore = new Semaphore(0, processorCount); // EDIT - MUST START WITH 0 INITIALLY
        numberOfThreadsRunning = processorCount; // EDIT - take copy so that Interlocked.Decrement references the same int variable in memory
    }

    /// <summary>
    /// Loads the initial jobs, queues <c>processorCount</c> consumers and the producer loop
    /// on the thread pool, then blocks until the last consumer work item has finished.
    /// </summary>
    public void StartTask()
    {
        jobs = GetJobs();
        using (var resetEvent = new ManualResetEvent(false))
        {
            for (var i = 0; i < processorCount; i++)
            {
                var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);

                // Consumers queued in earlier iterations are already running and may remove
                // themselves from the list (under queueLock) if they fail, so the add must
                // take the same lock.
                lock (queueLock)
                {
                    consumers.Add(consumer);
                }

                QueueConsumer(consumer, resetEvent);
            }

            AddJobsToQueueWhenAvailable(resetEvent);
            resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
        }
    }

    /// <summary>
    /// Builds a new queue holding five new <see cref="Job"/> instances.
    /// </summary>
    /// <returns>A freshly created queue with five jobs.</returns>
    private ConcurrentQueue<Job> GetJobs()
    {
        var q = new ConcurrentQueue<Job>();
        for (var i = 0; i < 5; i++)
        {
            q.Enqueue(new Job()); // this usually comes from DB queue
        }

        return q;
    }

    /// <summary>
    /// Queues a thread-pool work item that runs <paramref name="consumer"/> to completion.
    /// A consumer that throws is removed from the list and the exception is logged.
    /// The last work item to finish sets <paramref name="resetEvent"/>.
    /// </summary>
    /// <param name="consumer">The consumer whose <c>StartJob</c> is executed.</param>
    /// <param name="resetEvent">Event set when the shared thread counter reaches zero.</param>
    private void QueueConsumer(Consumer consumer, ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                consumer.StartJob();
            }
            catch (Exception ex)
            {
                // A failed consumer may be left with IsRunning == true; drop it from the list
                // so the producer loop does not keep treating it as an active worker.
                lock (queueLock)
                {
                    consumers.Remove(consumer);
                }
                Console.WriteLine("Exception occurred " + ex);
            }
            finally
            {
                // Safely decrement the shared counter; the last consumer to finish signals StartTask.
                if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
                {
                    resetEvent.Set();
                }
            }
        });
    }

    /// <summary>
    /// Queues the producer loop on the thread pool. Each iteration takes <c>queueLock</c>,
    /// exits (after releasing all consumers) if no consumer reports <c>IsRunning</c>,
    /// otherwise enqueues a new batch of jobs, waking one idle consumer per job and waiting
    /// for its acknowledgement, then sleeps for 500 ms.
    /// </summary>
    /// <param name="resetEvent">Currently unused by the loop; kept for signature compatibility.</param>
    private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            while (true) // TODO - replace with cancellation token
            {
                // lock queue - so that no workers will steal another workers item
                lock (queueLock)
                {
                    // check that at least 1 worker is still active
                    if (consumers.TrueForAll(w => !w.IsRunning))
                    {
                        // all jobs complete - release all locks if 0 workers active
                        consumerSemaphore.Release(processorCount);
                        return;
                    }

                    // poll for new items that have been added to the queue
                    var newJobs = GetJobs();

                    // for each item:
                    foreach (var job in newJobs)
                    {
                        // add item to queue
                        jobs.Enqueue(job);

                        // If we have any workers halted, let them know there are new items!
                        if (consumers.Any(w => !w.IsRunning))
                        {
                            // POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
                            // EDIT - Ordering does not matter. If semaphore is Released() before WaitOne() is
                            //        called, then consumer will just continue as soon as it calls WaitOne()
                            // signal worker to continue via semaphore
                            consumerSemaphore.Release();
                            // wait until worker thread wakes up and takes item before unlocking queue
                            producerSemaphore.WaitOne();
                        }
                    }
                } // unlock queue

                // sleep for a bit
                Thread.Sleep(500); // TODO - replace with cancellation token
            }
        });
    }
}

/// <summary>
/// Worker that repeatedly takes jobs from a shared queue and, when the queue is empty,
/// parks on a semaphore until the producer either hands it a new job or tells it to exit.
/// </summary>
public class Consumer
{
    /// <summary>
    /// True while this consumer is (or is about to start) processing jobs; false while it
    /// is parked waiting for the producer. Read by the producer.
    /// </summary>
    public bool IsRunning;

    // Queue shared with the producer and the other consumers.
    readonly ConcurrentQueue<Job> jobs;
    // Monitor shared with the producer; guards the first dequeue attempt and the idle flag.
    private readonly object queueLock;
    // Released by this consumer to acknowledge that it has taken the job it was woken for.
    private readonly Semaphore producerSemaphore;
    // Waited on by this consumer when the queue is empty.
    private readonly Semaphore consumerSemaphore;

    /// <summary>
    /// Stores the shared queue, lock object and semaphores, and marks the consumer as running.
    /// </summary>
    /// <param name="jobs">Queue to take jobs from.</param>
    /// <param name="queueLock">Lock object shared with the producer.</param>
    /// <param name="producerSemaphore">Semaphore released after taking a job post-wait.</param>
    /// <param name="consumerSemaphore">Semaphore waited on when the queue is empty.</param>
    public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
    {
        this.jobs = jobs;
        this.queueLock = queueLock;
        this.producerSemaphore = producerSemaphore;
        this.consumerSemaphore = consumerSemaphore;
        IsRunning = true; // EDIT - must default to true so producer doesn't exit prematurely
    }

    /// <summary>
    /// Processes jobs until <c>TryGetNextJob</c> reports that it is time to exit.
    /// </summary>
    public void StartJob()
    {
        while (TryGetNextJob(out var job))
        {
            // do stuff with job
        }
    }

    /// <summary>
    /// Tries to dequeue a job under <c>queueLock</c>. If the queue is empty, marks this
    /// consumer as idle, waits for the producer's signal, and tries once more.
    /// </summary>
    /// <param name="nextJob">The dequeued job when the method returns true.</param>
    /// <returns>
    /// True if a job was dequeued; false if the queue was still empty after the producer's
    /// signal, which is the producer's way of telling the consumer to exit.
    /// </returns>
    private bool TryGetNextJob(out Job nextJob)
    {
        // lock to prevent producer from producing items before we've had a chance to wait
        lock (queueLock)
        {
            if (jobs.TryDequeue(out nextJob))
            {
                return true; // we have an item - let's process it
            }

            // worker halted
            IsRunning = false;
        }

        // wait for signal from producer
        consumerSemaphore.WaitOne();

        // once the signal is received there should be a new item in the queue - if there is
        // no item, it means all consumers are finished
        var itemDequeued = jobs.TryDequeue(out nextJob);
        if (!itemDequeued)
        {
            return false; // looks like it's time to exit
        }

        // another item for us to process
        IsRunning = true;
        // let producer know it's safe to release queueLock
        producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)
        // EDIT - Order does not matter. If we call Release() before producer calls WaitOne(), then
        //        Producer will just continue as soon as it calls WaitOne().

        return true;
    }
}

/// <summary>
/// Empty placeholder representing one unit of work taken from the queue.
/// </summary>
public class Job
{
}
ChickenFeet
  • 2,653
  • 22
  • 26