8

I've a problem of Producer/Consumer. Currently I've a simple Queue surrounded by a lock.

I'm trying to replace it with something more efficient.

My first choice was to use a ConcurrentQueue, but I don't see how to make my consumer wait on the next produced message(without doing Thread.Sleep).

Also, I would like to be able to clear the whole queue if its size reach a specific number.

Can you suggest some existing class or implementation that would match my requirements?

J4N
  • 19,480
  • 39
  • 187
  • 340
  • 2
    Did you try the [`BlockingCollection`](https://msdn.microsoft.com/en-us/library/dd267312%28v=vs.110%29.aspx) class? – Yacoub Massad Dec 18 '15 at 10:50
  • Take a look at [this question](http://stackoverflow.com/questions/34255759/multiple-producers-single-consumer-locking-schema). Although it is about multiple producers, it wouldn't matter if you have a single producer. – Yacoub Massad Dec 18 '15 at 10:51
  • Can you elaborate on _"to clear the whole queue if its size reach ..."_? For upur other needs the BlockingCollection is the ideal choice. – H H Dec 18 '15 at 10:54
  • @YacoubMassad In fact I did see it, but initially, with its name, I tought it was the `Collection` flavor of the `ConcurrentQueue`, which doesn't handle the FIFO behavior. But it doesn't seems to be the case, I will test it now – J4N Dec 18 '15 at 11:26
  • The default collection type for BlockingCollection is ConcurrentQueue (which is what you want), but you can change that if you want. – Yacoub Massad Dec 18 '15 at 11:28
  • @YacoubMassad Okay, what is the equivalent of the `Peak` method? To get the first entered element but not remove it? – J4N Dec 18 '15 at 11:36
  • And how can I `Clear` the whole collection? – J4N Dec 18 '15 at 11:38
  • 1
    Can you give more context about the clearing requirement? – Yacoub Massad Dec 18 '15 at 11:42
  • Well, at some point, if we reach a size, it means that the consumer isn't able to follow due to performance(it has to draw some elements), and in this case, we prefer to trash all the element of the queue and be able to add some more fresh element. Maybe it would be a good idea that you make an answer with that? – J4N Dec 18 '15 at 11:45
  • You don't have to trash the items, [this constructor of BlockingCollection](https://msdn.microsoft.com/en-us/library/dd267301%28v=vs.110%29.aspx) allows you to set the maximum numbers of items in the collection. If the collection reaches such number, the producer would block until the number of items goes below the number. – Yacoub Massad Dec 18 '15 at 11:47
  • Yes, I have to trash items. Instead of blocking our producer(which would result in loosing the more recent data in my case), we prefer to trash the queue and start over with the more recents items. – J4N Dec 18 '15 at 11:54
  • @J4N In that case I would replace the collection with a empty one. You will need to make your loops not use `GetConsumingEnumerable()` and use `Take()` instead. If you update your question to explain why you need to use `Peek` I can write up a complete answer that I think will work for you. – Scott Chamberlain Dec 18 '15 at 16:27

1 Answers1

3

Here is an example on how you can use the BlockingCollection class to do what you want:

BlockingCollection<int> blocking_collection = new BlockingCollection<int>();

//Create producer on a thread-pool thread
Task.Run(() =>
{
    int number = 0;

    while (true)
    {
        blocking_collection.Add(number++);

        Thread.Sleep(100); //simulating that the producer produces ~10 items every second
    }
});

int max_size = 10; //Maximum items to have

int items_to_skip = 0;

//Consumer
foreach (var item in blocking_collection.GetConsumingEnumerable())
{
    if (items_to_skip > 0)
    {
        items_to_skip--; //quickly skip items (to meet the clearing requirement)
        continue;
    }

    //process item
    Console.WriteLine(item);

    Thread.Sleep(200); //simulating that the consumer can only process ~5 items per second

    var collection_size = blocking_collection.Count;

    if (collection_size > max_size) //If we reach maximum size, we flag that we want to skip items
    {
        items_to_skip = collection_size;
    }
}
Yacoub Massad
  • 27,509
  • 2
  • 36
  • 62
  • Nice :) In my initial implementation I did a `while(blocking_collection.TryTake(out obj)){}`(not a big deal if I miss one more element. But I feel your implementation is faster and more precise! For the `Peek` implementation, I use the `FirstOrDefault()`, what do you think about it? I know that it's not blocking on something else, but in my case, it doesn't matters – J4N Dec 18 '15 at 12:12
  • 1
    Can explain why do you want to peek? Maybe there is a better way to do it. – Yacoub Massad Dec 18 '15 at 12:18