Here is a class PrioritySemaphore<TPriority> that can be acquired with priority. Internally it is based on the PriorityQueue<TElement, TPriority> collection (.NET 6).
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public class PrioritySemaphore<TPriority>
{
    // Waiters are ordered by priority first, then by arrival index (FIFO tie-breaker).
    private readonly PriorityQueue<TaskCompletionSource, (TPriority, long)> _queue;
    private readonly int _maxCount;
    private int _currentCount;
    private long _indexSeed = 0;

    public PrioritySemaphore(int initialCount, int maxCount,
        IComparer<TPriority> comparer = null)
    {
        if (initialCount < 0)
            throw new ArgumentOutOfRangeException(nameof(initialCount));
        if (maxCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxCount));
        // Same validation as SemaphoreSlim: the initial count cannot exceed the maximum.
        if (initialCount > maxCount)
            throw new ArgumentOutOfRangeException(nameof(initialCount));
        comparer ??= Comparer<TPriority>.Default;
        _queue = new(Comparer<(TPriority, long)>.Create((x, y) =>
        {
            int result = comparer.Compare(x.Item1, y.Item1);
            if (result == 0) result = x.Item2.CompareTo(y.Item2);
            return result;
        }));
        _currentCount = initialCount;
        _maxCount = maxCount;
    }

    public PrioritySemaphore(int initialCount, IComparer<TPriority> comparer = null)
        : this(initialCount, Int32.MaxValue, comparer) { }

    public PrioritySemaphore(IComparer<TPriority> comparer = null)
        : this(0, Int32.MaxValue, comparer) { }

    public int CurrentCount => Volatile.Read(ref _currentCount);

    public Task WaitAsync(TPriority priority)
    {
        lock (_queue)
        {
            // Invariant: waiters can be queued only while no permits are available.
            Debug.Assert((_queue.Count == 0) || (_currentCount == 0));
            if (_currentCount > 0)
            {
                _currentCount--;
                return Task.CompletedTask;
            }
            TaskCompletionSource tcs = new(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _queue.Enqueue(tcs, (priority, ++_indexSeed));
            return tcs.Task;
        }
    }

    public void Release()
    {
        TaskCompletionSource tcs;
        lock (_queue)
        {
            Debug.Assert((_queue.Count == 0) || (_currentCount == 0));
            if (_queue.Count == 0)
            {
                if (_currentCount >= _maxCount) throw new SemaphoreFullException();
                _currentCount++;
                return;
            }
            tcs = _queue.Dequeue();
        }
        // Complete the waiter outside the lock.
        tcs.TrySetResult();
    }
}
Usage example:
PrioritySemaphore<int> semaphore = new();
//...
await semaphore.WaitAsync(priority: 1);
//...
await semaphore.WaitAsync(priority: 2);
//...
semaphore.Release();
After the Release, the semaphore will be acquired by the awaiter with the highest priority. In the above example it will be the awaiter with priority 1. Smaller values denote higher priority. If more than one awaiter has the same highest priority, the semaphore will be acquired by the one that requested it first. Maintaining FIFO order is the reason for coupling the TPriority with a long in the above implementation, because the PriorityQueue<TElement, TPriority> itself does not guarantee first-in-first-out order for elements of equal priority.
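The tie-breaking idea can be seen in isolation with a bare PriorityQueue. The following is only a minimal sketch: the waiter names "A" to "D" and the `sequence` counter (standing in for `_indexSeed`) are made up for illustration, and because the priority here is a plain int, the default tuple comparison is enough and no custom comparer is needed.

```csharp
using System;
using System.Collections.Generic;

// Key = (priority, arrival sequence). Tuples compare item by item,
// so equal priorities fall back to the arrival order.
var queue = new PriorityQueue<string, (int Priority, long Sequence)>();
long sequence = 0; // hypothetical stand-in for _indexSeed

queue.Enqueue("A", (1, ++sequence));
queue.Enqueue("B", (1, ++sequence));
queue.Enqueue("C", (1, ++sequence));
queue.Enqueue("D", (0, ++sequence)); // arrives last, but has higher priority

var order = new List<string>();
while (queue.TryDequeue(out var name, out _))
    order.Add(name);

Console.WriteLine(string.Join(", ", order)); // prints: D, A, B, C
```

Without the sequence number in the key, the relative order of "A", "B" and "C" would be unspecified.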
The class PrioritySemaphore<TPriority> has only an asynchronous API, and it doesn't support awaiting with cancellation or timeout. For a version that has more features and also compiles on .NET versions earlier than 6, see the 5th revision of this answer (based on the more flexible but less efficient SortedSet).
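To illustrate why a timeout cannot simply be bolted on from the outside, here is a small sketch that uses a standalone TaskCompletionSource as a stand-in for the one that WaitAsync enqueues (the variable names are made up for illustration). The Task.WaitAsync(TimeSpan) method (.NET 6) lets the caller stop awaiting, but the queued entry itself stays pending, so with the class above a later Release would presumably be handed to the abandoned waiter and the permit would be lost.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for the TaskCompletionSource that WaitAsync puts in the queue.
var queued = new TaskCompletionSource(
    TaskCreationOptions.RunContinuationsAsynchronously);

bool timedOut = false;
try
{
    // The caller gives up after 50 ms; only the awaiting is abandoned.
    await queued.Task.WaitAsync(TimeSpan.FromMilliseconds(50));
}
catch (TimeoutException)
{
    timedOut = true;
}

// The queued source is still pending, so completing it (what Release does
// after dequeuing) succeeds even though nobody is waiting any more.
bool stillPending = !queued.Task.IsCompleted;
bool completedByRelease = queued.TrySetResult();

Console.WriteLine($"{timedOut} {stillPending} {completedByRelease}"); // prints: True True True
```

Proper cancellation support therefore needs to remove the entry from the queue, which is something a PriorityQueue does not offer directly.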