
Before I spend too long reinventing the wheel, I wanted to check whether there is already a class in .NET that does what I want.

What I want is something a bit like a Semaphore (or perhaps even like a CountdownEvent), but slightly different.

I have a requirement where I have a varying number of "resources" available, and I want a thread to wait efficiently when there are zero resources available. In the meantime, another thread can free up a resource, which should immediately release the other waiting thread.

This sounds a lot like a Semaphore, but it's not, because a Semaphore (as far as I can see) treats each thread as a "resource" in terms of counting them.

Anyway, here's my first simple implementation of what I want. It has no disposal, code contracts, error handling, timeout support or cancellation support yet, but it should demonstrate what I want:

using System.Threading;

public sealed class ResourceCounter
{
    /// <summary>Create with the specified number of resources initially available.</summary>
    public ResourceCounter(int resourceCount)
    {
        _resourceCount = resourceCount;

        if (_resourceCount > 0)
        {
            _resourceAvailable.Set();
        }
    }

    /// <summary>Acquires a resource. Waits forever if necessary.</summary>
    public void Acquire()
    {
        while (true)
        {
            _resourceAvailable.Wait();

            lock (_lock)
            {
                if (_resourceCount > 0)
                {
                    if (--_resourceCount == 0)
                    {
                        _resourceAvailable.Reset();
                    }

                    return;
                }

                // Another thread took the last resource between the Wait()
                // and the lock; loop round and wait again.
            }
        }
    }

    /// <summary>Releases a resource. Any thread may call this.</summary>
    public void Release()
    {
        lock (_lock)
        {
            ++_resourceCount;
            _resourceAvailable.Set();
        }
    }

    private int _resourceCount;
    private readonly object _lock = new object();
    private readonly ManualResetEventSlim _resourceAvailable = new ManualResetEventSlim();
}

The usage pattern is very simple:

  1. Construct a ResourceCounter with the required initial resource count (which can be zero or more).

  2. A thread which wants to acquire a resource calls ResourceCounter.Acquire(), which will not return until a resource is available and has been acquired.

  3. A thread which wants to release a resource calls ResourceCounter.Release(), which will release a resource and return immediately.

Note that any thread can release a resource; it doesn't have to be the one that acquired the resource.
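
For illustration, here's a minimal runnable sketch of that usage pattern (the counts and timings here are invented purely for the example):

using System;
using System.Threading;

static class ResourceCounterDemo
{
    static void Main()
    {
        var counter = new ResourceCounter(2); // two resources initially available

        // Releaser thread: frees one resource every half second.
        // Note that this is not the thread that acquired them.
        new Thread(() =>
        {
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(500);
                counter.Release();
            }
        }) { IsBackground = true }.Start();

        // Acquirer: blocks whenever no resources are available.
        for (int i = 0; i < 7; i++)
        {
            counter.Acquire();
            Console.WriteLine("Acquired resource #{0}", i);
        }
    }
}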

I'm using this as part of some multithreaded pipeline code where one thread is responsible for enqueuing work items, several threads are processing the work items, and another thread is outputting the processed work items. The thread which outputs the processed work items has to multiplex them (since the processing threads may output completed items in any order), and I needed a mechanism to stop work items from being queued endlessly while the multiplexer waits for a tardy item.

(See Pipelines, multiplexing, and unbounded buffering for some background on this.)

Anyway, is there anything already available to do this, or should I continue to develop my own class for it?


[EDIT]

As noted below, a SemaphoreSlim does exactly the right thing. I'd rejected it because I thought that the thread that called Wait() had to be the one that called Release(), but that wasn't the case. This is what I get for coding on a Sunday... ;)
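
For the record, here's roughly what the SemaphoreSlim equivalent looks like (the initial count of 2 is just for illustration):

using System.Threading;

// SemaphoreSlim does everything ResourceCounter does: the initial count is
// the number of available resources, Wait() acquires one, Release() frees
// one, and the releasing thread need not be the one that waited.
var resources = new SemaphoreSlim(2);

resources.Wait();     // equivalent to Acquire(): blocks while the count is zero
// ... use the resource, or hand it off to another thread ...
resources.Release();  // equivalent to Release(): callable from any thread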

Matthew Watson
    Stuff to solve this sort of problem is usually built out of `AutoResetEvent`s to signal threads that something interesting just happened. Also you might want to look at this question on the standard way to make a producer-consumer pattern in C#: http://stackoverflow.com/questions/1656404/c-sharp-producer-consumer – Eric Lippert Feb 24 '13 at 15:01
  • Why don't you use the “pool queue” (using `BlockingQueue`) as mentioned in [the accepted answer of your previous question](http://stackoverflow.com/a/15024780/41071)? – svick Feb 24 '13 at 15:40
  • Also, I don't understand why you can't use `Semaphore(Slim)`. It seems to me it does exactly what you want (you can acquire semaphore on one thread and release it on another), – svick Feb 24 '13 at 15:46
  • Can you provide a sample implementation using a SemaphoreSlim? I couldn't work out how to achieve the equivalent of my simple little class. (Incidentally, I am already using the pool queue, which is working fine. I just wanted to come up with a more lightweight solution. It's kind of an academic exercise really - there is no real optimisation required for this particular code) – Matthew Watson Feb 24 '13 at 16:47
  • @MatthewWatson `SemaphoreSlim` is the same as your `ResourceCounter`. Just use `Wait()` instead of `Acquire()`. – svick Feb 24 '13 at 16:50
  • Thanks, I see that now. I don't know why, but I had gained the impression that the same thread that acquired the resource had to be the one that released it. As you point out, that's not the case. :) – Matthew Watson Feb 24 '13 at 16:57

2 Answers


A multi-stage pipeline architecture is more easily constructed using queues to communicate. A producer thread places items into a work queue, one or more worker threads dequeue and process items, and add them to an output queue. A final thread reads the output queue and outputs the data.

In .NET, this is easily accomplished with BlockingCollection.

See https://stackoverflow.com/a/5108487/56778 for an example of a two-stage pipeline. Adding another stage is straightforward.
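
As a rough sketch of the shape of a single stage (the string item type and the processing step are placeholders, not anything from the linked example):

using System.Collections.Concurrent;

class PipelineStage
{
    // Each stage consumes from one bounded queue and produces into the next;
    // the bound provides backpressure so no stage can run away.
    public static void Run(BlockingCollection<string> input,
                           BlockingCollection<string> output)
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            output.Add(item.ToUpperInvariant()); // stand-in for real processing
        }

        // Tell the downstream stage we're finished. (With several workers per
        // stage you would call this once, after all of them have finished.)
        output.CompleteAdding();
    }
}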

To handle the problem of the output thread getting things out of order, I made the output queue a priority queue, using a min heap. My items were identified by a sequential record number, so the output thread knew which record number was to be output next. It would wait on an AutoResetEvent for an item to be placed on the queue (the worker process would set the event when an item was enqueued). The output thread would then peek at the top item to see if it matched the expected item. If not, it would wait on the event again.
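
To make that concrete, here's a hedged sketch of the arrangement; a SortedDictionary stands in for the min heap purely to keep the example self-contained (the real code used a binary heap, as described):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class OrderedOutput
{
    // SortedDictionary stands in for the min heap: First() is the entry
    // with the smallest record number.
    private readonly SortedDictionary<int, string> _pending =
        new SortedDictionary<int, string>();
    private readonly AutoResetEvent _itemEnqueued = new AutoResetEvent(false);
    private readonly object _lock = new object();

    // Worker threads call this when a record finishes processing.
    public void Enqueue(int recordNumber, string record)
    {
        lock (_lock) { _pending.Add(recordNumber, record); }
        _itemEnqueued.Set();
    }

    // The output thread runs this, emitting records strictly in order.
    public void OutputLoop(int recordCount)
    {
        for (int expected = 0; expected < recordCount; )
        {
            _itemEnqueued.WaitOne(); // wait for a worker to enqueue something

            lock (_lock)
            {
                // Peek at the smallest record number; if it's the one we
                // expect, output it and keep going until we hit a gap.
                while (_pending.Count > 0)
                {
                    var next = _pending.First();
                    if (next.Key != expected) break;

                    Console.WriteLine(next.Value);
                    _pending.Remove(expected);
                    expected++;
                }
            }
        }
    }
}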

That worked very well, because it eliminated the second queue. The blocking happened in the output queue, where it belonged. Performance was very good for my purposes. Enqueuing an item is an O(log n) operation, but in practice n is very small. Even when the queue had 100,000 items in it, the amount of time required to enqueue an item was insignificant compared to the amount of time it took to process a record.

You can still use BlockingCollection for this. You just have to make a binary heap class implement the IProducerConsumerCollection interface. I did that by adding locks to the simple binary heap class I published in A Generic BinaryHeap class. You can then provide one of those to the BlockingCollection constructor, like this:

BlockingCollection<MyRecord> outputQueue =
    new BlockingCollection<MyRecord>(
        new ConcurrentBinaryHeap<MyRecord>(), MaxQueueSize);
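
To give a sense of what the wrapper involves, here's a rough sketch of a lockable collection that BlockingCollection can wrap. This is not the ConcurrentBinaryHeap mentioned above; a SortedSet stands in for the heap, so unlike a real heap it rejects duplicates:

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// T must have a default ordering (IComparable<T>) for SortedSet to work.
public sealed class ConcurrentMinSet<T> : IProducerConsumerCollection<T>
{
    private readonly SortedSet<T> _set = new SortedSet<T>();
    private readonly object _sync = new object();

    public bool TryAdd(T item)
    {
        lock (_sync) { return _set.Add(item); } // false for duplicates
    }

    public bool TryTake(out T item)
    {
        lock (_sync)
        {
            if (_set.Count == 0) { item = default(T); return false; }
            item = _set.Min;   // always remove the smallest element
            _set.Remove(item);
            return true;
        }
    }

    public int Count { get { lock (_sync) { return _set.Count; } } }
    public bool IsSynchronized { get { return true; } }
    public object SyncRoot { get { return _sync; } }

    public T[] ToArray() { lock (_sync) { return _set.ToArray(); } }
    public void CopyTo(T[] array, int index) { lock (_sync) { _set.CopyTo(array, index); } }
    void ICollection.CopyTo(Array array, int index) { lock (_sync) { ((ICollection)_set).CopyTo(array, index); } }

    // Enumerate over a snapshot so callers never see a half-mutated set.
    public IEnumerator<T> GetEnumerator() { return ((IEnumerable<T>)ToArray()).GetEnumerator(); }
    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}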

There is a potential deadlock here, though. If the queue fills up (i.e. reaches the maximum you set when you initialized the BlockingCollection), then the tardy thread can't enqueue the item and all work comes to a complete halt. This never happened to me in practice, because although my per-record processing times varied, they didn't vary that much.

If it's a concern, you can either increase the queue size (only works if you can say with certainty that you won't ever fill the queue), or provide an alternate channel for the next expected item to be posted if the queue is full. I made that work, but for my purposes it was easier just to increase the queue size.

If you're interested, I can dig through my archives to find that ConcurrentBinaryHeap class.

Jim Mischel
  • Have a look at the thread I linked; you'll see that I'm already doing that. I'm currently using an extra blocking collection to prevent items from being endlessly dequeued by the multiplexor in the event that a processor thread is taking a very long time to complete its work unit. I want to replace the extra queue with something more lightweight. – Matthew Watson Feb 24 '13 at 16:44
  • Thanks for the information. In fact, that's pretty much exactly what I'm doing! I ended up modifying Microsoft's ConcurrentPriorityQueue sample by adding an AutoResetEvent to signal when a new item was placed in the queue for exactly the purpose you described. I think the fact that we both have the same solution indicates that it's a good one (I hope ;). I'm going to post my code to http://stackoverflow.com/questions/15021469/pipelines-multiplexing-and-unbounded-buffering for completeness at some point. – Matthew Watson Feb 24 '13 at 20:57

The way threads communicate with each other is unrelated to the resources or their locking mechanism. Nothing prevents you from passing semaphores and resources around within the same process, using an ad-hoc messaging system (message queues, events, or whatever fits your need).

didierc
  • What I perhaps didn't make clear is that the thread that acquires the resource is NOT the same as the one that releases it. The consumed resource is passed through a pipeline to another multiplexor thread which is the one that releases the resource, once it has processed it. I'm not sure how to set up a Semaphore to do that. – Matthew Watson Feb 24 '13 at 16:45
  • @MatthewWatson You don't need to set anything up, `Semaphore` should work out of the box for that. – svick Feb 24 '13 at 16:51
  • Right you are. For some reason, I was thinking that the same thread that acquired the resource should also release it. But SemaphoreSlim does just what I want. – Matthew Watson Feb 24 '13 at 16:56
  • actually you did make it clear, but I was not sure that was the sticking point. Updated my answer to only focus on that issue. Thanks! – didierc Feb 24 '13 at 17:02