
Is there a way to take items from a BlockingCollection in batches? For example:

I have a message bus publisher calling blockingCollection.Add().

And a consuming thread which is created like this:

Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        Console.WriteLine(value);
    }
});

However, I only want to write to the console once the blocking collection holds 10 items, whereas GetConsumingEnumerable() yields each item as soon as it is added. I could write my own queue for this, but I'd like to use BlockingCollection if possible.

  • @HansPassant: But wouldn't that leave items beyond the 10th unprocessed...? – Jon Feb 11 '14 at 14:13

2 Answers


A quick solution would be something like this:

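using System;
using System.Collections.Generic;

public class ConsoleQueue
{
    // Messages buffered until there are 10 of them
    private readonly List<string> _values = new List<string>();

    // Writes and clears the buffer once it holds at least 10 messages
    public void FlushQueueIfFull()
    {
        if (_values.Count < 10) return;
        foreach (var value in _values)
        {
            Console.WriteLine(value);
        }
        _values.Clear();
    }

    // Adds a message and flushes if the batch is now full
    public void Push(string message)
    {
        _values.Add(message);
        FlushQueueIfFull();
    }
}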

Then you can use it like this:

var queue = new ConsoleQueue();

Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        queue.Push(value);
    }
});

You can easily extend it to cover thread safety, etc.
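As written, ConsoleQueue only prints once it holds 10 items, so if the total number of messages isn't a multiple of 10, the last few are never written (similar to the concern raised in the comment under the question). A minimal sketch of one possible extension, assuming a single consumer thread: a hypothetical `Batch` extension method (not part of the BCL or of this answer) that yields full batches and then, once GetConsumingEnumerable() ends after CompleteAdding() is called, a final partial batch. `BatchDemo` is likewise an illustrative name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchingExtensions
{
    // Hypothetical helper: groups a sequence into arrays of `size` items.
    // Leftover items (fewer than `size`) are yielded as a final, smaller
    // batch when the source sequence ends, so nothing is silently dropped.
    public static IEnumerable<T[]> Batch<T>(this IEnumerable<T> source, int size)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));

        var buffer = new List<T>(size);
        foreach (var item in source)
        {
            buffer.Add(item);
            if (buffer.Count == size)
            {
                yield return buffer.ToArray();
                buffer.Clear();
            }
        }

        // GetConsumingEnumerable() only finishes after CompleteAdding() has
        // been called and the collection is empty; flush the remainder then.
        if (buffer.Count > 0)
            yield return buffer.ToArray();
    }
}

public static class BatchDemo
{
    // Adds 25 items, consumes them in batches of 10 on another task,
    // and returns the size of each batch the consumer saw.
    public static List<int> Run()
    {
        var blockingCollection = new BlockingCollection<string>();
        var sizes = new List<int>();

        var consumer = Task.Run(() =>
        {
            foreach (string[] batch in blockingCollection.GetConsumingEnumerable().Batch(10))
            {
                Console.WriteLine(string.Join(", ", batch));
                sizes.Add(batch.Length);
            }
        });

        for (int i = 0; i < 25; i++)
            blockingCollection.Add("msg" + i);
        blockingCollection.CompleteAdding(); // lets the consumer's loop end

        consumer.Wait();
        return sizes; // [10, 10, 5]
    }
}
```

Only the consumer task touches the buffer, so no lock is needed there; producers can keep calling Add() from any thread because BlockingCollection handles that synchronization. On .NET 6 or later, the built-in `Enumerable.Chunk(size)` in System.Linq performs the same grouping, including the final partial chunk.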

DrinkBird
  • +1 - Another alternative would be to use the TPL Dataflow BatchBlock, which already implements thread-safe batching. – Dimitri Feb 11 '14 at 14:39

I'm not sure what the project requirements are, but I'd recommend the TPL Dataflow BatchBlock.

You would instantiate a BatchBlock<string>, link it to an ActionBlock<string[]> (each batch arrives as an array of strings), and then post to the batch block.

The code might look something like this:

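using System;
using System.Threading.Tasks.Dataflow; // NuGet package System.Threading.Tasks.Dataflow

var bb = new BatchBlock<string>(10);
var ab = new ActionBlock<string[]>(msgArray =>
{
    foreach (var msg in msgArray)
        Console.WriteLine(msg);
});

// PropagateCompletion lets ab complete after bb has emitted its last batch
bb.LinkTo(ab, new DataflowLinkOptions { PropagateCompletion = true });

foreach (string value in blockingCollection.GetConsumingEnumerable())
{
    bb.Post(value);
}

// Emit any partial batch (fewer than 10 items) and wait until it is printed
bb.Complete();
ab.Completion.Wait();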

With Dataflow you might even replace the BlockingCollection with a BufferBlock, or just post to the batch block directly without first adding to the blocking collection, since the batch block is already thread-safe.
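A minimal sketch of posting directly, assuming the System.Threading.Tasks.Dataflow NuGet package is referenced: the publisher posts straight into the BatchBlock with no BlockingCollection in between (a plain loop stands in for the message bus here), and `DataflowBatchDemo`/`RunAsync` are hypothetical names used only for illustration. Calling `Complete()` on the batch block makes it emit any leftover items as a smaller final batch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

public static class DataflowBatchDemo
{
    // Posts 24 messages straight into a BatchBlock and returns the size of
    // each batch the ActionBlock received.
    public static async Task<List<int>> RunAsync()
    {
        var batchSizes = new List<int>();
        var bb = new BatchBlock<string>(10);

        // ActionBlock runs one batch at a time by default
        // (MaxDegreeOfParallelism = 1), so batchSizes needs no lock here.
        var ab = new ActionBlock<string[]>(batch =>
        {
            foreach (var msg in batch)
                Console.WriteLine(msg);
            batchSizes.Add(batch.Length);
        });
        bb.LinkTo(ab, new DataflowLinkOptions { PropagateCompletion = true });

        // Stands in for the message bus publisher: no BlockingCollection needed.
        for (int i = 0; i < 24; i++)
            bb.Post("msg" + i);

        bb.Complete();       // emits the remaining 4 items as a final batch
        await ab.Completion; // waits until every batch has been processed
        return batchSizes;   // [10, 10, 4]
    }
}
```

Without the `Complete()` call, the last 4 messages would sit in the batch block indefinitely, because it only emits once it has 10 items or has been told no more are coming.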

Dimitri
  • Thanks Dmitri. I'm just getting a type conversion problem on bb.LinkTo(ab): a problem converting BatchBlock to ISourceBlock... – The Unculled Badger Feb 11 '14 at 14:56
  • My bad. ActionBlock<string> must be ActionBlock<string[]>, because it receives an array of strings from the batch block. See updated answer – Dimitri Feb 11 '14 at 15:00
  • I marked this one correct because I got to learn something new with TPL Dataflow! Thanks Dmitri. – The Unculled Badger Feb 11 '14 at 15:06
  • @Dimitri [According to MSDN](https://msdn.microsoft.com/en-us/library/hh194745%28v=vs.110%29.aspx) `BatchBlock` is not thread-safe (see bottom of linked page). – Coder1095 May 01 '15 at 23:29