16

I've come up with some code to consume all wating items from a queue. Rather than processing the items 1 by 1, it makes sense to process all waiting items as a set.

I've declared my queue like this.

private BlockingCollection<Item> items = 
    new BlockingCollection<Item>(new ConcurrentQueue<Item>);

Then, on a consumer thread, I plan to read the items in batches like this,

Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);

    while(this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }

    // process workToDo, then go back to the queue.
}

This approach lacks the utility of GetConsumingEnumerable and I can't help wondering if I've missed a better way, or if my approach is flawed.

Is there a better way to consume a BlockingCollection<T> in batches?

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
Jodrell
  • 34,946
  • 5
  • 87
  • 124
  • 1
    You could do a Take(50) on the ConsumingEnumerable but you would lose the effect of the 50ms timeout. So choose what is more important. – H H Sep 04 '12 at 09:40
  • @HenkHolterman, you're right, I don't actually need that, it would be problematic it items were produced at a faster rate. – Jodrell Sep 04 '12 at 09:57
  • @HenkHolterman, question edited accordingly. – Jodrell Sep 04 '12 at 09:59
  • No a faster rate would be no problem, a slower one would, Your current code (w/o timeout) is less suitable to be replaced by ConsumingEnumerable – H H Sep 04 '12 at 10:48
  • @HenkHolterman, I guess it depends if I want to wait for bigger batches, in which case something different is called for. – Jodrell Sep 04 '12 at 11:12
  • 1
    That's what I meant with making a choice: min/max batch size and min/max waiting time. – H H Sep 04 '12 at 11:42

3 Answers3

6

A solution is to use the BufferBlock<T> from System.Threading.Tasks.Dataflow (which is included in .net core 3+). It does not use GetConsumingEnumerable(), but it still does allow you the same utility, mainly:

  • allows parallel processing w/ multiple (symmetrical and/or asymmetrical) consumers and producers
  • thread safe (allowing for the above) - no race conditions to worry about
  • can be cancelled by a cancellation token and/or collection completion
  • consumers block until data is available, avoiding wasting CPU cycles on polling

There is also a BatchBlock<T>, but that limits you to fixed sized batches.

var buffer = new BufferBlock<Item>();
while (await buffer.OutputAvailableAsync())
{
    if (buffer.TryReceiveAll(out var items))
        //process items
}

Here is a working example, which demos the following:

  • multiple symmetrical consumers which process variable length batches in parallel
  • multiple symmetrical producers (not truly operating in parallel in this example)
  • ability to complete the collection when the producers are done
  • to keep the example short, I did not demonstrate the use of a CancellationToken
  • ability to wait until the producers and/or consumers are done
  • ability to call from an area that doesn't allow async, such as a constructor
  • the Thread.Sleep() calls are not required, but help simulate some processing time that would occur in more taxing scenarios
  • both the Task.WaitAll() and the Thread.Sleep() can optionally be converted to their async equivalents
  • no need to use any external libraries
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    static void Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        List<Task> consumers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            consumers.Add(Task.Factory.StartNew(async () =>
            {
                // need to copy this due to lambda variable capture
                var num = i; 
                while (await buffer.OutputAvailableAsync())
                {
                    if (buffer.TryReceiveAll(out var items))
                        Console.WriteLine($"Consumer {num}:    " + 
                            items.Aggregate((a, b) => a + ", " + b));

                        // real life processing would take some time
                        await Task.Delay(500); 
                }

                Console.WriteLine($"Consumer {num} complete");
            }));

            // give consumer tasks time to activate for a better demo
            Thread.Sleep(100); 
        }

        // Kick off producer task(s)
        List<Task> producers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            producers.Add(Task.Factory.StartNew(() =>
            {
                for (int j = 0 + (1000 * i); j < 500 + (1000 * i); j++)
                    buffer.Post(j.ToString());
            }));

            // space out the producers for a better demo
            Thread.Sleep(10); 
        }

        // may also use the async equivalent
        Task.WaitAll(producers.ToArray());
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete(); 

        // may also use the async equivalent
        Task.WaitAll(consumers.ToArray()); 
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }
}

