Here is my problem.

I have an API controller with an API endpoint inside of it (the resource).

 api/myResource/{id}/do-something

I'm working on a middleware that will limit access to this resource based on some business rules. Inside the middleware, I match the incoming request and parse the URI. I then want either to allow access (let the pipeline flow) or to return a 412 status code when the limit of allowed threads is reached FOR THE GIVEN RESOURCE, e.g.:

 api/myResource/1/do-something /// should allow 2 concurrent accesses.
 api/myResource/2/do-something /// should allow 10 concurrent accesses.
 api/myResource/3/do-something /// should allow 1 concurrent access.

To that end, I've started implementing the solution below.

internal class AsyncLock<TKey>
{
    private static readonly ConcurrentDictionary<TKey, SemaphoreSlim> _safeSemaphores
        = new ConcurrentDictionary<TKey, SemaphoreSlim>();

    internal async Task<SemaphoreSlim> TryLockAsync(TKey key, int maxConcurrentCount)
    {
        if (!_safeSemaphores.TryGetValue(key, out SemaphoreSlim semaphore))
        {
            semaphore = new SemaphoreSlim(maxConcurrentCount, maxConcurrentCount);

            _safeSemaphores.TryAdd(key, semaphore);
        }

        await semaphore.WaitAsync();

        return semaphore;
    }

    internal SemaphoreSlim TryLock(TKey key, int maxConcurrentCount)
    {
        if (!_safeSemaphores.TryGetValue(key, out SemaphoreSlim semaphore))
        {
            semaphore = new SemaphoreSlim(maxConcurrentCount, maxConcurrentCount);

            _safeSemaphores.TryAdd(key, semaphore);
        }

        semaphore.Wait();

        return semaphore;
    }
}

This is how it is used. It allows 2 concurrent accesses here; the limit, which is the subject of this question, will not be a hardcoded 2 but will be determined earlier in the pipeline:

AsyncLock<string> _lock = new AsyncLock<string>();
SemaphoreSlim semaphore = await _lock.TryLockAsync(key, 2);

if (semaphore.CurrentCount != 0)
{
    context.Response.StatusCode = 200;
    //await _next(context);
    semaphore.Release();
}
else
{
    context.Response.StatusCode = 412;
}

The behavior is unpredictable. I'm testing with 4 threads: sometimes it works as expected, sometimes they all return 200, and other times they all get stuck. The outcome is a different combination every time.

I would really appreciate some help figuring this one out.

Theodor Zoulias
  • hi, interesting, perhaps use [`Interlocked.CompareExchange`](http://www.russbishop.net/interlocked-compareexchange) - skip if the value was already updated. – IronMan Mar 01 '21 at 23:26
  • Do you know that using async code in a lock can lead to non-responsive applications? Async code is mostly used for I/O operations (files, network, etc.). What if the asynchronous code tries to download a large file? Locks cause congestion, so make sure they are held as briefly as possible. I advise reviewing the design and placing the async code outside the locks. – Jeroen van Langen Mar 01 '21 at 23:49
  • Is there any possibility that in order to respond to a single API call, more than one semaphores should be acquired? Or is it guaranteed that every `api/myResource/X/do-something` will acquire just a single semaphore? In the second case, you probably need a `KeyedSemaphore`. You could check out this question as a starting point: [Asynchronous locking based on a key](https://stackoverflow.com/questions/31138179/asynchronous-locking-based-on-a-key) – Theodor Zoulias Mar 02 '21 at 18:12
  • @TheodorZoulias, I'm aware of this implementation; it is actually the point from which I started. Your assumption that `api/myResource/X/do-something` will acquire just a single semaphore is correct. –  Mar 02 '21 at 18:49
  • There are some solid implementations posted there. For example you could take [Stephen Cleary's solution](https://stackoverflow.com/questions/31138179/asynchronous-locking-based-on-a-key/31194647#31194647), and change the `GetOrCreate(object key)` to `GetOrCreate(object key, int maximumConcurrency)`, and use the argument in the line `item = new RefCounted(new SemaphoreSlim(1, 1));`, replacing the `1`s. – Theodor Zoulias Mar 02 '21 at 22:12
  • I've already tried that. Replacing the 1s does allow multiple threads to access the resource, but the others have to wait until one of the admitted threads invokes .Release(). I'm interested in a way to not keep the other threads waiting, but to let them simply skip the protected part. –  Mar 02 '21 at 23:12
  • In that case you don't need an asynchronous wait. You can just call the synchronous [`Wait`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim.wait) method of the `SemaphoreSlim` that accepts a timeout, and pass `TimeSpan.Zero` as the timeout. The result will be `true` in case the semaphore was acquired, and `false` if it was not acquired. You need to release the semaphore only in the first case. Releasing it in the second case would be a bug. You could try modifying [my implementation](https://stackoverflow.com/a/65256155/11178549), that already has this overload. – Theodor Zoulias Mar 03 '21 at 00:24
  • In retrospect you can't use my implementation because it maintains a pool of reusable `SemaphoreSlim`s. In your case each `SemaphoreSlim` has a different capacity, so it's not interchangeable. You would need to strip out the pooling functionality, and honestly you probably don't need to remove the currently idle `SemaphoreSlim`s from the dictionary either. A bare bone implementation suitable for your case may need no more than 10-15 lines of code. – Theodor Zoulias Mar 03 '21 at 00:39
  • One gotcha you may need to be aware of is that an application hosted on IIS may be recycled automatically by the host. During the transition I think it's possible that two instances of the application may be running concurrently for a brief time span, and this could invalidate your max-concurrency policy. In case this is an actual issue, you could consider using [named](https://learn.microsoft.com/en-us/dotnet/standard/threading/semaphore-and-semaphoreslim#named-semaphores) `Semaphore` instances instead of `SemaphoreSlim`s. – Theodor Zoulias Mar 03 '21 at 00:47
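
The non-blocking acquisition described in the comments above can be sketched roughly as follows. This is only a minimal illustration, not code from any of the posters: the `KeyedLimiter<TKey>` class and its `TryEnter`/`Release` methods are hypothetical names, and it assumes that idle semaphores may stay in the dictionary forever and that a key always keeps the limit it was first created with. It relies on `SemaphoreSlim.Wait(TimeSpan.Zero)`, which returns `false` immediately instead of waiting when no slot is free.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (not from the thread): one SemaphoreSlim per key,
// acquired without ever waiting.
internal sealed class KeyedLimiter<TKey>
{
    private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _semaphores
        = new ConcurrentDictionary<TKey, SemaphoreSlim>();

    // Returns true if a slot was free for this key.
    // Only a caller that got true may call Release(key) afterwards.
    public bool TryEnter(TKey key, int maxConcurrency)
    {
        // GetOrAdd makes every caller use the same stored semaphore for a key.
        // Assumption: the limit passed on first use of a key is the one that sticks.
        SemaphoreSlim semaphore = _semaphores.GetOrAdd(
            key, _ => new SemaphoreSlim(maxConcurrency, maxConcurrency));

        return semaphore.Wait(TimeSpan.Zero); // zero timeout: never blocks
    }

    public void Release(TKey key) => _semaphores[key].Release();
}

internal static class Program
{
    private static void Main()
    {
        var limiter = new KeyedLimiter<string>();
        const string key = "api/myResource/1/do-something";

        bool first = limiter.TryEnter(key, 2);
        bool second = limiter.TryEnter(key, 2);
        bool third = limiter.TryEnter(key, 2); // limit of 2 already reached
        Console.WriteLine($"{first} {second} {third}"); // True True False

        limiter.Release(key);
        limiter.Release(key);
        Console.WriteLine(limiter.TryEnter(key, 2)); // True
    }
}
```

In a middleware, the `true` branch would await the rest of the pipeline inside a `try`/`finally` that calls `Release`, and the `false` branch would set the 412 status code without releasing anything.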

3 Answers

3

Seems simplest to use Monitor.TryEnter.

object _lock = new object();


void RunIfNotLocked()
{
    bool lockAcquired = false;

    Monitor.TryEnter(_lock, ref lockAcquired);
    if ( !lockAcquired ) 
    {
        //Skip
        return;
    }
    try
    {
        DoSomething();
    }
    finally
    {         
        Monitor.Exit(_lock);
    }
}
John Wu
  • or `Spinlock.TryEnter`... *If the lock is not available when TryEnter is called, it will return immediately without any further spinning.* – tymtam Mar 02 '21 at 00:12
  • this implementation will limit access to a single resource, while I need to be able to allow access to same resource for N threads. –  Mar 02 '21 at 13:48
  • "Limit access to a single resource" and "allow access to the same resource for N threads" are not opposites. Are you saying there is more than one resource? Or are you saying that more than one thread must be allowed access (i.e. not skip) to the single resource at the same time? – John Wu Mar 02 '21 at 16:01
  • I'm saying that there are multiple resources, and each resource may have a different strategy for allowing threads access. One resource may be accessible by 1 thread, another by 2 threads. Imagine the following URI: api/myApi/{id}/do-something. The id is dynamic; based on the given id, I want an id of 1 to allow 2 threads access, and ids 2 and 3 only one thread. –  Mar 02 '21 at 16:13
  • That strikes me as significantly different from your original question. I'd invite you to edit and update your post, but it appears it has been closed as a duplicate. – John Wu Mar 02 '21 at 16:44
  • Yes, I was wondering why, because the issue is not that I don't know whether to use lock or Monitor. Will edit, thanks. –  Mar 02 '21 at 16:51
  • Wow, now your question is *totally* different. Now there is a client and a server. For this problem, you should not use `lock` or `monitor` or `semaphore` or anything of that kind at all, because those are in-memory and won't scale, unless you are using some kind of session stickiness. Your question is more about a custom rate limiting scheme rather than thread locks. – John Wu Mar 02 '21 at 18:01
  • If I think about it, now that you've made the point, yes this will not scale at all. –  Mar 02 '21 at 18:56
1

I've come up with this solution. It seems to work as expected.

internal sealed class AsyncLock<TKey>
{
    public readonly Dictionary<TKey, SemaphoreSlim> _semaphores = new Dictionary<TKey, SemaphoreSlim>();

    internal IDisposable Lock(TKey key, int maxDegreeOfParallelism)
    {
        bool acquired = GetOrAdd(key, maxDegreeOfParallelism).Wait(0);

        return acquired
            ? new Releaser(key, this)
            : null;
    }

    internal async Task<IDisposable> LockAsync(TKey key, int maxDegreeOfParallelism = 1)
    {
        bool acquired = await GetOrAdd(key, maxDegreeOfParallelism).WaitAsync(0);

        return acquired
            ? new Releaser(key, this)
            : null;
    }

    private SemaphoreSlim GetOrAdd(TKey key, int maxConcurrencyCount = 1)
    {
        lock (_semaphores)
        {
            if (!_semaphores.TryGetValue(key, out SemaphoreSlim semaphore))
            {
                _semaphores[key] = semaphore = new SemaphoreSlim(maxConcurrencyCount, maxConcurrencyCount);
            }

            return semaphore;
        }
    }

    private sealed class Releaser : IDisposable
    {
        private AsyncLock<TKey> _parent;

        public void Dispose()
        {
            lock (_parent._semaphores)
            {
                if (_parent._semaphores.TryGetValue(Key, out SemaphoreSlim semaphore)) semaphore.Release();
            }
        }

        public TKey Key { get; }

        public Releaser(TKey key, AsyncLock<TKey> parent)
        {
            Key = key;
            _parent = parent;
        }
    }
}

To use it:

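AsyncLock<string> _asyncLock = new AsyncLock<string>();

// The named argument must match the parameter name declared by Lock.
IDisposable disposable = _asyncLock.Lock(key, maxDegreeOfParallelism: 2);

if (disposable is null)
{
    // Limit reached for this key: the thread skips the protected section.
}
else
{
    try
    {
        // Slot acquired: the thread runs the protected section.
    }
    finally
    {
        // Always release the slot, even if the protected section throws.
        disposable.Dispose();
    }
}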
  • Nice! You should probably pay attention to the return value of the `TryGetValue` method inside the `Releaser.Dispose`. This should never be `false` according to your current implementation, but I would do the check anyway to be on the safe side. Also the `_semaphores` being a `static` field is probably a design flaw. It prevents you from creating two independent instances of the `AsyncLock` class. – Theodor Zoulias Mar 03 '21 at 12:24
  • Thank you for the input @TheodorZoulias, I will look into adapting it. –  Mar 03 '21 at 12:37
  • Now it's perfect! One small nitpick: the term "degree of parallelism" is generally suitable for code that is running in parallel on multiple CPU cores. For concurrent asynchronous operations, that are typically [not running anywhere](https://blog.stephencleary.com/2013/11/there-is-no-thread.html), a more suitable term is "level of concurrency" or "maximum concurrency". – Theodor Zoulias Mar 03 '21 at 19:55
0

Here is a KeyedNamedSemaphore class that you could use. It stores one named Semaphore per key. After the key is created, the associated semaphore remains in the dictionary until the KeyedNamedSemaphore instance is disposed.

public class KeyedNamedSemaphore<TKey> : IDisposable
{
    private readonly ConcurrentDictionary<TKey, Semaphore> _perKey;
    private readonly string _prefix;

    public KeyedNamedSemaphore(string prefix = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        _perKey = new ConcurrentDictionary<TKey, Semaphore>(keyComparer);
        _prefix = prefix ?? $@"Global\{System.Reflection.Assembly
            .GetExecutingAssembly().GetName().Name}-";
    }

    public bool TryLock(TKey key, int maximumConcurrency, out Semaphore semaphore)
    {
        if (!_perKey.TryGetValue(key, out semaphore))
        {
            var newSemaphore = new Semaphore(maximumConcurrency, maximumConcurrency,
                $"{_prefix}{key}");
            semaphore = _perKey.GetOrAdd(key, newSemaphore);
            if (semaphore != newSemaphore) newSemaphore.Dispose();
        }
        var acquired = semaphore.WaitOne(0);
        if (!acquired) semaphore = null;
        return acquired;
    }

    public void Dispose()
    {
        foreach (var key in _perKey.Keys)
            if (_perKey.TryRemove(key, out var semaphore)) semaphore.Dispose();
    }
}

Usage example:

var locker = new KeyedNamedSemaphore<string>();
if (locker.TryLock("api/myResource/1/do-something", 10, out var semaphore))
{
    try
    {
        await DoSomethingAsync();
    }
    finally { semaphore.Release(); }
}
else
{
    await DoSomethingElseAsync();
}
Theodor Zoulias