2

I have a Azure Service Bus queue where I'm receiving a range from 1 to 10 messages with the same "key". One of these messages needs to be processed with a long running operation. After it's complete, the database will be updated and the other messages will check it. However, in the mean time, the other messages will be re-queued so that the process isn't lost.

But the main point is that this long running operation CANNOT be ran at the same time for the same key, and shouldn't be ran multiple times.

This is what I've got so far:

void Main()
{
    Enumerable.Range(1, 1000)
              .AsParallel()
              .ForAll(async i => await ManageConcurrency(i % 2, async () => await Task.Delay(TimeSpan.FromSeconds(10)))); 
}

private readonly ConcurrentDictionary<int, SemaphoreSlim> _taskLocks = new ConcurrentDictionary<int, SemaphoreSlim>();

private async Task<bool> ManageConcurrency(int taskId, Func<Task> task)
{
    SemaphoreSlim taskLock = null;

    try
    {
        if (_taskLocks.TryGetValue(taskId, out taskLock))
        {
            if (taskLock.CurrentCount == 0)
            {
                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I found. No available.. Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                return false;
            }

            taskLock.Wait();

            Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I found and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
        }
        else
        {
            taskLock = new SemaphoreSlim(1, 1);
            taskLock = _taskLocks.GetOrAdd(taskId, taskLock);
            if (taskLock.CurrentCount == 0)
            {
                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I didn't find, and then found/created. None available.. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
                return false;
            }
            else
            {
                taskLock.Wait(TimeSpan.FromSeconds(1));

                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I didn't find, then found/created, and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
            }
        }

        Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, Lock pulled for TaskId {taskId}, Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");

        await task.Invoke();

        return true;
    }
    catch (Exception e)
    {
        ;
        return false;
    }
    finally
    {
        //taskLock?.Release();

        _taskLocks.TryRemove(taskId, out taskLock);

        //Console.WriteLine($"I removed. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
    }
}

It's not working as expected, because it will create multiple semaphores and suddenly my long running operation is being ran twice with the same key. I think the problem is because the whole operation isn't atomic.

What's the best way to solve this issue?

Cameron
  • 2,574
  • 22
  • 37
  • Related: [Asynchronous locking based on a key](https://stackoverflow.com/questions/31138179/asynchronous-locking-based-on-a-key) – Theodor Zoulias Dec 23 '20 at 20:45

3 Answers3

5

You correctly recognized that you need to ensure that only one semaphore is being created per key. The standard idiom for that is:

var dict = new ConcurrentDictionary<TKey, Lazy<SemaphoreSlim>>();
...
var sem = dict.GetOrAdd( , _ => new new Lazy<SemaphoreSlim>(() => SemaphoreSlim(1, 1))).Value;

Multiple lazies might be created but only one of them will ever be revealed and materialized.

Besides that it is a questionable practice to rely on in-memory state. What if your queue processing app recycles and all semaphores are lost? You better use a persistent store to track this locking info.

Jacques Bosch
  • 2,165
  • 3
  • 25
  • 36
usr
  • 168,620
  • 35
  • 240
  • 369
  • Thanks for your answer. I'll try it out and see how it works in my environment. And to answer your question, if my app recycles, then I'll have to restart the long running operation, from scratch – Cameron Jan 13 '16 at 14:47
2

You're nearly there... do you need the incoming order preserved? If not:

public static void Main(string[] args)
{
    Enumerable.Range(1, 1000)
                .AsParallel()
                .ForAll( i => ManageConcurrency(i % 2,  () => Task.Delay(TimeSpan.FromSeconds(10))).Wait());


}

private static readonly ConcurrentDictionary<int, SemaphoreSlim> _lockDict = new ConcurrentDictionary<int, SemaphoreSlim>();

private static async Task<bool> ManageConcurrency(int taskId, Func<Task> task)
{

    var gate = _lockDict.GetOrAdd(taskId, _ => new SemaphoreSlim(1, 1));
    await gate.WaitAsync();

    try
    {

        Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, Lock pulled for TaskId {taskId}, Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");

        await task();

        return true;
    }
    catch (Exception e)
    {
        return false;
    }
    finally
    {
        gate.Release();
    }

}
jamespconnor
  • 1,382
  • 14
  • 29
2

It seems to me that you're making your life harder with worrying about semaphores and the like. There are easier abstractions to use.

Using Lazy<T> is ideal in this situation, but since you want to await the results then Lazy<T> needs an upgrade to AsyncLazy<T>.

public class AsyncLazy<T> : Lazy<Task<T>>
{
    public AsyncLazy(Func<T> valueFactory) :
        base(() => Task.Factory.StartNew(valueFactory))
    { }

    public AsyncLazy(Func<T> valueFactory, LazyThreadSafetyMode mode) :
        base(() => Task.Factory.StartNew(valueFactory), mode)
    { }

    public AsyncLazy(Func<Task<T>> taskFactory) :
        base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap())
    { }

    public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) :
        base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode)
    { }

    public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); }
}

I've created a class to simulate the result of the long running task:

public class LongRunningResult
{
    public int Index;
}

And the method that needs to get run to do the computation:

private LongRunningResult ComputeLongRunningResult(int index)
{
    Console.WriteLine($"Running Index {index}");
    Thread.Sleep(1000);
    return new LongRunningResult() { Index = index };
}

Now we need the dictionary to hold the lazy asyncs:

private readonly ConcurrentDictionary<int, AsyncLazy<LongRunningResult>> _results
    = new ConcurrentDictionary<int, AsyncLazy<LongRunningResult>>();

Now it becomes super easy:

Enumerable
    .Range(1, 10)
    .AsParallel()
    .ForAll(async i =>
    {
        var index = i % 2;
        Console.WriteLine($"Trying Index {index}");
        _results.TryAdd(index,
            new AsyncLazy<LongRunningResult>(
                () => ComputeLongRunningResult(index),
                LazyThreadSafetyMode.ExecutionAndPublication));
        AsyncLazy<LongRunningResult> ayncLazy;
        if (_results.TryGetValue(index, out ayncLazy))
        {
            await ayncLazy;
        }
    });

The output I get from this is like:

Trying Index 1
Trying Index 0
Trying Index 1
Trying Index 1
Trying Index 0
Trying Index 1
Running Index 1
Trying Index 0
Trying Index 1
Running Index 0
Trying Index 0
Trying Index 0
Enigmativity
  • 113,464
  • 11
  • 89
  • 172