
I have a BlockingCollection which I write to from one thread and I read from another. The producer thread takes items received from a server and adds them to the BlockingCollection, while the reading thread attempts to empty the BlockingCollection and process them.

The problem is that I am trying to empty the queue in batches, because processing the items one by one would be too slow. But when the collection is constantly being written to (thousands of items), the consumer thread keeps reading until it is emptied, which means that the processing will not even start until the writing is done.

Now, the processing in the consumer can be done in parallel, so I have been wondering how to go about that.

Currently I have 2 ideas:

  1. After a certain number of items are read from the BlockingCollection in the consumer, start a new parallel job that processes them, instead of waiting to completely empty the queue and THEN start processing.

  2. Use multiple consumers and hope that they will run in parallel instead of just constantly blocking each other while trying to read the BlockingCollection at the same time.

So my question is about option number 2 - is the BlockingCollection internally optimized for such a case? Will it partition the areas that are read from, or will the consumers fight over each item? And if the latter, would option 1 be superior?

ulak blade
  • The idea of "process in batches" would be to "take Min(x, queue.count) items, process them, do it again" with a fixed "x". Why would you try and empty the queue completely? That's asking for trouble and defeats using the queue in the first place. – Fildor Feb 15 '21 at 09:38
  • BTW: Maybe consider creating a [DataFlow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) pipeline. – Fildor Feb 15 '21 at 09:40
  • @Fildor So what you are suggesting is option 2, is that correct? Take some amount of items and start a new parallel job that processes them, then move on to the next batch, etc. – ulak blade Feb 15 '21 at 09:42
  • Fildor is suggesting option 1, surely? – Matthew Watson Feb 15 '21 at 09:42
  • You can do that with one consumer or more. I'd start with one, then see if I need to scale out. – Fildor Feb 15 '21 at 09:43
  • The point of my first comment was "you are doing 'process _in batches_' wrong". If you say "I want to process something in batches", you need to actually build batches. Like... take ten items of x, compute some property of each x and store that somewhere. Take next 10 item of x. And so on. You don't say "take all x until no new x arrive" ... that doesn't really make sense here. – Fildor Feb 15 '21 at 09:46
  • `But when it's being constantly written to (thousands of items), then the consumer thread keeps reading them until it's emptied, which means that the processing will not even start until the writing is done.` I have read that sentence multiple times and can't work out what it means. – mjwills Feb 15 '21 at 09:46
  • `The problem I am trying to empty the queue in batches, because processing them one by one will be too slow. ` So call `TryTake` x times to get a batch of up to x entries (well `Take` then `TryTake`). – mjwills Feb 15 '21 at 09:47
  • @mjwills Consumer : 1. Read items _until source is empty_ 2. Source is never empty. That seems to be his problem. – Fildor Feb 15 '21 at 09:48
  • @Fildor Ah, in that case the OP should "stop doing that". :) – mjwills Feb 15 '21 at 09:48
  • `Use multiple consumers and hope that they will run in parallel instead of just constantly blocking each other while trying to read the BlockingCollection at the same time.` That is literally its job, so I presume it will be OK. But profile to be sure. If it is a problem, have a single consumer that batches entries into `List` and then adds them to a _second_ `BlockingCollection` (that is consumed from by multiple readers) to reduce the contention. But honestly it is unlikely to be needed. – mjwills Feb 15 '21 at 09:51
  • @mjwills Yes, I think it is as easy as adding " or after 10 items max" to the "stop reading if queue is empty". – Fildor Feb 15 '21 at 09:51
  • @Fildor Agreed. So, `Take` once and then `TryTake` up to 9 times. Agreed. – mjwills Feb 15 '21 at 09:52
  • @mjwills Well, "10" was just an arbitrary example, but I think we both are talking about the same thing, yes. – Fildor Feb 15 '21 at 09:53
  • Aaaand again: This smells like "Dataflow" to me. Just have a look at [BatchBlock](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library#batchblockt) – Fildor Feb 15 '21 at 09:55
  • Another option seems to be wrapping a class around a collection and using ReaderWriterLockSlim. Since I have a single producer and multiple consumers, this should be faster than a regular BlockingCollection? – ulak blade Feb 15 '21 at 09:57
  • I don't know if it would be faster. But the intended purpose of a `BlockingCollection` is literally _"Provides blocking and bounding capabilities for **thread-safe collections** that implement IProducerConsumerCollection."_ – Fildor Feb 15 '21 at 10:00
  • `Since I have a single producer and multiple consumers, this should be faster than a regular BlockingCollection?` https://ericlippert.com/2012/12/17/performance-rant/ – mjwills Feb 15 '21 at 10:01
  • Before you jump to more custom made solutions, why don't you try the easy fix to your current solution that has now been suggested by 2 users? And if that doesn't please you, why don't you take the opposite path and try the even higher abstraction that has been created by very smart people with dozens of testers and that is out in the field for several years now? – Fildor Feb 15 '21 at 10:07
  • BTW: BenchmarkDotNet is a great tool if it comes to _"this should be faster"_. Just last week, I had two instances of "should be faster" - which in one case turned out true but in the other surprisingly turned out to be so utterly wrong, that it would had costed me a week in refactoring just to see that I just broke my app and wasted a week. – Fildor Feb 15 '21 at 10:15
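The Take-then-TryTake batching that Fildor and mjwills describe in the comments above could be sketched like this. This is a rough sketch, not production code; the `BatchConsumer` class, the `int` element type, and the `batchSize` default are all hypothetical:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BatchConsumer
{
    public static void Consume(BlockingCollection<int> source,
                               Action<List<int>> processBatch,
                               int batchSize = 10)
    {
        while (!source.IsCompleted)
        {
            var batch = new List<int>(batchSize);
            try
            {
                // Block until at least one item is available.
                batch.Add(source.Take());
            }
            catch (InvalidOperationException)
            {
                break; // CompleteAdding was called while we were waiting.
            }
            // Top the batch up without blocking, so a busy producer
            // can never delay processing indefinitely.
            while (batch.Count < batchSize && source.TryTake(out var item))
            {
                batch.Add(item);
            }
            processBatch(batch);
        }
    }
}
```

Each iteration processes at most `batchSize` items, so processing starts even while the producer is still adding items.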

2 Answers


To add just another alternative: (in no way production-ready!)

This makes use of TPL's Dataflow, where BatchBlock<T> abstracts the batching away for us.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class HoneyBatcher
{
    private const int BATCHSIZE = 10; // Find the size that works best for you.
    private readonly BatchBlock<Honey> batchBlock;
    private readonly ExecutionDataflowBlockOptions _options = 
                     new ExecutionDataflowBlockOptions()
    {
         // I'd start with 1, then benchmark if higher number actually benefits.
         MaxDegreeOfParallelism = 1, 
         SingleProducerConstrained = true // if so, may micro-optimize throughput
    };
                       // vv Whatever process you want done on a batch
    public HoneyBatcher( Action<Honey[]> batchProcessor )
    {
        // BatchBlock does the batching
        // and is the entry point to the pipeline.
        batchBlock = new BatchBlock<Honey>(BATCHSIZE);
        // processorBlock processes each batch that batchBlock will produce
        // Parallel executions as well as other tweaks can be configured through options.
        ActionBlock<Honey[]> processorBlock = 
                             new ActionBlock<Honey[]>(batchProcessor, _options);
        // build the pipeline
        batchBlock.LinkTo(processorBlock);
        // item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
    }

    // Add item individually and have them batched up
    // and processed in a pipeline.
    public Task<bool> ProcessAsync(Honey item)
    {
        return batchBlock.SendAsync(item);
        // Can also be done with sync API.
    }
}

public class Honey 
{
    // Just a dummy
}

Be advised that the above snippet is just a rough layout of the idea. In production, you would of course address error handling, completion, cancellation, etc.
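For completeness, usage might look like the following. The item count is an arbitrary example, and note that `BatchBlock<T>` holds back incomplete batches:

```csharp
var batcher = new HoneyBatcher(batch =>
    Console.WriteLine($"Processing a batch of {batch.Length} items"));

for (int i = 0; i < 25; i++)
{
    await batcher.ProcessAsync(new Honey());
}
// With BATCHSIZE = 10, the last 5 items remain buffered until the
// BatchBlock is completed or TriggerBatch() is called - one of the
// completion concerns mentioned above.
```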

Fildor

A natural way to process the items in batches would be to insert them into the BlockingCollection in batches, instead of trying to remove them in batches later. In other words, you could use a BlockingCollection<T[]> instead of a BlockingCollection<T>. The producer thread could do the batching easily by using a Queue<T>:

var queue = new Queue<T>();
while (someCondition)
{
    var item = ProduceItem();
    queue.Enqueue(item);
    if (queue.Count == batchSize)
    {
        blockingCollection.Add(queue.ToArray());
        queue.Clear();
    }
}
if (queue.Count > 0)
{
    blockingCollection.Add(queue.ToArray());
    queue.Clear();
}
blockingCollection.CompleteAdding();
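On the consumer side, several tasks could then drain whole batches concurrently. A rough sketch, where `T`, `consumerCount`, and `ProcessBatch` are placeholders for your own types and logic:

```csharp
var consumers = Enumerable.Range(0, consumerCount)
    .Select(_ => Task.Run(() =>
    {
        // GetConsumingEnumerable blocks while the collection is empty,
        // and completes once CompleteAdding has been called and the
        // collection has been drained.
        foreach (T[] batch in blockingCollection.GetConsumingEnumerable())
        {
            ProcessBatch(batch);
        }
    }))
    .ToArray();
Task.WaitAll(consumers);
```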

Depending on the situation, you could also use a LINQ-style operator like the Batch from the MoreLinq library.

Finally, to answer your main question: yes, the BlockingCollection class handles multiple consumers, as well as multiple producers, excellently. When the collection is empty, all consumers are blocked, and when an item arrives it is given to one of the waiting consumers.

Theodor Zoulias
  • Be aware that `Batch` doesn't play well with any concurrent data structure that implements `ICollection`. – mjwills Feb 15 '21 at 13:01
  • @mjwills Oh, I wasn't aware of that. Do you have something to read about this, that you would recommend? – Fildor Feb 15 '21 at 13:10
  • @Fildor It is the case with most LINQ - but see https://github.com/morelinq/MoreLINQ/issues/794 for example. – mjwills Feb 15 '21 at 19:42
  • @Fildor another example: [Calling ToList() on ConcurrentDictionary while adding items](https://stackoverflow.com/questions/41038514/calling-tolist-on-concurrentdictionarytkey-tvalue-while-adding-items). From the [documentation](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2#thread-safety): *"Members accessed through one of the interfaces the `ConcurrentDictionary` implements, including extension methods, are not guaranteed to be thread safe and may need to be synchronized by the caller."* – Theodor Zoulias Feb 15 '21 at 23:06