I thought this was an interesting question so I spent a bit of time on it.
The scenario I understand it is this:
- You have a BlockingCollection that is full
- A number of threads start, each trying to add to the BlockingCollection. These calls will all block; that is why they need to occur in parallel.
- As space becomes available, the Add calls will become unblocked.
- The calls to
Add
need to complete in the order they were received.
First of all, let's talk about code structure. Instead of using a BlockingCollection and writing procedural code around it, I suggest extending the BlockingCollection and replacing the Add method with the functionality you need. It may look something like this:
public class QueuedBlockingCollection<T> : BlockingCollection<T>
{
private FifoMonitor monitor = new FifoMonitor();
public QueuedBlockingCollection(int max) : base (max) {}
public void Enqueue(T item)
{
using (monitor.Lock())
{
base.Add(item);
}
}
}
Here, the trick is the use of a FifoMonitor
class, which will give you the functionality of a lock
but will enforce order. Unfortunately, no class like that exists in the CLR. But we can write one:
public class FifoMonitor
{
public class FifoCriticalSection : IDisposable
{
private readonly FifoMonitor _parent;
public FifoCriticalSection(FifoMonitor parent)
{
_parent = parent;
_parent.Enter();
}
public void Dispose()
{
_parent.Exit();
}
}
private object _innerLock = new object();
private volatile int counter = 0;
private volatile int current = 1;
public FifoCriticalSection Lock()
{
return new FifoCriticalSection(this);
}
private void Enter()
{
int mine = Interlocked.Increment(ref counter);
Monitor.Enter(_innerLock);
while (current != mine) Monitor.Wait(_innerLock);
}
private void Exit()
{
Interlocked.Increment(ref current);
Monitor.PulseAll(_innerLock);
Monitor.Exit(_innerLock);
}
}
Now to test. Here's my program:
public class Program
{
public static void Main()
{
//Setup
var blockingCollection = new QueuedBlockingCollection<int>(10);
var tasks = new Task[10];
//Block the collection by filling it up
for (int i=1; i<=10; i++) blockingCollection.Add(99);
//Start 10 threads all trying to add another value
for (int i=1; i<=10; i++)
{
int index = i; //unclose
tasks[index-1] = Task.Run( () => blockingCollection.Enqueue(index) );
Task.Delay(100).Wait(); //Wait long enough for the Enqueue call to block
}
//Purge the collection, making room for more values
while (blockingCollection.Count > 0)
{
var n = blockingCollection.Take();
Console.WriteLine(n);
}
//Wait for our pending adds to complete
Task.WaitAll(tasks);
//Display the collection in the order read
while (blockingCollection.Count > 0)
{
var n = blockingCollection.Take();
Console.WriteLine(n);
}
}
}
Output:
99
99
99
99
99
99
99
99
99
99
1
2
3
4
5
6
7
8
9
10
Looks like it works! But just to be sure, I changed Enqueue
back to Add
, to ensure that the solution actually does something. Sure enough, it ends up out of order with the regular Add
.
99
99
99
99
99
99
99
99
99
99
2
3
4
6
1
5
7
8
9
10
Check out the code on DotNetFiddle