I have a Azure Service Bus queue where I'm receiving a range from 1 to 10 messages with the same "key". One of these messages needs to be processed with a long running operation. After it's complete, the database will be updated and the other messages will check it. However, in the mean time, the other messages will be re-queued so that the process isn't lost.
But the main point is that this long running operation CANNOT be ran at the same time for the same key, and shouldn't be ran multiple times.
This is what I've got so far:
void Main()
{
Enumerable.Range(1, 1000)
.AsParallel()
.ForAll(async i => await ManageConcurrency(i % 2, async () => await Task.Delay(TimeSpan.FromSeconds(10))));
}
private readonly ConcurrentDictionary<int, SemaphoreSlim> _taskLocks = new ConcurrentDictionary<int, SemaphoreSlim>();
private async Task<bool> ManageConcurrency(int taskId, Func<Task> task)
{
SemaphoreSlim taskLock = null;
try
{
if (_taskLocks.TryGetValue(taskId, out taskLock))
{
if (taskLock.CurrentCount == 0)
{
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, I found. No available.. Thread Id: {Thread.CurrentThread.ManagedThreadId}");
return false;
}
taskLock.Wait();
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, I found and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
else
{
taskLock = new SemaphoreSlim(1, 1);
taskLock = _taskLocks.GetOrAdd(taskId, taskLock);
if (taskLock.CurrentCount == 0)
{
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, I didn't find, and then found/created. None available.. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
return false;
}
else
{
taskLock.Wait(TimeSpan.FromSeconds(1));
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, I didn't find, then found/created, and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
}
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, Lock pulled for TaskId {taskId}, Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
await task.Invoke();
return true;
}
catch (Exception e)
{
;
return false;
}
finally
{
//taskLock?.Release();
_taskLocks.TryRemove(taskId, out taskLock);
//Console.WriteLine($"I removed. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
}
It's not working as expected, because it will create multiple semaphores and suddenly my long running operation is being ran twice with the same key. I think the problem is because the whole operation isn't atomic.
What's the best way to solve this issue?