
What is the best way of getting messages from many threads onto a queue and having a separate thread process the items on this queue one at a time?

I frequently use this pattern when trying to decouple activities from many threads.

I am using a BlockingCollection for this, as shown in the code extract below:

// start this task in a static constructor
Task.Factory.StartNew(() => ProcessMultiUseQueueEntries(), TaskCreationOptions.LongRunning);


private static BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>> _q = new BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>>();

    /// <summary>
    /// queued - Simple mechanism that will log the fact that this user is sending an xMsg (FROM a user)
    /// </summary>
    public static void LogXMsgFromUser(XClientMsgExt xMsg)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(xMsg, null, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the data being executed by this user
    /// </summary>
    public static void LogBOToUser(BOInfo boInfo)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, boInfo, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the status of the BO being executed by this user (causes the red square to flash)
    /// </summary>
    public static void LogBOStatus(string UserID, BOStatus status)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, null, UserID, status));
    }

    /// <summary>
    /// An endless thread that will keep checking the Queue for new entrants.
    /// NOTE - no error handling since this can't fail... :) lol etc 
    /// </summary>
    private static void ProcessMultiUseQueueEntries()
    {
        while (true)        //  eternal loop
        {
            Tuple<XClientMsgExt, BOInfo, string, BOStatus> tuple = _q.Take();

            // Do stuff

        }
    }

This worked fine, or so I thought, until the Performance Wizard in VS2010 started to highlight the _q.Take() line as the highest-contention line in my code!

Note that I have also tried a standard ConcurrentQueue combined with a ManualResetEvent: each time I insert an item into the queue I signal the event, allowing the worker thread to examine and process the queue. This had the same net effect, with the .WaitOne() call being highlighted instead.

Are there other ways of solving this common pattern of having many threads adding objects to a concurrent queue while a single thread ploughs its way through the items one at a time and in its own time?
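A minimal sketch of the ConcurrentQueue plus ManualResetEvent variant described above. It is not the original code: the class name, member names, and the string item type are hypothetical, chosen only to keep the example self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class SignalledQueue
{
    // Hypothetical names; the item type is a plain string for illustration.
    private static readonly ConcurrentQueue<string> Items = new ConcurrentQueue<string>();
    private static readonly ManualResetEvent ItemsAvailable = new ManualResetEvent(false);

    // Called from any number of producer threads.
    public static void Enqueue(string item)
    {
        Items.Enqueue(item);
        ItemsAvailable.Set();      // wake the worker
    }

    private static void Worker()
    {
        while (true)
        {
            ItemsAvailable.WaitOne();   // blocks while nothing has been signalled
            ItemsAvailable.Reset();     // reset BEFORE draining so a later Set is not lost

            string item;
            while (Items.TryDequeue(out item))
            {
                Console.WriteLine("processing " + item);
            }
        }
    }

    public static void Main()
    {
        var worker = new Thread(Worker) { IsBackground = true };
        worker.Start();

        Enqueue("a");
        Enqueue("b");

        Thread.Sleep(200);   // give the background worker time to drain before exit
    }
}
```

As with Take(), the worker spends its idle time inside WaitOne(), so a profiler that counts blocked time will point at that line.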

Thanks!!

Marcel
  • I agree with the existing answers that you should expect this to block when dequeuing from an empty queue, or when enqueueing to a full one. If you want to try a different implementation, though: http://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228 – Marc Gravell Mar 18 '11 at 18:41

3 Answers


Highest contention line? Yes, because it is a blocking collection! That call will block (e.g., it might be waiting on a WaitHandle) until another element is added to the collection.

Are you sure this is a problem? It sounds like exactly what I'd expect.

In case it isn't clear what I mean, consider this code:

var blocker = new BlockingCollection<int>();
int nextItem = blocker.Take();

How long would you expect that Take call above to run? I'd expect it to wait forever, because nothing is being added to blocker. Thus if I profiled the "performance" of the above code, I'd see Take right at the top of the list of long-running methods. But this would not be indicative of a problem; again: you want that call to block.


On a completely separate note, might I recommend replacing Tuple<XClientMsgExt, BOInfo, string, BOStatus> with a type whose properties have descriptive names? (This suggestion has nothing to do with your question, of course; it's just a piece of general advice.)
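A minimal sketch of what such a type might look like. The name LogEntry, its property names, and its factory methods are hypothetical; the empty XClientMsgExt and BOInfo classes and the BOStatus values other than Ignore are stand-ins so that the example compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;

// Stand-ins for the question's types (assumptions, not the real definitions).
public class XClientMsgExt { }
public class BOInfo { }
public enum BOStatus { Ignore, Running, Done }

// A named replacement for Tuple<XClientMsgExt, BOInfo, string, BOStatus>.
public sealed class LogEntry
{
    public XClientMsgExt Message { get; private set; }
    public BOInfo Info { get; private set; }
    public string UserId { get; private set; }
    public BOStatus Status { get; private set; }

    // Defaults mirror the question's code: empty user ID, BOStatus.Ignore.
    private LogEntry()
    {
        UserId = "";
        Status = BOStatus.Ignore;
    }

    public static LogEntry FromUser(XClientMsgExt message)
    {
        return new LogEntry { Message = message };
    }

    public static LogEntry ToUser(BOInfo info)
    {
        return new LogEntry { Info = info };
    }

    public static LogEntry ForStatus(string userId, BOStatus status)
    {
        return new LogEntry { UserId = userId, Status = status };
    }
}

public static class Program
{
    private static readonly BlockingCollection<LogEntry> Queue =
        new BlockingCollection<LogEntry>();

    public static void Main()
    {
        Queue.Add(LogEntry.ForStatus("user42", BOStatus.Running));

        LogEntry entry = Queue.Take();   // returns at once: one item is queued
        Console.WriteLine(entry.UserId + " -> " + entry.Status);
    }
}
```

The consumer can then read entry.UserId or entry.Status instead of Item3 and Item4, and each factory method documents which fields a given kind of entry carries.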

Dan Tao
  • @Jim: Ha, clearly you left that comment before the update I just made ;) – Dan Tao Mar 18 '11 at 18:31
  • Don't get me wrong - the code is behaving EXACTLY as I planned it. However, the program gets sluggish when I have more users and therefore many more signals coming in. It might be worth noting that the (single) thread that is collecting items off the queue is updating UserInfo items that are WPF-bound to the UI. – Marcel Mar 18 '11 at 19:05
  • Sorry, hit enter again... And with more users the UI is getting sluggish. I am trying to figure out why, so out came the Performance Analyser, which is reporting contention on that .Take() line, and that is what I struggle to understand. – Marcel Mar 18 '11 at 19:08
  • @Marcel: What I'm saying is that if the collection is empty (or full), the `Take` call **should block**, which is probably what's happening, which is probably why your perf analyzer is reporting contention. Is this a desktop application? And if so, are you calling `Take` from the UI thread? You definitely shouldn't do that; I can tell you that much. Otherwise, you'd have to include some more details of your application in order for people to be able to help you. – Dan Tao Mar 18 '11 at 19:20
  • I am running into a similar problem. "Blocking" is not the issue here, since I have many producers downloading items via HTTPClient and enqueuing them in the blocking collection, keeping it more or less full; it's the throughput on the consumer end to "dequeue" items that appears to be a potential bottleneck. I have run a simple test of enqueuing 5M elements on multiple producers with one consumer (one that has thread affinity) to dequeue items, and the perf is not ideal. I have timed just the download of the items (producer side) and that runs blazing fast. Are there any other high-perf options? – Abhijeet Patel Sep 04 '16 at 19:17

That _q.Take() is the highest contention line is meaningless in itself. There's going to be contention if many threads are waiting for items. The big question is whether that contention is costing you in terms of performance. A few questions you need to find the answers to:

  1. Are you able to process items fast enough to prevent the queue from growing without bound?
  2. Is the contention costing you in terms of CPU usage?
  3. If you stop adding things to the collection, does the Performance Wizard still report contention?
  4. If you stop adding things to the collection, is there high CPU usage?

If you're able to keep the queue from growing and CPU resources aren't being spent on taking items, then there's no problem. Except perhaps you have more threads than necessary reading from the collection.
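One rough way to answer the first question is to sample the collection's Count while producers are running. The sketch below is an illustration under assumptions, not a benchmark: the producer count, item count, and 100 ms sampling interval are arbitrary, and the int items stand in for real messages.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class QueueDepthCheck
{
    public static void Main()
    {
        var queue = new BlockingCollection<int>();
        int processed = 0;

        // Single consumer: blocks inside the enumerator while the queue is empty
        // and exits once CompleteAdding has been called and the queue is drained.
        Task consumer = Task.Factory.StartNew(() =>
        {
            foreach (int item in queue.GetConsumingEnumerable())
            {
                processed++;   // written only by this thread; read after Wait()
            }
        }, TaskCreationOptions.LongRunning);

        // Several producers adding concurrently (4 is an arbitrary choice).
        var producers = new Task[4];
        for (int p = 0; p < producers.Length; p++)
        {
            producers[p] = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100000; i++)
                {
                    queue.Add(i);
                }
            });
        }

        // Sample the backlog every 100 ms until all producers have finished.
        while (!Task.WaitAll(producers, 100))
        {
            Console.WriteLine("backlog: " + queue.Count);
        }

        queue.CompleteAdding();
        consumer.Wait();
        Console.WriteLine("processed: " + processed);
    }
}
```

If the sampled backlog keeps climbing under a realistic load, the consumer's per-item work is the thing to look at, not the Take call itself.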

Jim Mischel
  • With respect to your third list item, I would expect the reported contention to be at a *maximum* when items are not being added to the collection. It's going to be waiting on some form of mutex indefinitely; I'm not sure a profiler could tell the difference between this and "bad" contention. – Dan Tao Mar 18 '11 at 18:38
  • @Dan: That was the point. If it's reporting contention when there are no items, then the contention probably isn't a problem. – Jim Mischel Mar 18 '11 at 18:58
  • I only have a single thread that processes the items OFF the queue but many threads depositing items onto it. Is there any standard approach for this coding pattern? – Marcel Mar 18 '11 at 19:09
  • OK, I get what you meant now. I interpreted the meaning of your question backwards. @Marcel: The approach you're using is exactly the right one. I can almost guarantee that `Take` is not your culprit. Depending on how you *use* it, there might be an issue--for example if you're calling it *from* the same thread that's responsible for updating the UI. – Dan Tao Mar 18 '11 at 19:23

Have you considered using an actual MSMQ queue? It may seem like overkill, but it does provide what you need. There's nothing to say your application can't both write the messages and have a dedicated thread to read and process them.

TheZenker
  • You know why it seems like overkill? Because it is: Handling (for all we know) in-process synchronization issues by calling out of the process. That fact in itself, let alone making use of MSMQ, adds many error sources, not to mention latency. – Evgeniy Berezovsky Jun 09 '15 at 02:32