0

I was using BlockingCollection to queue data received over network and process it. I was consuming from the blockingcollection at every 100 ms in a timer. But I found that consumer was slow and length of blockingcollection was ever increasing.

Since I was not quite sure, what caused this behavior, I wrote a simple console app as shown below. But still the BlockingCollection count is ever increasing. Note that this time I used a dedicated thread for consuming.

class Program
{
    static Thread sm_producerThread;
    static Thread sm_consumerThread;
    static BlockingCollection<int> sm_blockingCollection = new BlockingCollection<int>();
    static int sm_index;

    static void Main(string[] args)
    {
        sm_producerThread = new Thread(Produce) { IsBackground = true };
        sm_producerThread.Start();
        sm_consumerThread = new Thread(Consume) { IsBackground = true };
        sm_consumerThread.Start();
        Console.ReadLine();
    }
    
    private static void Produce()
    {
        while (true)
        {
            sm_index++;
            sm_blockingCollection.TryAdd(sm_index);
            Console.WriteLine($"Added number {sm_index}. Collection count is {sm_blockingCollection.Count}");
        }
    }

    private static void Consume()
    {
        while (true)
        {
            var num = sm_blockingCollection.Take();
            Console.WriteLine($"Fetched number {num}. Collection count is {sm_blockingCollection.Count}");
        }
    }
}

Fragment of a sample output is shown below

Fetched number 74317. Collection count is 1426
Fetched number 74318. Collection count is 1425
Fetched number 74319. Collection count is 1424
Fetched number 74320. Collection count is 1423
Fetched number 74321. Collection count is 1422
Fetched number 74322. Collection count is 1421
Fetched number 74323. Collection count is 1420
Added number 75743. Collection count is 1453
Added number 75744. Collection count is 1420
Added number 75745. Collection count is 1421
Added number 75746. Collection count is 1422
Added number 75747. Collection count is 1423
Added number 75748. Collection count is 1424
Added number 75749. Collection count is 1425
Added number 75750. Collection count is 1426
Added number 75751. Collection count is 1427
Added number 75752. Collection count is 1428
Added number 75753. Collection count is 1429
Added number 75754. Collection count is 1430
Added number 75755. Collection count is 1431
Added number 75756. Collection count is 1432
Added number 75757. Collection count is 1433
Added number 75758. Collection count is 1434
Added number 75759. Collection count is 1435
Added number 75760. Collection count is 1436
Added number 75761. Collection count is 1437
Added number 75762. Collection count is 1438
Added number 75763. Collection count is 1439
Fetched number 74324. Collection count is 1419
Added number 75764. Collection count is 1440
Added number 75765. Collection count is 1440
Added number 75766. Collection count is 1441
Added number 75767. Collection count is 1442
Added number 75768. Collection count is 1443
Added number 75769. Collection count is 1444
Added number 75770. Collection count is 1445
Added number 75771. Collection count is 1446
Added number 75772. Collection count is 1447
Added number 75773. Collection count is 1448
Added number 75774. Collection count is 1449
Added number 75775. Collection count is 1450
Added number 75776. Collection count is 1451
Added number 75777. Collection count is 1452
Added number 75778. Collection count is 1453
Added number 75779. Collection count is 1454
Added number 75780. Collection count is 1455
Added number 75781. Collection count is 1456
Added number 75782. Collection count is 1457
Added number 75783. Collection count is 1458
Added number 75784. Collection count is 1459
Added number 75785. Collection count is 1460
Added number 75786. Collection count is 1461
Added number 75787. Collection count is 1462
Added number 75788. Collection count is 1463
Added number 75789. Collection count is 1464
Added number 75790. Collection count is 1465
Added number 75791. Collection count is 1466
Added number 75792. Collection count is 1467
Added number 75793. Collection count is 1468
Added number 75794. Collection count is 1469
Added number 75795. Collection count is 1470
Added number 75796. Collection count is 1471
Added number 75797. Collection count is 1472
Added number 75798. Collection count is 1473
Added number 75799. Collection count is 1474
Added number 75800. Collection count is 1475
Added number 75801. Collection count is 1476
Added number 75802. Collection count is 1477
Added number 75803. Collection count is 1478
Added number 75804. Collection count is 1479
Added number 75805. Collection count is 1480
Added number 75806. Collection count is 1481
Added number 75807. Collection count is 1482
Added number 75808. Collection count is 1483
Added number 75809. Collection count is 1484
Added number 75810. Collection count is 1485
Added number 75811. Collection count is 1486
Added number 75812. Collection count is 1487
Added number 75813. Collection count is 1488
Added number 75814. Collection count is 1489
Added number 75815. Collection count is 1490
Added number 75816. Collection count is 1491
Added number 75817. Collection count is 1492
Added number 75818. Collection count is 1493

