0

Let's say I have objects:

ObjectA(
int ID,
List<ObjectB> items
)

ObjectB(
int ItemId,
string value
)

From RabbitMQ I fetch maximum of 10 messages, prefetchCount = 10, where after deserialization every message becomes object of type ObjectA.

One ObjectA can have multiple ObjectB with same ItemId.

All that is done in .NET Core 3.1 background worker:

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
            // code omitted for brevity
            // register event that handles message received
            consumer.Received += MessageReceived;
        }
    }

After deserialization for every ObjectA, I run two tasks, taskA and taskB, which is not currently important.

Important is TaskA that processes all items of ObjectA - List of ObjectB. taskA also runs multiple tasks for every ObjectB.

How can I ensure that across all tasks taskA, or all RMQ messages currently received, only one ObjectB with same ItemId can be processed at the time?

Example:

ObjectA (Id = 1, Items = {(1, "a"), (2, "b"), (1, "c")});

For given object, I must start parallel processing of Items (1, "a") and (2, "b") while (1, "c") should wait until Task that is processing (1, "a") has done processing because their ItemId is same.

leroy
  • 45
  • 1
  • 7
  • Take those objects, add to additional list/collection objects with the same Id, and then process them one-by-one. So objects with different Ids will be in different collections, and be processed in parallel to each other; but objects with the same Id will be processed in the order. – kosist Nov 13 '20 at 13:29
  • 1
    Related: [Keyed Lock in .Net](https://stackoverflow.com/questions/34754211/keyed-lock-in-net) and [Asynchronous locking based on a key](https://stackoverflow.com/questions/31138179/asynchronous-locking-based-on-a-key) – Theodor Zoulias Nov 13 '20 at 16:21

2 Answers2

0

If you use containers you need to register as a Singleton.

In the example, a lock is created for each key, and the new task will not be started while the previous one is running

public sealed class DelayTaskProcessor
{
    private readonly Dictionary<string, object> _syncDict = new Dictionary<string, object>();
    private readonly object _sync = new object();

    public Task Create(Action action, string id)
    {
        return new Task(() =>
        {

            object currentSync = null;
            lock (_sync)
            {
                if (_syncDict.ContainsKey(id))
                {
                    currentSync = _syncDict[id];
                }
                else
                {
                    currentSync = new object();
                    _syncDict.Add(id, currentSync);
                }
            }

            lock (currentSync)
            {
                Console.WriteLine($"Start: {id}");
                action?.Invoke();

            }
        });
    }
}
        public sealed class ObjectA
        {
            public int Id { get; set; }
            public List<ObjectB> Items { get; set; }
        }
        public sealed class ObjectB
        {
            public int Id { get; set; }
            public string Value { get; set; }
        }

    static void Main(string[] args)
    {
        var p = new DelayTaskProcessor();
        var objectA = new ObjectA
        {
            Id = 1, Items = new List<ObjectB>()
                {
                    new ObjectB {Id=1, Value="a"},
                    new ObjectB {Id=1, Value="a2"},
                    new ObjectB {Id=2, Value="b"},
                    new ObjectB {Id=3, Value="c"}
                }
            };
        var tasks = new List<Task>(objectA.Items.Count);
        foreach (var item in objectA.Items)
        {
            var task = p.Create(() =>
            {
                Console.WriteLine($"{item.Id},{item.Value}");
                Thread.Sleep(2000);

            }, $"b:{item.Id}");
            tasks.Add(task);
            task.Start();
        }

        Task.WaitAll(tasks.ToArray());
        var exceptions = new List<Exception>();
        foreach (var t in tasks)
        {
            if (t.IsFaulted)
            {
                exceptions.Add(t.Exception);
                
            }
            using (t) { }
        }
}
Stanislav
  • 459
  • 3
  • 6
  • Can you provide me with example of creating task? – leroy Nov 13 '20 at 14:15
  • I added to answer – Stanislav Nov 13 '20 at 14:44
  • Since I am quite struggling with this, I have few more questions. If I have 5 tasks to execute inside foreach, do I need to make p.Create... 5 times. Also, how can I know when all tasks for current item are finished? Can I make something like list of tasks, and then just use Task.WhenAll(listOfTasks)? – leroy Nov 13 '20 at 15:28
  • I added task list to answer and wait – Stanislav Nov 14 '20 at 17:42
0

I had to combine solution from HERE and HERE, and the code that works for me is:

    private static readonly ConcurrentDictionary<object, RefCounted<SemaphoreSlim>> _lockDict = new ConcurrentDictionary<object, RefCounted<SemaphoreSlim>>();

    private sealed class RefCounted<T>
    {
        public RefCounted(T value)
        {
            RefCount = 1;
            Value = value;
        }

        public int RefCount { get; set; }
        public T Value { get; private set; }
    }

    public DelayTaskProcessor()
    {
        
    }

    public async Task<bool> ManageConcurrency(object taskId, Func<Task> task)
    {
        await GetOrCreate(taskId).WaitAsync();

        try
        {
            await task();

            return true;
        }
        catch (Exception ex)
        {
            return false;
        }
        finally
        {
            RefCounted<SemaphoreSlim> item;
            lock (_lockDict)
            {
                item = _lockDict[taskId];
                --item.RefCount;
                if (item.RefCount == 0)
                    _lockDict.TryRemove(taskId, out var removed);
            }
            item.Value.Release();
        }
    }

    private SemaphoreSlim GetOrCreate(object key)
    {
        RefCounted<SemaphoreSlim> item;
        lock (_lockDict)
        {
            if (_lockDict.TryGetValue(key, out item))
            {
                ++item.RefCount;
            }
            else
            {
                item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
                _lockDict[key] = item;
            }
        }
        return item.Value;
    }
}

Pleas, feel free to comment on this.

leroy
  • 45
  • 1
  • 7