A simple solution might be to store the workers in a Queue<T> instead of a List<T>, dequeue a worker every time you need one, and enqueue it back immediately:
Queue<Worker> _workers = new();
for (int i = 0; i < workersCount; i++) _workers.Enqueue(new());

ParallelOptions options = new() { MaxDegreeOfParallelism = 10 };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker;
    lock (_workers)
    {
        // Rotate the queue: take the front worker and put it straight back.
        worker = _workers.Dequeue();
        _workers.Enqueue(worker);
    }
    await worker.DoWork(workItem);
});
This way the workers will be used in a round-robin fashion, as an unlimited resource. The MaxConcurrencyPerWorker policy will not be enforced.

If you want to enforce this policy, then you must treat the workers as a limited resource, and enqueue each one back only after its DoWork operation has completed. You must also enqueue each Worker multiple times in the queue (MaxConcurrencyPerWorker times), in an interleaved manner. Finally you must deal with the case that the pool of workers has been exhausted, in which case the execution flow has to be suspended until a worker becomes available. A Queue<T> doesn't offer this functionality. You will need a Channel<T>:
Channel<Worker> workerPool = Channel.CreateUnbounded<Worker>();
// Interleave the workers: each Worker is written MaxConcurrencyPerWorker
// times, in round-robin order (A, B, C, A, B, C, ...).
for (int i = 0; i < MaxConcurrencyPerWorker; i++)
    foreach (Worker worker in _workers)
        workerPool.Writer.TryWrite(worker);

ParallelOptions options = new() { MaxDegreeOfParallelism = workerPool.Reader.Count };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker = await workerPool.Reader.ReadAsync();
    try
    {
        await worker.DoWork(workItem);
    }
    finally { workerPool.Writer.TryWrite(worker); }
});
The Channel<T> is an asynchronous version of the BlockingCollection<T>. The ChannelReader.ReadAsync method returns a worker synchronously if one is currently stored in the channel, or asynchronously if the channel is currently empty. In the above example the ReadAsync will always return a worker synchronously, because the degree of parallelism of the Parallel.ForEachAsync loop has been limited to the total number of (not distinct) available workers.
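To see the suspension behavior in isolation, here is a minimal sketch where the degree of parallelism intentionally exceeds the number of available workers, so some ReadAsync calls complete asynchronously. The two workers, the degree of parallelism 4 and the Range-based work items are made up for the demo, and it assumes that DoWork can accept an int work item:

// Hypothetical demo: 2 workers but 4 concurrent operations, so at any
// moment 2 of the 4 ReadAsync calls are waiting asynchronously for a
// worker to be written back into the channel.
Channel<Worker> pool = Channel.CreateUnbounded<Worker>();
pool.Writer.TryWrite(new Worker());
pool.Writer.TryWrite(new Worker());

ParallelOptions options = new() { MaxDegreeOfParallelism = 4 };
await Parallel.ForEachAsync(Enumerable.Range(1, 10), options, async (workItem, ct) =>
{
    Worker worker = await pool.Reader.ReadAsync(ct); // may suspend here
    try { await worker.DoWork(workItem); }
    finally { pool.Writer.TryWrite(worker); }
});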
Update: The above solution does not guarantee perfect balancing in the long run. It's not impossible for the workerPool to gradually lose its "interleaving" property, resulting in many references of the same Worker being stored consecutively, one after the other. For precise control it might be required to keep track of the usage statistics of each individual worker. You would need some structure resembling an LRU cache to hold the workers and the statistics; something like an ObjectPool<T> with priority management. Here is what I came up with: a PriorityPool<T> class that is backed by a simple array (instead of something more complex like a dictionary, a sorted set or a priority queue), and is also equipped with a SemaphoreSlim in order to enforce the MaxConcurrencyPerWorker policy.
public class PriorityPool<T> : IDisposable
{
    private struct Entry
    {
        public T Item;
        public int ConcurrencyCount;
        public long LastUseStamp;
    }

    private readonly Entry[] _items;
    private readonly IEqualityComparer<T> _comparer;
    private readonly SemaphoreSlim _semaphore;
    private long _lastUseStamp;

    public int Count { get { return _items.Length; } }

    public PriorityPool(IEnumerable<T> items, int maxConcurrencyPerItem,
        IEqualityComparer<T> comparer = default)
    {
        ArgumentNullException.ThrowIfNull(items);
        if (maxConcurrencyPerItem < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrencyPerItem));
        _items = items.Select(x => new Entry() { Item = x }).ToArray();
        _comparer = comparer ?? EqualityComparer<T>.Default;
        if (_items.Length == 0)
            throw new ArgumentException("No items found.", nameof(items));
        if (_items.DistinctBy(e => e.Item, _comparer).Count() != _items.Length)
            throw new ArgumentException("Duplicate item found.", nameof(items));
        int semaphoreSize = _items.Length * maxConcurrencyPerItem;
        _semaphore = new(semaphoreSize, semaphoreSize);
    }

    public async ValueTask<T> GetAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        lock (_items)
        {
            // Select the item with the lowest concurrency count, breaking
            // ties in favor of the least recently used item.
            int index = 0;
            for (int i = 1; i < _items.Length; i++)
            {
                int diff = _items[i].ConcurrencyCount - _items[index].ConcurrencyCount;
                if (diff > 0) continue;
                if (diff < 0 || _items[i].LastUseStamp < _items[index].LastUseStamp)
                    index = i;
            }
            _items[index].ConcurrencyCount++;
            _items[index].LastUseStamp = ++_lastUseStamp;
            return _items[index].Item;
        }
    }

    public void Return(T item)
    {
        lock (_items)
        {
            int index;
            for (index = 0; index < _items.Length; index++)
                if (_comparer.Equals(item, _items[index].Item)) break;
            if (index == _items.Length)
                throw new InvalidOperationException("Item not found.");
            if (_items[index].ConcurrencyCount == 0)
                throw new InvalidOperationException("Negative concurrency.");
            _items[index].ConcurrencyCount--;
        }
        _semaphore.Release();
    }

    public void Dispose() => _semaphore.Dispose();
}
Usage example:
using PriorityPool<Worker> workerPool = new(_workers, MaxConcurrencyPerWorker);
//...
Worker worker = await workerPool.GetAsync();
try
{
    await worker.DoWork(workItem);
}
finally { workerPool.Return(worker); }
The GetAsync method returns the worker with the least concurrency level at the moment. In case of a tie, it returns the least recently used worker.
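To illustrate the selection policy, here is a hypothetical trace with two string items "A" and "B" and maxConcurrencyPerItem = 2 (strings instead of workers, just to keep the demo self-contained):

using PriorityPool<string> pool = new(new[] { "A", "B" }, 2);
string w1 = await pool.GetAsync(); // "A" (all equal, the first item wins)
string w2 = await pool.GetAsync(); // "B" (B at concurrency 0, A at 1)
string w3 = await pool.GetAsync(); // "A" (tie at 1, A least recently used)
string w4 = await pool.GetAsync(); // "B" (B at 1, A at 2)
pool.Return(w1);                   // A drops back to concurrency 1
string w5 = await pool.GetAsync(); // "A" (lowest concurrency again)
// A sixth GetAsync would now wait until another item is returned.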
The PriorityPool<T> class is thread-safe, with the exception of Dispose, which must be called only after all other operations on the PriorityPool<T> have completed (a behavior inherited from the SemaphoreSlim).
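For completeness, here is how the PriorityPool<T> could plug into the Parallel.ForEachAsync loop. This is just a sketch that mirrors the channel-based version above: the MaxDegreeOfParallelism is derived from the pool, so that GetAsync never actually has to wait, and the using disposes the pool only after the loop (and therefore every GetAsync/Return pair) has completed:

using PriorityPool<Worker> workerPool = new(_workers, MaxConcurrencyPerWorker);

ParallelOptions options = new()
{
    MaxDegreeOfParallelism = workerPool.Count * MaxConcurrencyPerWorker
};
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker = await workerPool.GetAsync(ct);
    try { await worker.DoWork(workItem); }
    finally { workerPool.Return(worker); }
});
// The `using` disposes the pool here, after all GetAsync/Return
// operations have completed.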