I have a producer that gets users from a resource and places them into a ConcurrentQueue. What I want to do is use multiple consumers to process all the users and get their information from another resource.

  public void Populate(IEnumerable<Users> users){
     // Runs on a single thread; Enqueue takes one item at a time.
     foreach (var user in users)
        _queue.Enqueue(user);
  }

  public void Process(){
     // Here I want the queue to be processed by multiple consumers,
     // say multiple threads, so that I can finish processing sooner.
  }

My question is: should I use a Thread, a Task, or the ThreadPool?

I have seen this question: C# equivalent for Java ExecutorService.newSingleThreadExecutor(), or: how to serialize mulithreaded access to a resource

DarthVader
  • Use whichever you want; they can all do the job. Note that they are built on top of each other. A Task uses the thread pool (generally) and a thread pool is implemented using threads. – Servy Jun 20 '13 at 18:49
  • @Servy: just to add that while they can all do the job, in the general case, if there are relationships between tasks such as continuations or branching, it's better to use a higher-level API like the TPL, which also integrates seamlessly with the **async/await** feature of C# 5.0 (.NET 4.5). – Pragmateek Jun 20 '13 at 18:58
  • @Pragmateek At the end of the day it's entirely personal preference. If someone finds it easier to use `Thread` for everything rather than `Task`, they can do so. Most people seem to find tasks easier to work with, but that's just an opinion. It also depends on the problem; some problems are better suited to different abstractions. Also note that there are a lot of parallelization frameworks/abstractions besides just those three, some in the BCL and many 3rd party extensions/replacements. – Servy Jun 20 '13 at 19:00
  • There is also Parallel.ForEach; my question is, what's the best practice here? – DarthVader Jun 20 '13 at 19:01
  • @DarthVader: like *Servy* said, this is above all a matter of personal preference. From an academic point of view, I think it's best to first master the low-level stuff like threads, then move to higher-level abstractions like the TPL, because they let you write more readable code more productively. – Pragmateek Jun 20 '13 at 19:16
  • @DarthVader And my response was that you can use whatever you want. It's a subjective question with no objective answer. Such questions are not appropriate on Stack Overflow, which is why this question should be closed. It's "Not Constructive". – Servy Jun 20 '13 at 20:00

1 Answer

Since you are using a queuing mechanism already, I suggest you use a BlockingCollection instead of ConcurrentQueue, along with Parallel.Invoke().

There are some important things about BlockingCollection that make it nice to use.

  1. BlockingCollection lets the consuming threads take items from the collection in a thread-safe and natural manner using foreach over GetConsumingEnumerable().
  2. The consuming foreach loop blocks automatically when the queue is empty, and continues when items become available.
  3. BlockingCollection provides an easy-to-use mechanism to signal the end of data. The queue owner simply calls queue.CompleteAdding() and any foreach loops taking items from the queue will exit automatically when the queue becomes completely empty.

You can use Parallel.Invoke() to start a number of threads, each of which uses foreach to iterate over the queue. (Parallel.Invoke() lets you give it an array of tasks to run in parallel, which makes it quite simple to use.)

This is best illustrated with a sample program:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class User
    {
        public string Name;
    }

    class Program
    {
        readonly BlockingCollection<User> _queue = new BlockingCollection<User>();

        void run()
        {
            var background = Task.Factory.StartNew(process); // Start the processing threads.

            // Make up 50 sample users.
            var users = Enumerable.Range(0, 50).Select(n => new User{Name = n.ToString()});

            foreach (var user in users) // Add some sample data.
                _queue.Add(user);

            Console.WriteLine("Press <RETURN> to exit.");
            Console.ReadLine();
            _queue.CompleteAdding(); // Makes all the consuming foreach loops exit.
            background.Wait();
            Console.WriteLine("Exited.");
        }

        void process() // Process the input queue.
        {
            int taskCount = 4;  // Let's use 4 threads.
            var actions = Enumerable.Repeat<Action>(processQueue, taskCount);
            Parallel.Invoke(actions.ToArray());
        }

        void processQueue()
        {
            foreach (User user in _queue.GetConsumingEnumerable())
                processUser(user);
        }

        void processUser(User user)
        {
            Console.WriteLine("Processing user " + user.Name);
            Thread.Sleep(200); // Simulate work.
        }

        static void Main()
        {
            new Program().run();
        }
    }
}

If you don't need to limit the number of concurrent threads and are happy to let .NET decide for you (not a bad idea), then you can simplify the code quite a bit by removing processQueue() altogether and changing process() to:

void process() // Process the input queue.
{
    Parallel.ForEach(_queue.GetConsumingEnumerable(), processUser);
}

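However, Parallel.ForEach() over GetConsumingEnumerable() does more locking than it needs to, and its chunking does not work well with a blocking source. You're probably best off just using the original method (which doesn't suffer from that problem), or using the GetConsumingPartitioner() solution described here: http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx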
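
If you would rather stay with the built-in types, here is a rough sketch of one possible middle ground, assuming .NET 4.5 or later. It wraps the consuming enumerable in a partitioner created with EnumerablePartitionerOptions.NoBuffering, so Parallel.ForEach() takes one item at a time instead of chunking, and it caps the thread count with ParallelOptions.MaxDegreeOfParallelism. This is not the GetConsumingPartitioner() from ParallelExtensionsExtras, and I have not measured how its locking compares. The names NoBufferingDemo, ProcessAll and maxThreads are made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class User
    {
        public string Name;
    }

    static class NoBufferingDemo
    {
        // Hypothetical helper: drains the queue with Parallel.ForEach and
        // returns how many users were processed.
        public static int ProcessAll(BlockingCollection<User> queue, int maxThreads)
        {
            int processed = 0;

            // NoBuffering asks the partitioner to hand out one item at a time
            // rather than grabbing chunks from the blocking enumerable.
            var partitioner = Partitioner.Create(
                queue.GetConsumingEnumerable(),
                EnumerablePartitionerOptions.NoBuffering);

            // Upper bound on the number of concurrent workers.
            var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads };

            Parallel.ForEach(partitioner, options, user =>
            {
                Thread.Sleep(10); // Simulate work.
                Interlocked.Increment(ref processed);
            });

            return processed;
        }

        static void Main()
        {
            var queue = new BlockingCollection<User>();

            foreach (var n in Enumerable.Range(0, 50)) // Add some sample data.
                queue.Add(new User { Name = n.ToString() });

            queue.CompleteAdding(); // Lets the loop finish once the queue is empty.

            Console.WriteLine("Processed " + ProcessAll(queue, 4) + " users.");
        }
    }
}
```

Because CompleteAdding() is called before the loop starts, ProcessAll() returns as soon as all 50 items have been handled. In a long-running producer you would call CompleteAdding() only when no more users will arrive, exactly as in the sample program above.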

Matthew Watson
  • BTW, `Parallel.ForEach(_queue.GetConsumingEnumerable())` uses locks more than it has to. And the chunking that `Parallel.ForEach` does doesn't work well with `GetConsumingEnumerable()` either. If either of those is a problem for you, you should use [`GetConsumingPartitioner()` from ParallelExtensionsExtras](http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx) instead. – svick Jun 21 '13 at 20:45