As you can see that count actually reached 1490 by the time 75818 entries was processed. I was hoping that the count of items in the collection will be a small number (say around 100) at any point of time since both the threads might have almost same priority and same work load. What am I missing?

Arctic
  • 807
  • 10
  • 22
  • `Count` is approximate as per the docs - https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1.count?view=netcore-3.1#remarks . – mjwills Jul 24 '20 at 11:56
  • `since both the threads might have almost same priority and same work load.` How did you come to the conclusion that `TryAdd` and `Take` were the same work load? – mjwills Jul 24 '20 at 11:58
  • 3
    Even if the producer creates only 0.001% more items per time as the consumer can handle, the number of items will increase on average. – Klaus Gütter Jul 24 '20 at 12:02
  • 1
    If you are concerned about the collection getting bigger, constrain its size (https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1.-ctor?view=netcore-3.1#System_Collections_Concurrent_BlockingCollection_1__ctor_System_Int32_) and then use `Add` rather than `TryAdd`. This effectively adds back-pressure (i.e. slows down the producer if the consumer isn't keeping up). – mjwills Jul 24 '20 at 12:03
  • 1
    Your experiment do not reveal that `Take` is slow. It reveals that `Take` might be a bit *slower* than `TryAdd`. It also reveals that building a producer-consumer system on the hope that the consumer will be at least as fast as the producer, is not a robust way to build this system. – Theodor Zoulias Jul 24 '20 at 12:24
  • 1
    @mjwills When I said workload, I didn't mean about the peformance of the 'TryAdd' and take methods, rather the remaining code block. But your question enlightened me on the aspect that I should have considered those too. – Arctic Jul 24 '20 at 12:41
  • @mjwills Thank you for the link to other SO article. Actually I have the same scenario and wouldn't have posted this if I could see that earlier :-) – Arctic Jul 24 '20 at 12:43
  • @KlausGütter and @ Theodor Zoulias Thank you for your inputs. I was more concerned that both the methods only do a Console.WriteLine and how come on thread was slower – Arctic Jul 24 '20 at 12:47
  • 1
    @TheodorZoulias Yes, I understand that there is lot of difference between the words 'slow' and 'slower'. Updated my question – Arctic Jul 24 '20 at 13:21
  • 1
    Do mind that outputting to the console isn't necessarily the best way to measure performance: https://stackoverflow.com/questions/18464833/console-writeline-effect-on-performance – Peter Bons Jul 24 '20 at 15:53

1 Answers1

-1

I copied your code, increased the priority of the consumer thread to AboveNormal, and wrote to the console every 1000 items. After running for several seconds, I saw no persistent increase in the collection. I agree with the comment that would seem to suggest that TryAdd is slightly faster than Take on average and if the threads run at the same priority, the collection will increase in size over time.

I was consuming from the blockingcollection at every 100 ms in a timer.

If I understand this comment correctly, you're using a timer to wake up your processing thread 10 times a second. Am I to assume that you call Take once and then wait for the next timer event? How often does your producer send data? If it's more than 10 times a second and you're only processing 10 data blocks per second, then the problem is that you're not servicing your "buffer" (BlockingCollection) often enough, which would explain its perpetual growth.

The software I work on is a glorified packet sniffer. When we turn it on, we get tons of network packets that have to be processed. Our collection thread is highly optimized to facilitate this. Here's some rough pseudocode for it:

while (!shutdown)
{
    // Allocate the array for storing the packets.
    var packets = new packet[10000];

    // Start the timer using 100ms interval.
    timer.Start();

    // Read packets until the timer elapses or the capacity is reached.
    do
    {
        packets[index++] = device.Read(timeout);
    }
    while (!timerElapsed && packets.Length < capacity);

    // Notify the processing thread that data is available to be processed.
    PacketsReceived.Raise(this, new PacketEventArgs(packets));
}

There's more to it than this, but this is the basic gist. Note that this thread simply focuses on receiving data from the device. Raising the event transfers the array to the processing thread and then uses a ManualResetEvent to notify the processing thread that data is available to be processed. In other words, the processing thread sits idle until the collection thread notifies it of work to do, which occurs as part of the data exchange.

Using your example, the data exchange could be accomplished using the BlockingCollection such that the collection thread adds lists of data elements to be processed to the collection while the processing thread pulls the next list of data elements to be processed from the collection. Thus, while your processing thread is chugging through the data elements that have been received, your collection thread is getting the next list of data elements to be processed.

HTH

Matt Davis
  • 45,297
  • 16
  • 93
  • 124