For a multithreaded application I want to await
until a BlockingCollection
is completed and empty (IsCompleted = true). I implemented the below and this seems to be working.
Since it's multithreading I don't even trust my own shadow. Would this be a robust implementation?
public class BlockingCollectionEx<T> : BlockingCollection<T>
{
public Task WaitCompleted => completedManualResetEvent.Task;
private readonly TaskCompletionSource completedManualResetEvent = new();
public new void CompleteAdding()
{
base.CompleteAdding();
lock (completedManualResetEvent)
{
if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
completedManualResetEvent.SetResult();
}
}
public new IEnumerable<T> GetConsumingEnumerable()
{
foreach (var item in base.GetConsumingEnumerable())
yield return item;
lock (completedManualResetEvent) //if GetConsumingEnumerable is used by multiple threads, the 2nd one would throw an InvalidOperationException
{
if (!completedManualResetEvent.Task.IsCompleted)
completedManualResetEvent.SetResult();
}
}
public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();
public new T Take() => throw new NotImplementedException();
public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}
usage:
var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();
Task.Run(() =>
{
foreach (var item in x.GetConsumingEnumerable())
// do stuff in Task 1
});
Task.Run(() =>
{
foreach (var item in x.GetConsumingEnumerable())
// do stuff in Task 2
});
await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is emtpy