4

I currently have an app that is receiving packets from a socket, processing them and adding them to a ConcurrentQueue. I then have a separate thread that is processing these items.

The problem I am having is the Producer/Consumer issue where the consumer is trying to take items even when there are not any items, resulting in significant cpu usage.

ProcessPackets is run on its own thread:

    private ConcurrentQueue<PrimaryPacket> Waiting = new ConcurrentQueue<PrimaryPacket>();

    private void ProcessPackets()
    {
        PrimaryPacket e;

        while (true)
        {
            if (Waiting.TryDequeue(out e))
            {
                Packets.TryAdd(((ulong)e.IPAddress << 32 | e.RequestID), e);
            }
        }
    }

    public void AddPacket(PrimaryPacket e)
    {
        Waiting.Enqueue(e);
    }

enter image description here

What would be the best way of implementing the BlockingCollection(T) to deal with this issue? Or another solution?

Also noteworthy, there is about 30,000 items being added to the queue per second.

Josh
  • 2,083
  • 5
  • 23
  • 28
  • 3
    duplicated see: http://stackoverflow.com/questions/6545970/concurrent-collections-eating-too-much-cpu-without-thread-sleep – Polity Oct 09 '11 at 07:39
  • I have tried this method, but it is unable to keep up with 30,000 items being added to the queue per second. – Josh Oct 09 '11 at 07:47
  • I have a hard time believing that. AutoResetEvent isn't the fastest method but 30 000 really isnt that much. You can try to use ManualResetEventSlim which uses a spincount in its inital fase but really that shouldnt be nessesary unless you have code inside the ProcessPackets which takes a long time (but thats outside the scope of this question) – Polity Oct 09 '11 at 10:22
  • I just tested with `BlockingCollection` and got about 2,000,000 per second thoughput. – Nick Butler Oct 09 '11 at 10:23
  • The posted code certainly won't be able to keep up, it burns 100% core. You have to give the thread scheduler a chance to wake up your thread when it is ready to process the data. Like BlockingCollection does. Looping will make you lose the thread quantum. – Hans Passant Oct 09 '11 at 13:26

1 Answers1

6

You don't have to implement BlockingCollection<T>, you can just use it. As the documentation says, it's just a wrapper for IProducerConsumerCollection<T>, such as ConcurrentQueue<T> (which is the default one).

private BlockingCollection<PrimaryPacket> Waiting =
    new BlockingCollection<PrimaryPacket>();

private void ProcessPackets()
{
    while (true)
    {
        PrimaryPacket e = Waiting.Take();
        Packets.TryAdd(((ulong)e.IPAddress << 32 | e.RequestID), e);
    }
}

public void AddPacket(PrimaryPacket e)
{
    Waiting.Add(e);
}

Take() blocks if the queue is empty, so it won't burn CPU unnecessarily. And you should probably consider what to do when the processing is finished.

svick
  • 236,525
  • 50
  • 385
  • 514
  • Thanks. I actually figured this method out before going to bed. Can't believe it was that simple. – Josh Oct 09 '11 at 22:42