I have a concurrent dictionary that manages subscriptions (and interlinks with other code); a simplified version is shown here for the sake of the example.
I need to update an external message bus when topics are added or removed. Instead of adding another synchronization mechanism and wrapping a blocking section around both the dictionary add and the message bus subscription, I am using the concurrent dictionary and calling back an Action when the item is successfully added:
class SubscriptionManager
{
    readonly ConcurrentDictionary<string, SubscribedTypes> subscriptions = new();

    public void AddDynamic(string topic, Type type, Action<string> dynamicCallback)
    {
        subscriptions.AddOrUpdate(
            topic,
            (topic) =>
            {
                var subscribedTypes = new SubscribedTypes(qos);
                /* do some work and checks */
                dynamicCallback(topic); // call back the calling code
                return subscribedTypes;
            },
            (topic, subscribedTypes) =>
            {
                /* snip */
                subscribedTypes.Add(type);
                return subscribedTypes;
            }
        );
    }
}
My message bus is purely asynchronous, and the client would call it as below:
subscriptionManager.AddDynamic("v1/health", typeof(HealthMessage),
    async (topic) => await messageBus.Subscribe(topic));
UPDATES TO ORIGINAL QUESTION
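Is the anonymous lambda even called as an async function call? ANSWER - this is not a good idea. Because the callback parameter is an Action<string>, the async lambda becomes an async void method, so AddDynamic returns without waiting for the subscription to complete. The message bus could also be disposed in the meantime, so either ensure its lifetime through IoC lifetime management (singleton) or hold the SubscriptionManager instance inside the message bus.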
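To illustrate the point about the lambda, here is a minimal sketch of what happens when an async lambda is passed as an Action<string> versus a Func<string, Task>. AsyncVoidDemo and its method names are hypothetical, and the TaskCompletionSource only stands in for a pending messageBus.Subscribe call; this is not the real bus API.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Invokes the callback the way AddDynamic does: as a plain Action<string>.
    public static bool CompletedWhenCallbackReturned()
    {
        var gate = new TaskCompletionSource<bool>();
        var completed = false;

        // The async lambda is converted to an async void delegate.
        Action<string> callback = async topic =>
        {
            await gate.Task;      // stands in for a pending Subscribe(topic)
            completed = true;
        };

        callback("v1/health");    // returns at the first incomplete await
        var observed = completed; // still false: the work has not finished
        gate.SetResult(true);     // release the pending continuation
        return observed;
    }

    // With Func<string, Task> the caller gets a Task it can await.
    public static async Task<bool> CompletedWhenAwaitedAsync()
    {
        var completed = false;
        Func<string, Task> callback = async topic =>
        {
            await Task.Yield();   // stands in for asynchronous bus work
            completed = true;
        };

        await callback("v1/health");
        return completed;
    }
}
```

With the Action<string> version the caller has no Task to await, and an exception thrown after the first await cannot be caught by the caller either. Changing the parameter to Func<string, Task> would make the work awaitable, but the ConcurrentDictionary factory delegates are synchronous, so it still could not be awaited from inside AddOrUpdate.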
Performance is critical here: on review, some apps will create lots of subscriptions dynamically, and the dictionary will be read continuously.
I have therefore put in place a ReaderWriterLockSlim to minimize locking on reads (with escalation to a full lock on write); when a subscription to the message bus is required, it is done outside of the critical section.
public class SubscriptionManager
{
    // SubscribedTypes is a dictionary with some additional features
    private readonly Dictionary<string, SubscribedTypes> dict = new();
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private readonly IMessageBus messageBus;

    public SubscriptionManager(IMessageBus messageBus)
    {
        this.messageBus = messageBus;
    }

    public async Task AddDynamicAsync(string topic, Type type)
    {
        var subscriptionRequired = false;
        WriteLock(() =>
        {
            if (this.dict.TryGetValue(topic, out SubscribedTypes value))
            {
                // just add - no need to subscribe, is already subscribed when
                // created for first time
                value.DynamicRegistrations.Add(type);
            }
            else
            {
                // this topic is new, add it
                // (qos is defined elsewhere; omitted from this simplified version)
                var subscribedTypes = new SubscribedTypes(qos);
                subscribedTypes.DynamicRegistrations.Add(type);
                this.dict[topic] = subscribedTypes;
                // we will need to register this on external bus
                subscriptionRequired = true;
            }
        });

        // await outside the lock: the write lock is already released here
        if (subscriptionRequired)
            await this.messageBus.Subscribe(topic);
    }

    public async Task RemoveDynamicAsync(string topic, Type type)
    {
        var unsubscribeRequired = false;
        WriteLock(() =>
        {
            if (this.dict.TryGetValue(topic, out SubscribedTypes value))
            {
                value.DynamicRegistrations.Remove(type);
                unsubscribeRequired = true;
            }
            else
            {
                throw new InvalidOperationException(
                    $"topic '{topic}' not registered.");
            }
        });

        if (unsubscribeRequired)
            await this.messageBus.Unsubscribe(topic);
    }

    public IEnumerable<Type> GetTypesForTopic(string topic)
    {
        return ReadLock<IEnumerable<Type>>(() =>
        {
            if (this.dict.TryGetValue(topic, out SubscribedTypes value))
                return value.GetAll();
            else
                return new List<Type>();
        });
    }

    // Runs func while holding the read lock and returns its result.
    private T ReadLock<T>(Func<T> func)
    {
        rwLock.EnterReadLock();
        try { return func(); }
        finally { rwLock.ExitReadLock(); }
    }

    // Runs action while holding the exclusive write lock.
    private void WriteLock(Action action)
    {
        rwLock.EnterWriteLock();
        try { action(); }
        finally { rwLock.ExitWriteLock(); }
    }
}
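As a self-contained illustration of the pattern above (mutate under the write lock, then await outside it), here is a minimal sketch. FakeBus and TopicRegistry are hypothetical stand-ins for the real message bus and SubscriptionManager, and a plain HashSet<Type> replaces SubscribedTypes, so treat it as an assumption-laden example rather than the actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real message bus; it only records calls.
public sealed class FakeBus
{
    public List<string> Subscribed { get; } = new();

    public Task Subscribe(string topic)
    {
        lock (Subscribed) Subscribed.Add(topic);
        return Task.CompletedTask;
    }
}

// Hypothetical, reduced version of the subscription manager.
public sealed class TopicRegistry
{
    private readonly Dictionary<string, HashSet<Type>> topics = new();
    private readonly ReaderWriterLockSlim rwLock = new();
    private readonly FakeBus bus;

    public TopicRegistry(FakeBus bus) => this.bus = bus;

    public async Task AddAsync(string topic, Type type)
    {
        var isNewTopic = false;

        rwLock.EnterWriteLock();
        try
        {
            if (!topics.TryGetValue(topic, out var types))
            {
                types = new HashSet<Type>();
                topics[topic] = types;
                isNewTopic = true;
            }
            types.Add(type);
        }
        finally { rwLock.ExitWriteLock(); }

        // ReaderWriterLockSlim has thread affinity, so it must be released
        // before awaiting; the continuation may resume on another thread.
        if (isNewTopic)
            await bus.Subscribe(topic);
    }

    public IReadOnlyList<Type> GetTypes(string topic)
    {
        rwLock.EnterReadLock();
        try
        {
            // Return a snapshot so callers never enumerate shared state
            // after the read lock has been released.
            return topics.TryGetValue(topic, out var types)
                ? new List<Type>(types)
                : new List<Type>();
        }
        finally { rwLock.ExitReadLock(); }
    }
}
```

Adding two types to the same topic calls Subscribe only once in this sketch. One trade-off of subscribing outside the lock is that readers can see a topic in the dictionary slightly before the bus subscription has completed.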