
I'm working with a piece of code that processes messages from a queue (using MassTransit). Many messages can be processed in parallel. All messages create or modify an object in Active Directory (in this case). All objects need to be validated against the AD schema definitions. (Though it's not relevant to the problem, I want to note that we have many customers with custom extensions in their AD schema.)

Retrieving the schema information is a slow operation, so I want to do it once and then cache it. But with many messages being processed in parallel, many of them start retrieving the schema information before the first retrieval succeeds, so too much work is done. For the moment I fixed this with a simple semaphore; see the code below.

But that is not a good solution, as now only 1 thread can enter this code at a time.

I need something that locks the code once per object and holds off other requests until the first retrieval and caching is complete.

What kind of construct will allow me to do that?

private static SemaphoreSlim _lock = new SemaphoreSlim(1, 1);

public ActiveDirectorySchemaObject? GetSchemaObjectFor(string objectClass)
{

    //todo: create better solution
    _lock.Wait();
    try
    {
        if (_activeDirectorySchemaContainer.HasSchemaObjectFor(
            _scopeContext.CustomerId, objectClass) == false)
        {
            _logger.LogInformation($"Getting and caching schema from AD " +
                $"for {objectClass}");
            _activeDirectorySchemaContainer.SetSchemaObjectFor(
                _scopeContext.CustomerId, objectClass,
                GetSchemaFromActiveDirectory(objectClass));
        }
    }
    finally
    {
        _lock.Release();
    }
    return _activeDirectorySchemaContainer.GetSchemaObjectFor(
        _scopeContext.CustomerId, objectClass);
}

The following is a possible simplification of the question. In short: I am looking for the proper construct to lock a piece of code against parallel access separately for every variation of an input.

A comment mentioned Lazy, something I have not used before. Reading the docs, I see it defers initialization of an object until later. Maybe I could refactor for that. But looking at the code as it currently is, I seem to need a lazy "if" or a lazy "function", though maybe I am overcomplicating this. I find that thinking about parallel programming often hurts my head.

As requested, here is the code of the schema container class containing SetSchemaObjectFor and the other functions. Thanks so far for all the information provided.

public interface IActiveDirectorySchemaContainer
    {
        //Dictionary<string, Dictionary<string, JObject>> schemaStore {  get; }

        bool HasSchemaObjectFor(string customerId, string objectClass);
        ActiveDirectorySchemaObject GetSchemaObjectFor(string customerId, string objectClass);
        void SetSchemaObjectFor(string customerId, string objectClass, ActiveDirectorySchemaObject schema);
    }



    public class ActiveDirectorySchemaContainer : IActiveDirectorySchemaContainer
    {
        private Dictionary<string, Dictionary<string, ActiveDirectorySchemaObject>> _schemaStore = new Dictionary<string, Dictionary<string, ActiveDirectorySchemaObject>>();

        public bool HasSchemaObjectFor(string customerId, string objectClass)
        {
            if (!_schemaStore.ContainsKey(customerId))
                return false;

            if (!_schemaStore[customerId].ContainsKey(objectClass))
                return false;

            if (_schemaStore[customerId][objectClass] != null)
                return true;
            else
                return false;
        }

        public ActiveDirectorySchemaObject GetSchemaObjectFor(string customerId, string objectClass)
        {
            return _schemaStore[customerId][objectClass];
        }

        public void SetSchemaObjectFor(string customerId, string objectClass, ActiveDirectorySchemaObject schemaObject)
        {
            if (HasSchemaObjectFor(customerId, objectClass))
            {
                _schemaStore[customerId][objectClass] = schemaObject;
            }
            else
            {
                if (!_schemaStore.ContainsKey(customerId))
                {
                    _schemaStore.Add(customerId, new Dictionary<string, ActiveDirectorySchemaObject>());
                }

                if (!_schemaStore[customerId].ContainsKey(objectClass))
                {
                    _schemaStore[customerId].Add(objectClass, schemaObject);
                }
                else
                {
                    _schemaStore[customerId][objectClass] = schemaObject;
                }
            }
        }
    }

The customerId is there to separate schema information for multiple customers, and the container is provided by dependency injection as a singleton. Every message can have a different customerId and be processed concurrently. Yet I want to retrieve schema data only a single time. This architecture might not be ideal, but I am not allowed to change that at this time.

 public static IServiceCollection AddActiveDirectorySchemaService(
             this IServiceCollection services)
        {
            services.AddScoped<IActiveDirectorySchemaService, ActiveDirectorySchemaService>();
            services.AddSingleton<IActiveDirectorySchemaContainer, ActiveDirectorySchemaContainer>();
            return services;
        }
zu1b
  • I might be misunderstanding the situation, but you don't need to even consider the semaphore if you've already loaded the schema. One thread only loading the schema seems to make perfect sense. – ProgrammingLlama Jun 10 '22 at 11:45
  • @DiplomacyNotWar The problem with preloading the info is that the queue from which I take messages in parallel contains an identifier for the customer in each message. So I don't know beforehand what I need to load from which customer. I would change that if I could. But I am not allowed :( So I need to prevent the multiple load and cache attempts while working. – zu1b Jun 10 '22 at 11:57
  • @TheodorZoulias No I haven't. I'm sorry to say I'm not that familiar with it. – zu1b Jun 10 '22 at 11:58
  • Ah, so essentially you cache a different schema for each `customerId`+`objectClass` combination, correct? – Theodor Zoulias Jun 10 '22 at 21:20
  • @TheodorZoulias Correct. As different customers can have different extensions on their schema. I think your example more or less still applies. It seems there is just an extra level to the "hierarchy" before you get to the lazy initialized object. Not sure if the nested (concurrent) dictionary should be lazy initialized too, though. – zu1b Jun 11 '22 at 13:00

2 Answers


A relatively simple approach would be to use a ConcurrentDictionary to keep a cache of loaded objects. Dictionaries divide items into buckets based on the hash code of their keys, and ConcurrentDictionary guards writes with a set of fine-grained locks, each covering a portion of the buckets, while reads such as TryGetValue take no lock at all. Using a dictionary like this will provide an efficiency boost over your current approach.

So as to avoid hammering the AD controller/database/whatever, I'm still going to use a semaphore to ensure that only one thread can request a schema at once. This only takes place when the dictionary doesn't already have the entry, however.

Note that this first option is more or less a complicated version of Theodor's answer, so if this works for you, it's probably best to go with that answer instead. And my second option could probably be optimised by incorporating Theodor's answer.

public class CachedSchemaContainer
{
    private readonly ISchemaRetriever _schemaRetriever;
    private readonly ConcurrentDictionary<string, Schema> _schemaCache = new ConcurrentDictionary<string, Schema>();
    private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);

    public CachedSchemaContainer(ISchemaRetriever schemaRetriever)
    {
        _schemaRetriever = schemaRetriever;
    }

    public Schema GetSchemaObjectFor(string objectClass)
    {
        Schema schema;
        // try and retrieve the value
        if (_schemaCache.TryGetValue(objectClass, out schema))
        {
            return schema;
        }

        // OK, we need to wait our turn and try to load it from the AD controller
        _semaphoreSlim.Wait();
        try
        {
            // There's no point requerying if the last holder of the lock already retrieved it, so check again
            if (_schemaCache.TryGetValue(objectClass, out schema))
            {
                return schema;
            }
                
            // Go and get the schema, add it to the dictionary, and then return it
            schema = _schemaRetriever.GetSchemaObjectFor(1, objectClass);
            _schemaCache.TryAdd(objectClass, schema);
            return schema;
        }
        finally
        {
            // release the semaphore
            _semaphoreSlim.Release();
        }
    }
}

Another possible optimisation might be to cache a reference to the Schema object per-thread. This would mean that no locking would be required in the case that a given thread has accessed this specific schema before. We still have the thread-safe ConcurrentDictionary to cache the values between threads, but ultimately this will avoid a lot of locking once the caches are warmed up/populated:

public class CachedSchemaContainer : IDisposable
{
    private readonly ISchemaRetriever _schemaRetriever;
    private readonly ConcurrentDictionary<string, Schema> _schemaCache = new ConcurrentDictionary<string, Schema>();
    private readonly ThreadLocal<Dictionary<string, Schema>> _threadSchemaCache = new ThreadLocal<Dictionary<string, Schema>>(() => new Dictionary<string, Schema>());
    private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);

    public CachedSchemaContainer(ISchemaRetriever schemaRetriever)
    {
        _schemaRetriever = schemaRetriever;
    }

    public Schema GetSchemaObjectFor(string objectClass)
    {
        Schema schema;

        // try and retrieve the value from the thread's cache
        if (_threadSchemaCache.Value.TryGetValue(objectClass, out schema))
        {
            return schema;
        }

        // try and retrieve the value
        if (_schemaCache.TryGetValue(objectClass, out schema))
        {
            // it was already cached in the shared dictionary, so let's add it to the thread's
            _threadSchemaCache.Value[objectClass] = schema;
            return schema;
        }

        // OK, we need to wait our turn and try to load it from the AD controller
        _semaphoreSlim.Wait();
        try
        {
            // There's no point requerying if the last holder of the lock already retrieved it, so check again
            if (_schemaCache.TryGetValue(objectClass, out schema))
            {
                // it was already cached in the shared dictionary, so let's add it to the thread's
                _threadSchemaCache.Value[objectClass] = schema;
                return schema;
            }
                
            // Go and get the schema, add it to the shared and thread local dictionaries, and then return it
            schema = _schemaRetriever.GetSchemaObjectFor(1, objectClass);
            _schemaCache.TryAdd(objectClass, schema);
            _threadSchemaCache.Value[objectClass] = schema;
            return schema;
        }
        finally
        {
            // release the semaphore
            _semaphoreSlim.Release();
        }
    }

    public void Dispose()
    {
        _threadSchemaCache.Dispose();
    }
}

Common type definitions used in these examples:

public interface ISchemaRetriever
{
    Schema GetSchemaObjectFor(int customerId, string objectClass);
}

public class Schema
{
}

Note: Schema here is a reference type (a class), so the dictionaries store a reference to a common Schema object per loaded objectClass. As such, if one thread makes a change to the Schema object, that could break another thread, unless the Schema object itself is also thread-safe. If you're only reading values and not mutating the Schema objects, then you should have nothing to worry about there.

Also, as Theodor points out, unless you're planning to make this method async in the future, you could potentially do away with the SemaphoreSlim and just use a simple lock (lockingObject) { } statement instead.
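As a rough sketch of that idea, combined with the suggestion from the comments to use a dedicated locker per schema rather than one for all schemas, the double-checked pattern could look like the following. The class name PerKeyLockedSchemaCache, the Func<string, Schema> loader delegate and the empty Schema class are assumptions made for illustration; they stand in for the retriever and types used above.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the schema type used in the examples above.
public class Schema { }

public class PerKeyLockedSchemaCache
{
    private readonly ConcurrentDictionary<string, Schema> _cache = new();
    // One lock object per key, so loads for different keys do not block each other.
    private readonly ConcurrentDictionary<string, object> _locks = new();
    // Hypothetical loader delegate standing in for the slow AD lookup.
    private readonly Func<string, Schema> _load;

    public PerKeyLockedSchemaCache(Func<string, Schema> load) => _load = load;

    public Schema GetSchemaObjectFor(string objectClass)
    {
        // Fast path: no lock is taken once the value is cached.
        if (_cache.TryGetValue(objectClass, out var schema))
            return schema;

        // All callers for the same key end up with the same lock object.
        object keyLock = _locks.GetOrAdd(objectClass, _ => new object());
        lock (keyLock)
        {
            // Re-check: another thread may have loaded it while we waited.
            if (_cache.TryGetValue(objectClass, out schema))
                return schema;

            schema = _load(objectClass);
            _cache[objectClass] = schema;
            return schema;
        }
    }
}
```

Because nothing is stored when the loader throws, a failed load is simply retried by the next caller. The trade-off is a second dictionary of lock objects that is never trimmed, which should be acceptable as long as the set of keys is small and bounded.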

ProgrammingLlama
  • Why use a `SemaphoreSlim`, and not a simple `lock`? Apparently there is no requirement to lock asynchronously, now or in the future. – Theodor Zoulias Jun 10 '22 at 13:42
  • @TheodorZoulias That's a fair point. Force of habit really. Much of the code I work with these days is async. – ProgrammingLlama Jun 10 '22 at 13:42
  • Also it might make more sense to use a dedicated `SemaphoreSlim`/locker per schema, instead of using just one for all schemas. It works, but by being overly protective it could result in added latency for no real reason. – Theodor Zoulias Jun 10 '22 at 13:49
  • @TheodorZoulias My logic was that _generally_ a cached value would already exist, so generally the semaphore wouldn't be used anyway, deferring to the `ConcurrentDictionary`'s per-partition locking. – ProgrammingLlama Jun 10 '22 at 13:51
  • Yes, in practice the amortized cost of using a single locker might be close to non-existent, but it still strikes me as looking inefficient. To be honest the whole mechanism, although it's effective and reasonably efficient, seems to me overly complicated when something simpler like a `ConcurrentDictionary<string, Lazy<Schema>>` would probably suffice. – Theodor Zoulias Jun 10 '22 at 14:03
  • @TheodorZoulias Yep, that might work just as well. You should probably add that as an answer too, so that OP can test them out and see which works better for their situation. – ProgrammingLlama Jun 10 '22 at 14:07

Here is how you could use a ConcurrentDictionary<TKey,TValue> that has Lazy<T> objects as values, in order to ensure that the schema of each key will be initialized only once:

private readonly ConcurrentDictionary<(string CustomerId, string ObjectClass),
    Lazy<Schema>> _cachedSchemas = new();

public Schema GetSchemaObjectFor(string objectClass)
{
    var combinedKey = (_scopeContext.CustomerId, objectClass);
    Lazy<Schema> lazySchema = _cachedSchemas.GetOrAdd(combinedKey, key =>
    {
        return new Lazy<Schema>(() =>
        {
            _logger.LogInformation($"Getting schema for {key}");
            return GetSchemaFromActiveDirectory(key.ObjectClass);
        });
    });
    return lazySchema.Value;
}

The key of the ConcurrentDictionary<TKey,TValue> is a ValueTuple<string, string>. The first string is the customer ID, and the second is the object class. A new schema is created for each unique combination of these two strings.
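To illustrate why the Lazy<T> wrapper matters, here is a small self-contained sketch. GetOrAdd may invoke its delegate more than once when several threads race on a missing key, but only one Lazy<T> instance ends up stored, and only that instance is ever evaluated. The class LazyCacheDemo, the key values and the string payload are made up for the demonstration; Thread.Sleep stands in for the slow AD lookup.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class LazyCacheDemo
{
    // Returns how many times the slow factory ran for 16 parallel requests of one key.
    public static int CountFactoryRuns()
    {
        var cache = new ConcurrentDictionary<(string CustomerId, string ObjectClass), Lazy<string>>();
        int factoryRuns = 0;

        Parallel.For(0, 16, i =>
        {
            Lazy<string> lazy = cache.GetOrAdd(("customer-a", "user"), key =>
                // This outer delegate can run more than once under contention,
                // but creating a Lazy<T> is cheap and only one instance is stored.
                new Lazy<string>(() =>
                {
                    Interlocked.Increment(ref factoryRuns);
                    Thread.Sleep(100); // simulate the slow lookup
                    return $"schema for {key}";
                }));

            // Every caller receives the stored Lazy<T>; its default mode
            // (ExecutionAndPublication) runs the inner factory only once.
            _ = lazy.Value;
        });

        return factoryRuns;
    }
}
```

Calling LazyCacheDemo.CountFactoryRuns() returns 1, whereas caching the value directly with GetOrAdd(key, k => SlowLookup(k)) could run the slow lookup several times in parallel.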

Unfortunately the above suggestion suffers from a major flaw of the Lazy<T> class: its behavior regarding the handling of errors is not configurable. So if the valueFactory fails, all subsequent requests for the Value will receive the cached error. This behavior is a show-stopper for a caching system. Fortunately there are alternative Lazy<T> implementations available that exhibit the correct behavior for caching purposes, which is to retry the valueFactory if it fails. You can find here at least three robust and compact implementations, including one of my own that I posted yesterday.
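A different and simpler workaround than replacing Lazy<T> itself is to evict the failed entry from the dictionary, so that the next request creates a fresh Lazy<T>. The following is only a sketch of that idea, not one of the implementations referred to above; the class name RetryingLazyCache is hypothetical, and the TryRemove(KeyValuePair) overload it relies on requires .NET 5 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class RetryingLazyCache<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, Lazy<TValue>> _cache = new();

    public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
    {
        Lazy<TValue> lazy = _cache.GetOrAdd(key,
            k => new Lazy<TValue>(() => valueFactory(k)));
        try
        {
            return lazy.Value;
        }
        catch
        {
            // Evict only this exact failed entry so that the next caller retries
            // with a fresh Lazy<T>; a newer entry for the same key is left alone.
            _cache.TryRemove(new KeyValuePair<TKey, Lazy<TValue>>(key, lazy));
            throw;
        }
    }
}
```

Threads that were already waiting on the failing Lazy<T> still observe the same exception; only requests that arrive after the eviction trigger a new attempt.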

Theodor Zoulias
  • I should've done my shared dictionary in the second example like this. I wouldn't have needed to write any kind of locking at all. :D – ProgrammingLlama Jun 10 '22 at 15:21
  • @DiplomacyNotWar actually I just realized that my proposed solution has a significant drawback. In case the initialization of a `Schema` fails, all subsequent requests for this schema will receive the cached error. The initialization will not be retried. Bummer! Here is a question related to this problem: [Lazy without exception caching](https://stackoverflow.com/questions/34393352/lazyt-without-exception-caching). – Theodor Zoulias Jun 10 '22 at 16:27
  • I have added more code to the question. This seems to be going in the right direction. I need some time to experiment with this. – zu1b Jun 10 '22 at 16:29