Here is a mondernised and simplified version of the code.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static async Task Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        var consumers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var id = i;
            consumers.Add(Task.Run(() => StartConsumer(id, buffer)));

            // give consumer tasks time to activate for a better demo
            await Task.Delay(100);
        }

        // Kick off producer task(s)
        var producers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var pid = i;
            producers.Add(Task.Run(() => StartProducer(pid, buffer)));

            // space out the producers for a better demo
            await Task.Delay(10);
        }

        // may also use the async equivalent
        await Task.WhenAll(producers);
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete();

        // may also use the async equivalent
        await Task.WhenAll(consumers);
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }

    private static async Task StartConsumer(
            int id,
            IReceivableSourceBlock<string> buffer)
    {
        while (await buffer.OutputAvailableAsync())
        {
            if (buffer.TryReceiveAll(out var items))
            {
                Console.WriteLine($"Consumer {id}: " + 
                    items.Aggregate((a, b) => a + ", " + b));
            }

            // real life processing would take some time
            await Task.Delay(500);
        }

        Console.WriteLine($"Consumer {id} complete");
    }

    private static Task StartProducer(int pid, ITargetBlock<string> buffer)
    {
        for (var j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
        {
            buffer.Post(j.ToString());
        }

        return Task.CompletedTask;
    }
}
Jodrell
  • 34,946
  • 5
  • 87
  • 124
