3

I'm creating an indexer which is Enqueueing items which needs to be processed. The indexer will add items to its processor. It will for example add 100 items, then doesn't add items for 3 minutes and add another 50 items.

public class Processer
{
    private ConcurrentQueue<Item> items;

    public void AddItem(Item item)
    {
        this.items.Enqueue(item);
    }
}

The items will come in at random intervals, so I will create a separate thread to dequeue and process these items.

What would be the best option to use?

  1. Don't use a Collection, but use the ThreadPool:

    public void AddItem(Item item)
    {
        ThreadPool.QueueUserWorkItem(function, item);
    }
    

    This will automatically create a Queue, and process the items, but I have less control, when 20 items are found, they will almost stop my indexer to run and first finish this thread pool

  2. Use a long running Task:

    public Processer()
    {
        this.task = Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }
    
    public DequeueItems()
    {
        while(true)
        {
            Item item = null;
            while(this.items.TryDequeue(out item)
            {
                this.store.ExecuteIndex((AbstractIndexCreationTask)item);
            }
    
            Thread.Sleep(100); 
        }
    }
    

    But I hate the while() and thread.sleep I've got to use, since the enumerable will dry up after some time, and it will need recheck if there are new items.

  3. Use a short running task:

    public Processer()
    {
    
    }
    private void Run()
    {
        this.task = Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.PreferFairness,
            TaskScheduler.Default);
    }
    public void AddItem(Item item)
    {
        this.items.Add(item);
        if(this.task == null || this.task.isCompleted)
            this.Run();
    }
    public DequeueItems()
    {
        Item item = null;
        while(this.items.TryDequeue(out item)
        {
            this.store.ExecuteIndex((AbstractIndexCreationTask)item);
        }
    }
    

    This might be nicer? But starting a thread is a "expensive" operation, and I don't know if I can miss items since I check IsCompleted, which can be in the process of ending the while loop and this way missing 1 item. But it doesn't sleep, and use a dirty while loop.

  4. Your option, since MSDN recommends to use the TPL, I thought not to use Threads, but maybe there are better ways dealing with this problem

Changelog

  1. Changed to BlockingCollection
  2. Changed back to ConcurrentQueue

Some things I've checked:

user2331234
  • 149
  • 1
  • 1
  • 9
  • 2
    4. Using either [`BlockingCollection`](http://msdn.microsoft.com/en-US/library/dd267312.aspx) for [producer-consumer scenarios](http://blogs.msdn.com/b/csharpfaq/archive/2010/08/12/blocking-collection-and-the-producer-consumer-problem.aspx) or [`ConcurrentQueue`](http://msdn.microsoft.com/en-US/library/dd267265.aspx)? – Patryk Ćwiek Jun 21 '13 at 07:11
  • Yes I know I can use other thread safe queue's, but the Queue isn't the problem, I was hoping to get some feedback over what is the best practice to deal with this problem. – user2331234 Jun 21 '13 at 07:13
  • 1
    @user2331234: No, the queue very much *is* the problem. It's not thread-safe, whereas `BlockingCollection` is precisely designed for producer/consumer situations like yours. – Jon Skeet Jun 21 '13 at 07:15
  • The problem for me is how to add a separate thread to process the items. I will adjust my example above with the BlockingCollection. But still I hope to get some feedback which of the options to use to dequeue the queue. – user2331234 Jun 21 '13 at 07:20
  • The GetConsumingEnumerable will eventually end the loop isn't it? So when my indexer will add 100 items, the processor is eventually done indexing. When it's done the thread is killed, and new items won't be indexed. My indexer can find 100 items, then wait 3 minutes (searching new items) and then add another 100 items. Therefore i used the while sleep. I've eddited it to use a blocking collection while you posted your reaction by the way ;) – user2331234 Jun 21 '13 at 07:35
  • 1
    I only saw your edit after I posted my comment. My understanding is that `GetConsumingEnumerable` will wait indefinitely until `CompleteAdding` is called, so I still don't understand the `Thread.Sleep` and `while` loop. – Daniel Kelley Jun 21 '13 at 09:00
  • Daniel you are right, I changed it in the example above. This was the only way to stop comments about my Queue construction. BlockingCollection will be helpful when you add items and then loop through them (once). In my case items will be added through the lifetime of the application, and items must be processed when possible. – user2331234 Jun 21 '13 at 10:09

2 Answers2

3

I think the simplest solution here is to use BlockingCollection (probably using its GetConsumingEnumerable()) along with a long-running Task. When there's nothing to do, this will waste a Thread, but a single wasted Thread is not that bad.

If you can't afford to waste that Thread, then you can go with something like your #3. But you have to be very careful about making it thread-safe. For example, in your code, if the Task isn't running and AddItem() is called from two threads at the same time, you end up creating two Tasks, which is almost certainly wrong.

Another option, if you're on .Net 4.5 is to use ActionBlock from TPL Dataflow. With it, you're not wasting any threads and you don't have to write the difficult thread-safe code yourself.

svick
  • 236,525
  • 50
  • 385
  • 514
  • Thanks, I will add a lock to make sure it's only running once. It's a shame I cannot use .NET 4.5 since I'm creating software for windows server 2003+... – user2331234 Jun 24 '13 at 06:54
2

I think a Semaphore might be the right thing for you. You'll find a pretty good explanation of it here

In Addition I would suggest to use a ConcurrentQueue

openshac
  • 4,966
  • 5
  • 46
  • 77
Romano Zumbé
  • 7,893
  • 4
  • 33
  • 55
  • http://stackoverflow.com/questions/15456499/de-queue-items-with-worker-threads Just like this one, (in my "already checked list"), I'm wondering if it isn't better to use the TPL, but thanks for the example. It might be a nice way indeed to limit the max number of threads, but it will open a lot of pending threads in my case, I don't know if this will lead to exceptions. It's possible that there are 1000 items in the queue (or more), which will directly get an own thread, which is waiting for a sem to get released. – user2331234 Jun 21 '13 at 07:50
  • Could you explain how exactly would you use `Semaphore` here? – svick Jun 21 '13 at 11:35