0

I have a concurrent dictionary that is used to manage subscriptions (and interlink with other code), for the sake of this example a simplified version is shown.

I need to update an external message bus when topics are added or removed, instead of adding an additional synchronization mechanism and wrapping a blocking section around the dictionary add and message topic add subscription, I am using the concurrent dictionary and calling back an Action when the item is successfully added:

class SubscriptionManager
{
    readonly ConcurrentDictionary<string, SubscribedTypes> subscriptions = new();

    public void AddDynamic(string topic, Type type, Action<string> dynamicCallback)
    {
        subscriptions.AddOrUpdate(
            topic,
            (topic) =>
            {
                var subscribedTypes = new SubscribedTypes(qos);
                /* do some work and checks */

                dynamicCallback(topic); // call back the calling code

                return subscribedTypes;
            },
            (topic, subscribedTypes) =>
            {
                /* snip */
                subscribedTypes.Add(type);
                return subscribedTypes;
            }
        );
    }
}

My message bus is purely asynchronous, and the client would call as below:

subscriptionManager.AddDynamic("v1/health", HealthMessage,
    async (topic) => await messageBus.Subscribe(topic));

UPDATES TO ORIGINAL QUESTION

Is the anonymous lambda even called as a async function call? ANSWER - this is not a good idea. The message bus could be disposed, so either assure through IoC lifetime management of singleton or implement SubscriptionManager instance inside the message bus

Performance is critical here as on review some apps will create lots of subscriptions dynamically, and the dictionary will be read continuously.

I have therefore put in place a ReaderWriterLockSlim lock to minimize locks on reads (with escalation to full lock on write) and when subscription to the message bus is required it is done outside of the critical section.

public class SubscriptionManager
{
    // SubscribedTypes is a dictionary with some additional features
    private readonly Dictionary<string, SubscribedTypes> dict = new();
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private readonly IMessageBus messageBus;
    
    SubscriptionManager(IMessageBus messageBus)
    {
        this.messageBus = messageBus;
    }

    public async Task AddDynamicAsync(string topic, Type type)
    {
        var subscriptionRequired = false;
        
        WriteLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
            {
                // just add - no need to subscribe, is already subscribed when
                // created for first time
                value.DynamicRegistrations.Add(handlerType);                
            }
            else
            {
                // this topic is new, add it
                var subscribedTypes = new SubscribedTypes(qos);
                subscribedTypes.DynamicRegistrations.Add(handlerType);
                this.dict[topic] = subscribedTypes;

                // we will need to register this on external bus
                subscriptionRequired = true;
            }           
        });
        
        if(subscriptionRequired)
            await this.messageBus.Subscribe(topic);
    }
    
    public async Task RemoveDynamicAsync(string topic, Type type)
    {
        var unsubscribeRequired = false;
    
        WriteLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
            {
                value.DynamicRegistrations.Remove(handlerType);             
                unsubscribeRequired = true;
            }
            else
            {
                throw new InvalidOperationException(
                    $"topic '{topic}' not registered.");
            }       
        });
        
        if(unsubscribeRequired)
            await this.messageBus.Unsubscribe(topic);
    }
    
    public IEnumerable<Type> GetTypesForTopic(string topic)
    {
        var found = ReadLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
                return value.GetAll();  
            else
                return new List<Type>();
        });
    }

    private SubscribedTypes ReadLock(Func<SubscribedTypes> func)
    {
        rwLock.EnterReadLock();
        try { return func(); }
        finally { rwLock.ExitReadLock(); }
    }

    private void WriteLock(Action action)
    {
        rwLock.EnterWriteLock();
        try { action(); }
        finally { rwLock.ExitWriteLock(); }
    }               
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
morleyc
  • 2,169
  • 10
  • 48
  • 108
  • 2
    First of all, that's not what those parameters are for: https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.addorupdate?view=netcore-3.1. Secondly, while you won't see a deadlock, you might see much worse problems since that's effectively a fire-and-forget action (and your `messageBus` might be disposed in the middle of the way, or even before the task starts, really, anything can happen there) – Camilo Terevinto Aug 07 '20 at 18:53
  • Thanks Camilo, i am sure the factories for add/update are OK `public TValue AddOrUpdate (TKey key, Func addValueFactory, Func updateValueFactory);` - they are working as expected (adding values). Pointed noted on the fire and forget, any recommendations that i could do to restructure this (even if total write of a self contained class that wraps the message bus and dictionary)? The messageBus in this case is a full managed wrapper, so will never be disposed it handles everything internally (including subscriptions and starting). – morleyc Aug 07 '20 at 19:09
  • If you don't want locks, which does make sense to me, could you use a simple if with the returning boolean of `public boolean TryAdd`? It would make you call `TryAdd` and `TryUpdate` though. If performance isn't critical, a simple lock is the simplest approach IMO – Camilo Terevinto Aug 07 '20 at 19:53

1 Answers1

2

Your first approach with ConcurrentDictionary is problematic, because according to the documentation:

If you call AddOrUpdate simultaneously on different threads, addValueFactory may be called multiple times, but its key/value pair might not be added to the dictionary for every call.

[...] The addValueFactory and updateValueFactory delegates are called outside the locks to avoid the problems that can arise from executing unknown code under a lock. Therefore, AddOrUpdate is not atomic with regards to all other operations on the ConcurrentDictionary<TKey,TValue> class.

The way you use the ConcurrentDictionary indicates that you expect that the supplied lambdas will be executed at most once for each call to AddOrUpdate, and this is not guaranteed.

Your second approach with a normal Dictionary+lock will not compile, because you are not allowed to await inside a lock block. You must either move the await messageBus.Subscribe(topic); outside of the protected region, or use an asynchronous throttler like the SemaphoreSlim.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Thanks for the reply, I have proposed as an update on the question a rewrite using `ReaderWriterLockSlim` and then performing the slow message bus subscription outside of the loop (was thinking of queueing these subscribe/unsubscribe requests on background worker thread but given async see no benefit) - do you feel this is a better fit (feel free to copy/paste the code into the answer and tweak as required i am not sure is correct) thanks for the help – morleyc Aug 08 '20 at 14:18
  • @morleyc sorry, your code is quite involved, and I have not much time to delve into it. I'll just throw as an idea to consider using immutable collections as containers for your subscriptions. These collections need very little synchronization. You can see an example of using them [here](https://stackoverflow.com/questions/61540896/factory-for-iasyncenumerable-or-iasyncenumerator/62098158#62098158). – Theodor Zoulias Aug 08 '20 at 17:32