vlee
  • 345
  • 4
  • 3
  • The styling and quality of the example could be improved e.g. make it fit on the screen and use `await Task.Delay` instead of `Thread.Sleep`, and other improvements too I suspect. – Jodrell Nov 26 '19 at 12:10
  • Thanks for the feedback. I have reformatted to eliminate horizontal scrolling. I debated using `await Task.Delay()` instead of `Thread.Sleep()`, but I wanted to show that this code can be created from a constructor or other places where async isn't allowed. What do you think? – vlee Nov 26 '19 at 20:11
  • I think you should have and [async Main](https://blogs.msdn.microsoft.com/mazhou/2017/05/30/c-7-series-part-2-async-main/) and that you should not sleep or delay in a constructor. – Jodrell Nov 27 '19 at 09:00
  • I couldn't resist tweaking your code, see the edited answer. – Jodrell Nov 27 '19 at 09:34
  • The Sleep()/Delay() is to simulate more intensive processing in real life. Thus, I wanted to include it in the example (although it should be be replaced by real code in production). At the same time, I did not want to confuse anyone into thinking that async was required to run this code. Thanks for posting the async version of the code though! – vlee Nov 27 '19 at 20:19
2

While not as good as ConcurrentQueue<T> in some ways, my own LLQueue<T> allows for a batched dequeue with a AtomicDequeueAll method where all items currently on the queue are taken from it in a single (atomic and thread-safe) operation, and are then in a non-threadsafe collection for consumption by a single thread. This method was designed precisely for the scenario where you want to batch the read operations.

This isn't blocking though, though it could be used to create a blocking collection easily enough:

public BlockingBatchedQueue<T>
{
  private readonly AutoResetEvent _are = new AutoResetEvent(false);
  private readonly LLQueue<T> _store;
  public void Add(T item)
  {
    _store.Enqueue(item);
    _are.Set();
  }
  public IEnumerable<T> Take()
  {
    _are.WaitOne();
    return _store.AtomicDequeueAll();
  }
  public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  {
    if(_are.WaitOne(millisecTimeout))
    {
      items = _store.AtomicDequeueAll();
      return true;
    }
    items = null;
    return false;
  }
}

That's a starting point that doesn't do the following:

  1. Deal with a pending waiting reader upon disposal.
  2. Worry about a potential race with multiple readers both being triggered by a write happening while one was reading (it just considers the occasional empty result enumerable to be okay).
  3. Place any upper-bound on writing.

All of which could be added too, but I wanted to keep to the minimum of some practical use, that hopefully isn't buggy within the defined limitations above.

Jon Hanna
  • 110,372
  • 10
  • 146
  • 251
  • I wonder, if you made a `BatchedQueue` that implemented `IProducerConsumer>` ... or if `LLQueue` did, now I'm wondering if its me who is missing somthing or rather, its the framework. – Jodrell Sep 04 '12 at 09:45
  • `LLQueue` implements `IProducerConsumerCollection` alright, but using that directly in a blocking collection means we lose the access to the `AtomicDequeueAll()` that is the only advantage it has here over anything else. You could try wrapping `LLQueue` in a class that implemented `IProducerConsumer>` by calling `EnqueueRange()` and `AtomicDequeueAll()`, but `EnqueueRange()` isn't atomic, so there could be periods when you're waiting to read while there are actually items there to be read, because the blocking collection doesn't realise it. – Jon Hanna Sep 04 '12 at 09:56
  • The `TryAdd` would need to be atomic for the whole `IEnumerable`. – Jodrell Sep 04 '12 at 10:07
  • Exactly. `EnqueueRange()` is little more than a convenience on repeatedly calling `Enqueue()`. Fine if you don't mind it being inter-leaved with other `Enqueue()s`, but useless if we were to wrap it in something used by something that expects it to be atomic. That said, I imagine `BlockingCollection` doesn't allow reads until a write has returned, so it might actually be okay in that regard. I really don't like the idea of stretching expected behaviour that much though. – Jon Hanna Sep 04 '12 at 10:17
  • Actually, just taking a look at it for another reason, it appears I forgot my own code: EnqueueRange() is currently atomic, I had just decided not to guarantee that because I wasn't sure if it was the best approach so I wanted the right to change that later. With the current code, you could definitely make a `BlockingCollection>` that is atomic for both adding and removing. – Jon Hanna Dec 10 '13 at 09:45
0

No, there is no better way. Your approach is basically correct.

You could wrap the "consume-in-batches" functionality in an extension method, for ease of use. The implementation below uses the same List<T> as a buffer during the whole enumeration, with the intention to prevent the allocation of a new buffer on each iteration. It also includes a maxSize parameter, that allows to limit the size of the emitted batches:

/// <summary>
/// Consumes the items in the collection in batches. Each batch contains all
/// the items that are immediately available, up to a specified maximum number.
/// </summary>
public static IEnumerable<T[]> GetConsumingEnumerableBatch<T>(
    this BlockingCollection<T> source, int maxSize,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
    if (source.IsCompleted) yield break;
    var buffer = new List<T>();
    while (source.TryTake(out var item, Timeout.Infinite, cancellationToken))
    {
        Debug.Assert(buffer.Count == 0);
        buffer.Add(item);
        while (buffer.Count < maxSize && source.TryTake(out item))
            buffer.Add(item);
        T[] batch = buffer.ToArray();
        int batchSize = batch.Length;
        buffer.Clear();
        yield return batch;
        if (batchSize < buffer.Capacity >> 2)
            buffer.Capacity = buffer.Capacity >> 1; // Shrink oversized buffer
    }
}

Usage example:

foreach (Item[] batch in this.items.GetConsumingEnumerableBatch(Int32.MaxValue))
{
    // Process the batch
}

The buffer is shrank in half, every time an emitted batch is smaller than a quarter of the buffer's capacity. This will keep the buffer in control, in case it has become oversized at some point during the enumeration.

The intention of the if (source.IsCompleted) yield break line is to replicate the behavior of the built-in GetConsumingEnumerable method, when it is supplied with an already canceled token, and the collection is empty and completed.

In case of cancellation, no buffered messages are in danger of being lost. The cancellationToken is checked only when the buffer is empty.

A simpler implementation without memory management features, can be found in the first revision of this answer.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104