3

I've got a ConcurrentStack that I'm dumping items into. What's a good way to process those items one at a time when the stack isn't empty? I'd like to do this in a way that isn't eating up CPU cycles when the stack isn't being processed.

What I've currently got is basically this and it doesn't seem like an ideal solution.

private readonly ConcurrentStack<MyObj> stack = new ConcurrentStack<MyObj>();
private readonly object lockObj = new object();
private bool handling;

private void AddToStack(MyObj obj)
{
    stack.Push(obj);
    HandleStack();
}

private void HandleStack()
{
    if (handling)
        return;

    Task.Run(() =>
    {
        lock (lockObj)
        {
            handling = true;
            if (stack.Any())
            {
                // handle whatever is on top of the stack
            }
            handling = false;
        }
    });
}

The bool is there so multiple threads don't get backed up waiting on the lock, and the lock is there because I don't want more than one thing handling the stack at once. If two separate threads do end up calling HandleStack simultaneously and both get past the bool check, the lock keeps them from iterating through the stack at the same time; by the time the second one gets through the lock, the stack will be empty and it does nothing. So this does end up giving me the behavior I want.

So really I'm just writing a pseudo-concurrent wrapper around the ConcurrentStack, and it seems like there's got to be a different way to achieve this. Thoughts?

claudekennilol
  • Do you really need each element to be handled sequentially? If they can be processed concurrently, just delegate to the threadpool which will efficiently handle its work queue. – Ben Manes May 29 '15 at 14:27
  • You don't need the lock. It's a ConcurrentStack, it's made to be modified by multiple threads. If you really want to block while waiting, use BlockingCollection. By default it uses a ConcurrentQueue but you can specify a different concurrent collection like ConcurrentStack – Panagiotis Kanavos May 29 '15 at 14:41
  • @PanagiotisKanavos I know it's made to be modified by multiple threads. I want multiple threads to add to it, but only one to take from it. That's why I'm locking around the pop (obfuscated behind "//handle whatever...") and not around the push. – claudekennilol May 29 '15 at 14:53
  • @BenManes yes, they specifically can't be processed concurrently. – claudekennilol May 29 '15 at 14:53
  • @claudekennilol What are you trying to do? If you want only *one* consumer, just don't add more than one - eg use an ActionBlock or a singleton instance of your consumer. Otherwise, you get round-robin processing with all consumers waiting until the stack empties – Panagiotis Kanavos May 29 '15 at 14:56

3 Answers

3

You could consider using the Microsoft TPL Dataflow to do this kind of thing.

Here's a simple example showing how to create a queue. Try it out and play around with the settings for MaxDegreeOfParallelism and BoundedCapacity to see what happens.

For your example, I think you'll want to set MaxDegreeOfParallelism to 1 if you don't want more than one thread handling a data item simultaneously.
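
For instance, the block options for strictly sequential handling might look something like this (just a sketch; the full example below deliberately uses 4 so you can see the parallelism):

var sequentialOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1   // handle one queued item at a time
};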

(Note: You need to use .NET 4.5.x and install TPL Dataflow for the project using NuGet.)

Also have a read of Stephen Cleary's blog about TPL.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace SimpleTPL
{
    class MyObj
    {
        public MyObj(string data)
        {
            Data = data;
        }

        public readonly string Data;
    }

    class Program
    {
        static void Main()
        {
            var queue = new ActionBlock<MyObj>(data => process(data), actionBlockOptions());
            var task = queueData(queue);

            Console.WriteLine("Waiting for task to complete.");
            task.Wait();
            Console.WriteLine("Completed.");
        }

        private static void process(MyObj data)
        {
            Console.WriteLine("Processing data " + data.Data);
            Thread.Sleep(200); // Simulate load.
        }

        private static async Task queueData(ActionBlock<MyObj> executor)
        {
            for (int i = 0; i < 20; ++i)
            {
                Console.WriteLine("Queuing data " + i);
                MyObj data = new MyObj(i.ToString());

                await executor.SendAsync(data);
            }

            Console.WriteLine("Indicating that no more data will be queued.");

            executor.Complete(); // Indicate that no more items will be queued.

            Console.WriteLine("Waiting for queue to empty.");

            await executor.Completion; // Wait for executor queue to empty.
        }

        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity        = 8
            };
        }
    }
}
Matthew Watson
3

ConcurrentStack<T> is one of the collections that implements IProducerConsumerCollection<T>, and as such can be wrapped by BlockingCollection<T>. BlockingCollection<T> has several convenience members for common operations like "consume while the stack is not empty". E.g., you could call TryTake in a loop. Or, you could just use GetConsumingEnumerable:

private BlockingCollection<MyObj> stack;
private Task consumer;

Constructor()
{
  stack = new BlockingCollection<MyObj>(new ConcurrentStack<MyObj>());
  consumer = Task.Run(() =>
  {
    foreach (var myObj in stack.GetConsumingEnumerable())
    {
      ...
    }
  });
}

private void AddToStack(MyObj obj)
{
  stack.Add(obj);
}
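
For comparison, here's a rough sketch of the TryTake alternative mentioned above, using the same fields (illustrative only; GetConsumingEnumerable is usually the simpler choice):

consumer = Task.Run(() =>
{
  MyObj myObj;
  // Blocks while the collection is empty; returns false once
  // CompleteAdding() has been called and the collection has drained.
  while (stack.TryTake(out myObj, Timeout.Infinite))
  {
    // handle myObj
  }
});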
Stephen Cleary
  • Thanks, this definitely appears to be what I need. I'm confused as to how the consumer task is working. I chose against TryPop in a loop because I didn't want the constant cpu cycles. How does GetConsumingEnumerable differ from that (i.e. what's happening while the stack is empty / what triggers it when something is added)? – claudekennilol May 29 '15 at 15:48
  • Both `TryTake` (on the `BlockingCollection`, with an infinite timeout) and `MoveNext` (when called on a consuming enumerable) will block the calling thread as long as the underlying collection is empty. I don't know the exact implementation, but it is logically a monitor. So the blocked threads are in a waiting state and do not consume CPU. – Stephen Cleary May 29 '15 at 17:38
2

Looks like you want a typical producer/consumer setup.

I would recommend using an AutoResetEvent.

Have your consumer wait when the stack is empty, and have the producer call Set each time it pushes an item.
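
A rough sketch of that idea (the names here are illustrative, not from the original answer; it assumes a single dedicated consumer):

private readonly ConcurrentStack<MyObj> stack = new ConcurrentStack<MyObj>();
private readonly AutoResetEvent itemAdded = new AutoResetEvent(false);

private void AddToStack(MyObj obj)   // producer
{
    stack.Push(obj);
    itemAdded.Set();                 // wake the consumer
}

private void ConsumeLoop()           // run on a single long-running task/thread
{
    while (true)
    {
        itemAdded.WaitOne();         // waits without burning CPU until Set is called
        MyObj obj;
        while (stack.TryPop(out obj))
        {
            // handle obj
        }
    }
}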

Read this thread

Fast and Best Producer/consumer queue technique BlockingCollection vs concurrent Queue

Dave Lawrence
  • Isn't that what BlockingCollection does already? What's the difference of blocking on BlockingCollection vs blocking on an AutoResetEvent? – Panagiotis Kanavos May 29 '15 at 14:43
  • @PanagiotisKanavos I wasn't actually suggesting using them together but I can see where you got that impression. My answer was a bit clunky reading back on it. – Dave Lawrence May 29 '15 at 14:58
  • You misunderstood. Why use AutoResetEvent with ConcurrentStack when you can just use BlockingCollection? – Panagiotis Kanavos May 29 '15 at 14:59
  • Oh right... Sure. Totally in agreement. But the "why use when you can just use" arguments can get a bit circular though. That's the beauty/nature of development. There is always a better way or a preferred way of doing something but they are not always the same thing when subjectivity is involved. – Dave Lawrence May 29 '15 at 15:14
  • This isn't a subjective matter - BlockingCollection [uses a semaphore itself](http://referencesource.microsoft.com/#system/sys/system/collections/concurrent/BlockingCollection.cs,462c00bfa4630957) to wait for a publisher. Doing the same as a BCL library only with more code isn't optimal. Optimal is e.g. using Interlocked.CompareExchange to check and set a flag without blocking (see the sketch below). – Panagiotis Kanavos May 29 '15 at 15:32
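
For reference, a minimal sketch of that Interlocked.CompareExchange idea applied to the HandleStack pattern from the question (field and method names are illustrative assumptions; the BlockingCollection/ActionBlock approaches above are still simpler):

private readonly ConcurrentStack<MyObj> stack = new ConcurrentStack<MyObj>();
private int handling;   // 0 = idle, 1 = a consumer task is running

private void AddToStack(MyObj obj)
{
    stack.Push(obj);
    TryStartConsumer();
}

private void TryStartConsumer()
{
    // Atomically claim the flag; only one caller wins, and nobody blocks.
    if (Interlocked.CompareExchange(ref handling, 1, 0) != 0)
        return;

    Task.Run(() =>
    {
        MyObj item;
        while (stack.TryPop(out item))
        {
            // handle item
        }
        Interlocked.Exchange(ref handling, 0);

        // Re-check: an item may have been pushed after the loop exited
        // but before the flag was released.
        if (!stack.IsEmpty)
            TryStartConsumer();
    });
}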