No, there is no better way. Your approach is basically correct.
You could wrap the "consume in batches" functionality in an extension method for ease of use. The implementation below reuses the same `List<T>` as a buffer for the whole enumeration, to avoid allocating a new buffer on each iteration (each emitted batch is still a new array). It also includes a `maxSize` parameter that limits the size of the emitted batches:
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class BlockingCollectionExtensions
{
    /// <summary>
    /// Consumes the items in the collection in batches. Each batch contains all
    /// the items that are immediately available, up to a specified maximum number.
    /// </summary>
    public static IEnumerable<T[]> GetConsumingEnumerableBatch<T>(
        this BlockingCollection<T> source, int maxSize,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
        if (source.IsCompleted) yield break;
        var buffer = new List<T>();
        // Block until one item is available, or the collection is completed.
        while (source.TryTake(out var item, Timeout.Infinite, cancellationToken))
        {
            Debug.Assert(buffer.Count == 0);
            buffer.Add(item);
            // Drain the items that are immediately available, without blocking.
            while (buffer.Count < maxSize && source.TryTake(out item))
                buffer.Add(item);
            T[] batch = buffer.ToArray();
            int batchSize = batch.Length;
            buffer.Clear();
            yield return batch;
            if (batchSize < buffer.Capacity >> 2)
                buffer.Capacity = buffer.Capacity >> 1; // Shrink oversized buffer
        }
    }
}
```
Usage example:
```csharp
foreach (Item[] batch in this.items.GetConsumingEnumerableBatch(Int32.MaxValue))
{
    // Process the batch
}
```
The buffer is shrunk by half every time an emitted batch is smaller than a quarter of the buffer's capacity. This keeps the buffer's memory footprint under control in case it has become oversized at some point during the enumeration.
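To make the shrink rule concrete, the sketch below pulls it into a hypothetical `ShrinkIfOversized` helper (not part of the extension method) and simulates a buffer that has grown to 1,024 slots, followed by a run of small batches. Like the extension method, it assumes the rule runs only after the buffer has been cleared.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo class: isolates the shrink rule of
// GetConsumingEnumerableBatch so that its effect can be observed on its own.
public static class BufferShrinkDemo
{
    // Halves the capacity of an already cleared buffer when the last batch
    // used less than a quarter of that capacity.
    public static void ShrinkIfOversized<T>(List<T> buffer, int lastBatchSize)
    {
        if (buffer.Count != 0)
            throw new InvalidOperationException("The buffer must be cleared first.");
        if (lastBatchSize < buffer.Capacity >> 2)
            buffer.Capacity = buffer.Capacity >> 1;
    }

    // Starts from a buffer with the given capacity, applies the rule once per
    // simulated batch, and records the capacity after each batch.
    public static List<int> SimulateCapacities(int initialCapacity, int batchSize, int batches)
    {
        var buffer = new List<int>(initialCapacity);
        var capacities = new List<int>();
        for (int i = 0; i < batches; i++)
        {
            ShrinkIfOversized(buffer, batchSize);
            capacities.Add(buffer.Capacity);
        }
        return capacities;
    }
}
```

With 10-item batches, a 1,024-slot buffer goes to 512, 256, 128, 64 and 32, and then stays at 32, because 10 is not smaller than 32 / 4 = 8. So the capacity settles at no more than four times the typical batch size, while a buffer whose batches fill at least a quarter of it is never resized.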
The purpose of the `if (source.IsCompleted) yield break` line is to replicate the behavior of the built-in `GetConsumingEnumerable` method when the collection is already completed (empty and marked as complete for adding) and the supplied token is already canceled: in that case the built-in method yields no items and does not throw.
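The sketch below illustrates the case that the guard covers. With an already canceled token, the built-in `GetConsumingEnumerable` yields nothing from a completed collection, whereas calling `TryTake(out item, Timeout.Infinite, cancellationToken)` directly, as the batch method's `while` loop does, throws an `OperationCanceledException`, because `TryTake` checks the token before it checks for completion. `CompletedCollectionDemo` and its members are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

// Hypothetical demo class, not part of the answer's extension method.
public static class CompletedCollectionDemo
{
    // Creates a collection that is empty and marked as complete for adding,
    // so IsCompleted is true from the start.
    public static BlockingCollection<int> CreateCompleted()
    {
        var collection = new BlockingCollection<int>();
        collection.CompleteAdding();
        return collection;
    }

    // Counts the items yielded by the built-in consuming enumerable.
    public static int CountBuiltIn(BlockingCollection<int> collection, CancellationToken token)
        => collection.GetConsumingEnumerable(token).Count();

    // Reports whether a direct blocking TryTake with the same token throws,
    // which is what the batch method's loop would do without the guard.
    public static bool TryTakeThrows(BlockingCollection<int> collection, CancellationToken token)
    {
        try
        {
            collection.TryTake(out _, Timeout.Infinite, token);
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}
```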
In case of cancellation, no buffered items can be lost: the `cancellationToken` is checked only when the buffer is empty.
A simpler implementation, without the memory-management features, can be found in the first revision of this answer.