
All of the C# implementations of producer-consumer collections [1] [2] that I've seen have interfaces vaguely similar to:

private Queue<T> items;

public void Produce(T item)
public T Consume()

Any implementations out there like the following?

private Queue<T> items;

public void Produce(T[] items)
public T[] Consume(int count)

The hope is that this would let me produce/consume varying numbers of items at a time without requiring excessive per-item locking. This seems necessary when producing/consuming large numbers of items, but I haven't had any luck finding such implementations.

[1] C# producer/consumer

[2] Creating a blocking Queue<T> in .NET?

Jay Sullivan
  • What should happen in case you call `Consume(10)`, and 10 items are not immediately available in the collection? Let's say that currently there are 5 items available. Should these 5 items be returned, or the `Consume(10)` should block (wait) until all 10 items are available? – Theodor Zoulias Apr 14 '22 at 10:04

3 Answers


There are multiple possible ways depending on what exactly you want to implement.

There are the implementations of the IProducerConsumerCollection<T> interface: in the .NET Framework these are ConcurrentQueue<T>, ConcurrentStack<T> and ConcurrentBag<T>. On top of them sits BlockingCollection<T>, a thread-safe wrapper that adds blocking and bounding behavior to any of these implementations.

This class allows you to have blocking or non-blocking producers and consumers. The producer side becomes blocking when you provide a capacity limit to the constructor. As the documentation of the BlockingCollection<T>.Add(T) method states:

If a bounded capacity was specified when this instance of BlockingCollection<T> was initialized, a call to Add may block until space is available to store the provided item.
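A quick sketch of that bounding behavior, using an illustrative capacity of 2 and TryAdd with a timeout so the producer can give up instead of blocking indefinitely:

```csharp
using System;
using System.Collections.Concurrent;

class BoundedDemo
{
    static void Main()
    {
        // Capacity of 2 is arbitrary, chosen just for illustration.
        var bc = new BlockingCollection<int>(boundedCapacity: 2);

        bc.Add(1); // succeeds immediately
        bc.Add(2); // succeeds immediately; the collection is now full

        // A plain Add(3) would block here until a consumer frees a slot.
        // TryAdd with a timeout lets the producer back off instead:
        bool added = bc.TryAdd(3, TimeSpan.FromMilliseconds(50));
        Console.WriteLine(added); // False: no consumer took an item
    }
}
```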

For fetching items you can use the various Take and TryTake methods, or the extremely handy BlockingCollection<T>.GetConsumingEnumerable() method. It creates an IEnumerable<T> whose enumerator consumes one element from the BlockingCollection<T> each time the next value is fetched, blocking while the source collection is empty. That lasts until BlockingCollection<T>.CompleteAdding() is called and the collection stops accepting new data; at that point all consuming enumerables stop blocking and report that there is no more data (as soon as all remaining data has been consumed).

So you can basically implement a consumer like this:

BlockingCollection<...> bc = ...
foreach (var item in bc.GetConsumingEnumerable())
{
    // do something with your item
}

Such a consumer can be started in multiple threads, so that multiple threads read from your source if you choose to. You can create any number of consuming enumerables.
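For example, two consumers draining the same collection (class and variable names here are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class MultiConsumerDemo
{
    static void Main()
    {
        var bc = new BlockingCollection<int>();
        int total = 0;

        // Two consumers, each draining the same collection via its own
        // consuming enumerable; both block while the collection is empty.
        var consumers = Enumerable.Range(0, 2).Select(_ => Task.Run(() =>
        {
            foreach (var item in bc.GetConsumingEnumerable())
                Interlocked.Add(ref total, item);
        })).ToArray();

        for (int i = 1; i <= 100; i++) bc.Add(i);
        bc.CompleteAdding();      // lets both enumerables finish once drained
        Task.WaitAll(consumers);

        Console.WriteLine(total); // 5050 regardless of how the work was split
    }
}
```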

You should be aware that this collection is really only a wrapper. A constructor overload allows you to choose the underlying collection; by default it is a ConcurrentQueue<T>. This means that by default the collection behaves like that queue and is First-In-First-Out, at least as long as you use only one producer and one consumer.
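To illustrate that the wrapper takes on the ordering of its backing store, here is a sketch that swaps in a ConcurrentStack<T> to get Last-In-First-Out behavior instead of the default FIFO:

```csharp
using System;
using System.Collections.Concurrent;

class UnderlyingStoreDemo
{
    static void Main()
    {
        // Back the BlockingCollection with a ConcurrentStack<T>
        // so that Take() returns the most recently added item first.
        var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());
        lifo.Add(1);
        lifo.Add(2);
        lifo.Add(3);

        int first = lifo.Take();
        int second = lifo.Take();
        Console.WriteLine($"{first}, {second}"); // 3, 2
    }
}
```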


All that being said, there is an alternative. If you don't need the blocking part (or want to implement the blocking part yourself) and don't require any ordering of the elements, there is the ConcurrentBag<T>. This collection handles access from multiple threads very efficiently. It uses smaller collections inside ThreadLocal<T> wrappers, so each thread uses its own storage, and only when a thread runs out of items in its own storage does it start fetching items from another thread's storage.

Using this collection may be interesting when producing and consuming happen sequentially in your use case: first all items are added, and once that is done all items are consumed, both with multiple threads.
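A minimal sketch of that produce-then-consume pattern (the item count and degree of parallelism are arbitrary):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BagDemo
{
    static void Main()
    {
        var bag = new ConcurrentBag<int>();

        // Produce phase: many threads add items concurrently.
        Parallel.For(0, 1000, i => bag.Add(i));

        // Consume phase: several threads drain the bag; each worker
        // prefers its own thread-local storage before stealing.
        long sum = 0;
        Parallel.For(0, 4, _ =>
        {
            while (bag.TryTake(out var item))
                Interlocked.Add(ref sum, item);
        });

        Console.WriteLine(sum); // 499500, the sum of 0..999
    }
}
```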

Nitram
  • Thank you for the extensive answer, but I accepted the other given that (A) the other answer came first, and (B) in the end, I had been overthinking the problem, and there really was no problem at all; sometimes the simplest answer is the best one. – Jay Sullivan Nov 07 '16 at 22:47

The hope is that this would let me produce/consume varying numbers of items at a time without requiring excessive per-item locking.

You could use the BlockingCollection<T> class; although it doesn't have methods to add or take multiple items in a single call, its default backing store is the lock-free ConcurrentQueue<T>, so per-item operations are cheap.
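In practice a batch-style consume can be sketched on top of the per-item calls: one blocking Take for the first element, then non-blocking TryTake calls to opportunistically drain more (the batch size of 4 below is arbitrary):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

class BatchTakeDemo
{
    static void Main()
    {
        var bc = new BlockingCollection<int>();
        for (int i = 0; i < 10; i++) bc.Add(i);

        // Block until at least one item is available...
        var batch = new List<int> { bc.Take() };
        // ...then grab whatever else is immediately available, up to the batch size.
        while (batch.Count < 4 && bc.TryTake(out var item))
            batch.Add(item);

        Console.WriteLine(string.Join(",", batch)); // 0,1,2,3 (FIFO order)
    }
}
```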

Thomas Levesque
  • I had made the incorrect assumption that `BlockingCollection` would be slow with many items. After testing, it works fine with high throughput. – Jay Sullivan Nov 07 '16 at 22:45

Needing exactly this, I wrote an extension method myself. Note that once at least one element has been removed from the queue, the call logs any further exceptions and returns the elements fetched so far, so that nothing is lost.

public static class BlockingCollectionMethodExtensions
{
    public static List<T> FetchAtLeastOneBlocking<T>(this BlockingCollection<T> threadSafeQueue, int maxCount, ICommonLog log)
    {
        var resultList = new List<T>();

        // Take() blocks the thread until an element appears.
        // It throws an InvalidOperationException when the BlockingCollection is completed.
        resultList.Add(threadSafeQueue.Take());

        try
        {
            // Fetch more elements without blocking
            while (resultList.Count < maxCount && threadSafeQueue.TryTake(out var item))
            {
                resultList.Add(item);
            }
        }
        }
        catch (Exception ex)
        {
            log?.Fatal($"Unknown error fetching more elements. Continuing to process the {resultList.Count} already fetched items.", ex);
        }

        return resultList;
    }
}

And corresponding tests:

public class BlockingCollectionMethodExtensionsTest : UnitTestBase
{
    [Fact]
    public void FetchAtLeastOneBlocking_FirstEmpty_ThenSingleEntryAdded_ExpectBlocking_Test()
    {
        var queue = new BlockingCollection<int>();

        var startEvent = new ManualResetEvent(initialState: false);
        var completedEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            completedEvent.Set();
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait 1 second to ensure that nothing has been fetched yet
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.Null(fetchResult);

        // Add a new element and verify that the fetch method succeeded
        queue.Add(78);

        var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
        Assert.True(completedSuccess);
        Assert.NotNull(fetchResult);
        Assert.Single(fetchResult);
        Assert.Equal(78, fetchResult.Single());
    }

    [Fact]
    public void FetchAtLeastOneBlocking_FirstEmpty_ThenCompleted_ExpectOperationException_Test()
    {
        var queue = new BlockingCollection<int>();
        Exception caughtException = null;

        var startEvent = new ManualResetEvent(initialState: false);
        var exceptionEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            try
            {
                fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            }
            catch (Exception ex)
            {
                caughtException = ex;
                exceptionEvent.Set();
            }
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait 1 second to ensure that nothing has been fetched yet
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.Null(fetchResult);

        // Now complete the queue and assert that fetching threw the expected exception
        queue.CompleteAdding();

        // Wait for the exception to be thrown
        var exceptionSuccess = exceptionEvent.WaitOne(TimeSpan.FromSeconds(2));
        Assert.True(exceptionSuccess);
        Assert.NotNull(caughtException);
        Assert.IsType<InvalidOperationException>(caughtException);
    }

    [Fact]
    public void FetchAtLeastOneBlocking_SingleEntryExists_ExpectNonblocking_Test()
    {
        var queue = new BlockingCollection<int>();
        // Pre-fill the queue with a single element
        queue.Add(78);

        var startEvent = new ManualResetEvent(initialState: false);
        var completedEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            completedEvent.Set();
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait for expected immediate completion
        var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
        Assert.True(completedSuccess);
        Assert.NotNull(fetchResult);
        Assert.Single(fetchResult);
        Assert.Equal(78, fetchResult.Single());
    }

    [Fact]
    public void FetchAtLeastOneBlocking_MultipleEntriesExist_ExpectNonblocking_Test()
    {
        var queue = new BlockingCollection<int>();
        // Pre-fill the queue with two elements (fewer than maxCount)
        queue.Add(78);
        queue.Add(79);

        var startEvent = new ManualResetEvent(initialState: false);
        var completedEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            completedEvent.Set();
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait for expected immediate completion
        var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
        Assert.True(completedSuccess);
        Assert.NotNull(fetchResult);
        Assert.Equal(2, fetchResult.Count);
        Assert.Equal(78, fetchResult[0]);
        Assert.Equal(79, fetchResult[1]);
    }

    [Fact]
    public void FetchAtLeastOneBlocking_MultipleEntriesExist_MaxCountExceeded_ExpectNonblocking_Test()
    {
        var queue = new BlockingCollection<int>();
        // Pre-fill the queue with four elements (one more than maxCount)
        queue.Add(78);
        queue.Add(79);
        queue.Add(80);
        queue.Add(81);

        var startEvent = new ManualResetEvent(initialState: false);
        var completedEvent = new ManualResetEvent(initialState: false);
        List<int> fetchResult = null;

        var thread = new Thread(() =>
        {
            startEvent.Set();
            fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
            completedEvent.Set();
        });
        thread.Start();

        var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
        Assert.True(startedSuccess);

        // Now wait for expected immediate completion
        var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
        Assert.True(completedSuccess);
        Assert.NotNull(fetchResult);
        Assert.Equal(3, fetchResult.Count);
        Assert.Equal(78, fetchResult[0]);
        Assert.Equal(79, fetchResult[1]);
        Assert.Equal(80, fetchResult[2]);
    }
}
Stephan Møller