
In a multithreaded application, I want to await until a `BlockingCollection` is both completed and empty (`IsCompleted == true`). I implemented the class below, and it seems to work.

Since it's multithreaded code, I don't even trust my own shadow. Is this a robust implementation?

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;

public class BlockingCollectionEx<T> : BlockingCollection<T>
{
    // Completes once the collection is both marked as complete for adding and empty.
    public Task WaitCompleted => completedManualResetEvent.Task;
    private readonly TaskCompletionSource completedManualResetEvent = new();

    public new void CompleteAdding()
    {
        base.CompleteAdding();

        lock (completedManualResetEvent)
        {
            // Nothing left to consume: signal right away.
            if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }

    public new IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var item in base.GetConsumingEnumerable())
            yield return item;

        // If GetConsumingEnumerable is used by multiple threads, a second SetResult call
        // would throw an InvalidOperationException, hence the lock and the IsCompleted check.
        lock (completedManualResetEvent)
        {
            if (!completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }

    // The remaining consuming members are deliberately not supported.
    public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new T Take() => throw new NotImplementedException();
    public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}
```

Usage:

```csharp
using System.Diagnostics;
using System.Threading.Tasks;

var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();

Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
    {
        // do stuff in Task 1
    }
});
Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
    {
        // do stuff in Task 2
    }
});

await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is empty
```
Theodor Zoulias
Wouter Van Ranst
  • It looks like it should work, but there are a couple of code smells that make me wonder if there's a better way to achieve your goal. Extending a collection class generally leads to sorrow, for example. Since `Task.Run` returns a task, can you not use a regular BlockingCollection and `await Task.Run(...);`? – StriplingWarrior Jun 23 '21 at 14:39
  • In case you are interested in a consumable collection with an asynchronous API, you may find this question interesting: [Is there anything like asynchronous BlockingCollection<T>?](https://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont) – Theodor Zoulias Jun 23 '21 at 15:48
  • Why do you use a thread-safe collection just to use it inside a single thread? – Red Jun 23 '21 at 15:50
  • @Red example usage updated. I'm interested in (and leveraging) the blocking aspect of `GetConsumingEnumerable()`. – Wouter Van Ranst Jun 24 '21 at 07:00
  • So you are basically trying to process the elements of a list on multiple threads, and you would like each thread to (I assume) remove the element it is currently processing, or at least have the other threads skip it. Once all threads find that there are no more elements in the list, they complete the task. If this is the case, are you really sure that you need all of this infrastructure to achieve it? – Red Jun 24 '21 at 07:50
  • If you would like a suggestion about how to implement this functionality robustly, you could take a look at [this](https://stackoverflow.com/questions/15928642/blockingcollection-max-size/65516390#65516390) answer. It contains a custom `BlockingCollection` implementation that is based on a `Channel`. The `Channel` class already has a `Task Completion` property (exposed by its `Reader`), which AFAICS is exactly what you need. This is probably not the most efficient solution to the problem, though. – Theodor Zoulias Jun 24 '21 at 08:23
  • To be frank, this entire solution seems like a square wheel. OP, have you perhaps looked at existing processing frameworks, like [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library), Rx.NET, Akka.NET, etc.? – Aron Jun 24 '21 at 09:12
  • @Theodor, thanks for that link. MostRecentBlockingCollection seems like a good alternative – Wouter Van Ranst Jun 24 '21 at 10:10
  • @Aron, I'm moving away from TPL due to some specific limitations. Rx.NET does not have the controllable concurrency/parallelism I'm looking for, and it is heavy. I don't know Akka.NET, but it looks promising, thanks. – Wouter Van Ranst Jun 24 '21 at 10:11
  • Yep, using a `Channel` would also be my personal preference, provided that I were not overly concerned about performance. Using an asynchronous API synchronously usually comes with extra allocations, meaning more work for the garbage collector. In contrast, the built-in `BlockingCollection` is allocation-free AFAIK. – Theodor Zoulias Jun 24 '21 at 10:20
  • @WouterVanRanst I didn't suggest TPL, I suggested TPL Dataflow, which is a completely different thing. As for Rx not having "controllable concurrency", that is literally what `IScheduler` is for. – Aron Jun 25 '21 at 03:45
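The `Channel`-based approach suggested in the comments can be sketched as follows. This is a minimal illustration, not the linked answer's code. It assumes .NET Core 3.0 or later, where `System.Threading.Channels` ships in-box. The class `ChannelCompletionSketch` and its `RunAsync` method are hypothetical names. `ChannelReader<T>.Completion` plays the role of the question's `WaitCompleted`: it completes only after `Writer.Complete()` has been called and every item has been read.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: an unbounded channel drained by two concurrent consumers.
public static class ChannelCompletionSketch
{
    public static async Task<int[]> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        foreach (var i in new[] { 1, 2, 3 })
            channel.Writer.TryWrite(i); // always succeeds on an unbounded channel that is not completed
        channel.Writer.Complete();      // counterpart of BlockingCollection.CompleteAdding

        var consumed = new ConcurrentBag<int>();

        async Task ConsumeAsync()
        {
            // WaitToReadAsync returns false once the channel is completed and drained.
            while (await channel.Reader.WaitToReadAsync())
                while (channel.Reader.TryRead(out var item))
                    consumed.Add(item); // "do stuff" with the item here
        }

        var worker1 = Task.Run(() => ConsumeAsync());
        var worker2 = Task.Run(() => ConsumeAsync());

        // Counterpart of the question's WaitCompleted: completed AND empty.
        await channel.Reader.Completion;

        // Let both consumers finish before inspecting what they processed.
        await Task.WhenAll(worker1, worker2);
        var result = consumed.ToArray();
        Array.Sort(result);
        return result;
    }
}
```

Unlike the question's subclass, this does not depend on which reading method the consumers use. The channel tracks completion itself.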

1 Answer


Your implementation is not robust for general usage, but it may be good enough for an application that honors the following contract:

The collection must be consumed by exactly one consumer, and that consumer must use only the `GetConsumingEnumerable` method.

  1. If there is no consumer, the collection is empty, and the `CompleteAdding` method is invoked, the `WaitCompleted` task will never complete.
  2. If there are two or more consumers, the enumeration will fail with an `InvalidOperationException` for all but one consumer.
  3. If there is one consumer, but it consumes the collection by using the `Take` or `TryTake` methods, the `WaitCompleted` task will never complete.
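Point 2 and the `lock` comment in the question's code both involve an `InvalidOperationException`. The following is a minimal sketch, assuming that exception comes from completing the same `TaskCompletionSource` twice (the Revision 1 code is not reproduced here). It contrasts `SetResult`, which throws when the task is already completed, with `TrySetResult`, which reports the outcome through its return value. The class and method names are hypothetical, and the non-generic `TaskCompletionSource` requires .NET 5 or later.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch: completing the same TaskCompletionSource twice.
public static class TcsDoubleCompletionSketch
{
    // Returns true if the second SetResult call throws InvalidOperationException.
    public static bool SecondSetResultThrows()
    {
        var tcs = new TaskCompletionSource();
        tcs.SetResult();
        try
        {
            tcs.SetResult(); // the task is already in a final state
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // TrySetResult is thread-safe and returns false instead of throwing
    // when the task has already been completed.
    public static (bool First, bool Second) TrySetResultTwice()
    {
        var tcs = new TaskCompletionSource();
        bool first = tcs.TrySetResult();
        bool second = tcs.TrySetResult();
        return (first, second);
    }
}
```

`TrySetResult` would avoid the double-completion exception without a lock. It does not address the other races discussed in the comments, such as relying on `Count` for control flow.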

Without knowing your specific use case, I couldn't say whether you have a legitimate reason for requesting this functionality. In general, though, waiting for the exact moment that a `BlockingCollection<T>` becomes empty and completed is usually unimportant. What is important is the exact moment that the processing of all consumed items is completed, which happens after the completion of the collection.
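Following that point, and the earlier comment suggesting to simply `await` the `Task.Run` tasks, a plain `BlockingCollection<T>` can be used without any subclass by awaiting the consumer tasks instead of the collection. This is a minimal sketch. The names `WaitForConsumersSketch` and `RunAsync`, and the summing used as the "processing" step, are illustrative assumptions.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: a plain BlockingCollection<T>, no subclass and no `new` members.
public static class WaitForConsumersSketch
{
    public static async Task<int> RunAsync()
    {
        using var queue = new BlockingCollection<int> { 1, 2, 3 };
        queue.CompleteAdding();

        int processedSum = 0;

        Task StartConsumer() => Task.Run(() =>
        {
            // Blocks while waiting for items; ends once the collection is completed and empty.
            foreach (var item in queue.GetConsumingEnumerable())
                Interlocked.Add(ref processedSum, item); // "do stuff" with the item
        });

        // Each consumer task finishes only after its last item has been processed,
        // so this covers both "completed and empty" and "all items processed".
        await Task.WhenAll(StartConsumer(), StartConsumer());

        return processedSum; // safe to read: both consumers have finished
    }
}
```

Any consuming method (`Take`, `TryTake`, `GetConsumingEnumerable`) works here, because the signal comes from the consumers themselves.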


Note: this answer targets Revision 1 of this question.

Theodor Zoulias
  • Thanks Theodor, I amended the implementation in the OP as per your comments (taking into account points 1 and 2, and explicitly marking 3 as not implemented). What do you think? – Wouter Van Ranst Jun 24 '21 at 07:00
  • @WouterVanRanst regarding the [Revision 2](https://stackoverflow.com/revisions/68101714/2) version of the question, without being absolutely sure, I suspect that there are race conditions lurking there. Personally, and assuming that I needed this functionality (which is not a likely scenario), I wouldn't trust this implementation in a production environment. Properties like the [`Count`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1.count) are not really intended for controlling the execution flow. – Theodor Zoulias Jun 24 '21 at 07:44
  • @WouterVanRanst also, inheriting from the `BlockingCollection` can be a source of trouble. Passing an instance of the `BlockingCollectionEx` class to a method that has a `BlockingCollection` parameter would be enough for all hell to break loose. That method would use the `base.GetConsumingEnumerable` implementation instead of your class's implementation. Polymorphism doesn't work with `new` members. It only works with `override` members. – Theodor Zoulias Jun 24 '21 at 07:45
  • That last argument really convinced me, thanks. – Wouter Van Ranst Jun 24 '21 at 10:11
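The polymorphism problem described in the comments can be reproduced in a few lines. `BlockingCollection<T>.GetConsumingEnumerable` is not virtual, so a subclass can only hide it with `new`. The compile-time type at the call site then decides which method runs. `HidingCollection<T>`, `HidingSketch`, and `Drain` are hypothetical names used only for this sketch.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical subclass that hides (does not override) GetConsumingEnumerable,
// mirroring the `new` members of the question's BlockingCollectionEx<T>.
public class HidingCollection<T> : BlockingCollection<T>
{
    public bool HiddenMethodCalled { get; private set; }

    public new IEnumerable<T> GetConsumingEnumerable()
    {
        HiddenMethodCalled = true; // runs eagerly: this method is not an iterator
        return base.GetConsumingEnumerable();
    }
}

public static class HidingSketch
{
    // A method written against the base class, e.g. in some library.
    public static int Drain(BlockingCollection<int> collection) =>
        collection.GetConsumingEnumerable().Count(); // bound to the base method at compile time

    public static (bool ViaBase, bool ViaDerived) Run()
    {
        using var viaBase = new HidingCollection<int> { 1, 2 };
        viaBase.CompleteAdding();
        Drain(viaBase); // static type is BlockingCollection<int>: the `new` method is bypassed

        using var viaDerived = new HidingCollection<int> { 1, 2 };
        viaDerived.CompleteAdding();
        viaDerived.GetConsumingEnumerable().ToList(); // static type is HidingCollection<int>

        return (viaBase.HiddenMethodCalled, viaDerived.HiddenMethodCalled);
    }
}
```

`HidingSketch.Run()` returns `(false, true)`. The hiding member is silently skipped whenever the instance is seen as a plain `BlockingCollection<T>`.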