
I want to use something like GetOrAdd with a ConcurrentDictionary as a cache for a webservice. Is there an async version of this dictionary? GetOrAdd will be making a web request using HttpClient, so it would be nice if there were a version of this dictionary where GetOrAdd was async.

To clear up some confusion, the contents of the dictionary will be the response from a call to a webservice.

ConcurrentDictionary<string, HttpResponseMessage> _cache
    = new ConcurrentDictionary<string, HttpResponseMessage>();

// Works, but blocks the calling thread while the HTTP request is in flight.
var response = _cache.GetOrAdd("id",
    x => _httpClient.GetAsync(x).GetAwaiter().GetResult());
Theodor Zoulias
Zeus82
  • For me it sounds like an async `GetOrAdd` doesn't make much sense. This method can only be executed synchronously. – Yeldar Kurmangaliyev Jan 09 '19 at 20:17
  • Adding to a dictionary is not an IO-bound operation, so it wouldn't make sense to have an async version of it. – JohanP Jan 09 '19 at 20:18
  • If you need to await something, I'd suggest checking whether the key is in the dictionary, and if not, awaiting the HTTP call and then calling `GetOrAdd` with the result. Ultimately you'll have to check again in case something else inserted the key while you were waiting on the IO. – juharr Jan 09 '19 at 20:25
  • @juharr: That's exactly what `ConcurrentDictionary` does. It starts by checking, then generates a new value, and then checks AGAIN before it's added. – Poul Bak Jun 08 '21 at 20:04

5 Answers


GetOrAdd won't become an asynchronous operation, because accessing the value of a dictionary isn't a long-running operation.

What you can do, however, is simply store tasks in the dictionary rather than the materialized results. Anyone needing the results can then await those tasks.

You also need to ensure that the operation is only ever started once, and not multiple times. GetOrAdd may invoke its value factory more than once when several threads miss the same key at the same time, so wrap the task in a Lazy, whose value is created only once:

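ConcurrentDictionary<string, Lazy<Task<HttpResponseMessage>>> _cache
    = new ConcurrentDictionary<string, Lazy<Task<HttpResponseMessage>>>();

// Lazy takes a delegate, so the request starts only when .Value is first read.
var response = await _cache.GetOrAdd("id",
    url => new Lazy<Task<HttpResponseMessage>>(() => _httpClient.GetAsync(url))).Value;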
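Following up on the error-handling discussion in the comments, here is a minimal sketch of the kind of wrapper this answer leaves to the OP. The class name LazyTaskCache and its members are hypothetical, not part of any library. It assumes .NET 5 or later (for the ConcurrentDictionary.TryRemove(KeyValuePair) overload) and evicts faulted or canceled entries so that a later call can retry.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical wrapper (not a library type) around the Lazy<Task<T>> technique.
// The factory runs at most once per cached entry; faulted or canceled tasks are
// evicted so that a later call can retry.
public sealed class LazyTaskCache<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _entries = new();
    private readonly Func<TKey, Task<TValue>> _factory;

    public LazyTaskCache(Func<TKey, Task<TValue>> factory) =>
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));

    public async Task<TValue> GetAsync(TKey key)
    {
        // Under contention GetOrAdd may construct several Lazy instances, but it
        // returns the one that was stored, and only that one's Value is ever read.
        Lazy<Task<TValue>> lazy = _entries.GetOrAdd(
            key, k => new Lazy<Task<TValue>>(() => _factory(k)));
        try
        {
            return await lazy.Value.ConfigureAwait(false);
        }
        catch
        {
            // Remove this exact entry only (Lazy compares by reference), so an entry
            // that someone else added in the meantime is left alone.
            _entries.TryRemove(new KeyValuePair<TKey, Lazy<Task<TValue>>>(key, lazy));
            throw;
        }
    }
}
```

Every caller awaiting a failed entry observes the same exception; the first one to reach the catch block removes the entry, and the other TryRemove calls are no-ops.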
Yepeekai
Servy
  • This puts an incomplete `Task` in the cache. What happens if the `Task` faults or is cancelled? The task represents an HTTP request to a remote resource; the chance of it failing is not negligible. – odyss-jii Jan 09 '19 at 21:11
  • @odyss-jii Yes, they would need to handle the error case and that would most likely involve removing it from the cache. – Servy Jan 09 '19 at 21:38
  • That is absolutely horrible design for a cache. It breaks the abstraction completely. If I fetch a value from a subsystem it is not my responsibility to clean up its internal cache because it has a broken implementation. – odyss-jii Jan 10 '19 at 07:01
  • It doesn't need to be the end consumer of the cache that handles it; it can be the wrapper around this code that the OP writes. The code in this answer is not a completed, production-ready, fully featured cache. It shows how to solve the question asked, which the OP will need to finish in their own wrapping cache to make it production-worthy code. Just like how your answer has problems making it not completed production-ready code, but rather just a solution to the question asked about. – Servy Jan 10 '19 at 14:18
  • I feel like the Lazy is redundant. Executing something like `_httpClient.GetAsync(url)` will return the Task immediately. – Darragh Jun 28 '20 at 21:16
  • @Darragh But then you're performing the operation more than once. That's very often not acceptable. The Lazy doesn't ensure the operation returns any faster, it ensures it never runs more than once. – Servy Jun 29 '20 at 00:33

The GetOrAdd method is not that great to use for this purpose. Since it does not guarantee that the factory runs only once, the only purpose it has is a minor optimization (minor since additions are rare anyway): it doesn't need to hash and find the correct bucket twice, which it would if you got and set with two separate calls.

I would suggest that you check the cache first. If you do not find the value there, enter some form of critical section (lock, semaphore, etc.), re-check the cache, and if the value is still missing, fetch it and insert it into the cache.

This ensures that your backing store is only hit once; even if multiple requests get a cache miss at the same time, only the first one will actually fetch the value, the other requests will await the semaphore and then return early since they re-check the cache in the critical section.

Pseudocode (using a SemaphoreSlim with a count of 1, since it can be awaited asynchronously):

async Task<TResult> GetAsync(TKey key)
{
    // Try to fetch from cache
    if (cache.TryGetValue(key, out var result)) return result;

    // Get some resource lock here, for example use SemaphoreSlim 
    // which has async wait function:
    await semaphore.WaitAsync();    
    try 
    {
        // Try to fetch from cache again now that we have entered 
        // the critical section
        if (cache.TryGetValue(key, out result)) return result;

        // Fetch data from source (using your HttpClient or whatever), 
        // update your cache and return.
        return cache[key] = await FetchFromSourceAsync(...);
    }
    finally
    {
        semaphore.Release();
    }
}
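A runnable version of the pseudocode above, as a minimal sketch: the class name SemaphoreCache and the constructor-supplied fetch delegate (standing in for FetchFromSourceAsync) are hypothetical. Like the pseudocode, it uses a single SemaphoreSlim(1, 1) for all keys.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical cache illustrating check / lock / re-check / fetch.
public sealed class SemaphoreCache<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, TValue> _cache = new();
    // One global gate: every cache miss, for any key, goes through it.
    private readonly SemaphoreSlim _gate = new(1, 1);
    private readonly Func<TKey, Task<TValue>> _fetchFromSourceAsync;

    public SemaphoreCache(Func<TKey, Task<TValue>> fetchFromSourceAsync) =>
        _fetchFromSourceAsync = fetchFromSourceAsync
            ?? throw new ArgumentNullException(nameof(fetchFromSourceAsync));

    public async Task<TValue> GetAsync(TKey key)
    {
        // Fast path: lock-free read.
        if (_cache.TryGetValue(key, out var result)) return result;

        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            // Re-check: another caller may have filled the entry while we waited.
            if (_cache.TryGetValue(key, out result)) return result;

            // A failed fetch throws here and stores nothing, so the next caller retries.
            result = await _fetchFromSourceAsync(key).ConfigureAwait(false);
            _cache[key] = result;
            return result;
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

Because the gate is shared, a slow fetch for one key also delays misses for unrelated keys; a per-key gate would avoid that at the cost of more bookkeeping.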
odyss-jii
  • If you're going to explicitly lock then you need to explicitly lock everywhere else that uses this collection as well, in order to ensure the operation is logically atomic. – Servy Jan 09 '19 at 20:45
  • The collection is ConcurrentDictionary, the collection itself is thread-safe. You are locking for a different reason here. – odyss-jii Jan 09 '19 at 20:46
  • The collection won't throw some sort of index out of bounds exception or return garbage data, because it's designed to be used from multiple threads, but you're now trying to perform multiple operations from it in sequence, and are relying on no changes to the collection during that time, which it won't provide for you. You'll need to explicitly lock not just here, but *everywhere* using the collection to ensure that someone else doesn't add the value after you found it missing, or anything like that. – Servy Jan 09 '19 at 20:52
  • I am not sure which hypothetical scenario you are thinking of, but it does not apply to this particular case. This is a fetch from a source with an in-memory cache, it does not matter if the collection changes during that time. The purpose of the lock is to protect the source from a surge if there are several concurrent cache misses; the purpose is not to synchronize access to the collection. – odyss-jii Jan 09 '19 at 20:59
  • Yes, it *does* matter if the collection changes. It can result in performing work multiple times that's not supposed to be repeated, for example. – Servy Jan 09 '19 at 21:40

Try this extension method:

/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function 
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
    this ConcurrentDictionary<TKey,TResult> dict,
    TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
    if (dict.TryGetValue(key, out TResult resultingValue))
    {
        return resultingValue;
    }
    var newValue = await asyncValueFactory(key);
    return dict.GetOrAdd(key, newValue);
}

Instead of dict.GetOrAdd(key,key=>something(key)), you use await dict.GetOrAddAsync(key,async key=>await something(key)). Obviously, in this situation you just write it as await dict.GetOrAddAsync(key,something), but I wanted to make it clear.

In regard to concerns about preserving the order of operations, I have the following observations:

  1. Using the normal GetOrAdd will get the same effect if you look at the way it is implemented. I literally used the same code and made it work for async. The reference documentation says:

the valueFactory delegate is called outside the locks to avoid the problems that can arise from executing unknown code under a lock. Therefore, GetOrAdd is not atomic with regards to all other operations on the ConcurrentDictionary<TKey,TValue> class

  2. SyncRoot is not supported in ConcurrentDictionary; it uses an internal locking mechanism, so locking on it is not possible. Using your own lock mechanism works only for this extension method, though. If you use another flow (using GetOrAdd, for example) you will face the same problem.
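The non-atomic behavior quoted above can be observed directly. In the sketch below (GetOrAddRaceDemo is a hypothetical class written only for illustration), a CountdownEvent forces two threads to be inside the valueFactory at the same time: both factory calls run, but only one value is stored and both callers get that stored value back. The async extension method above inherits exactly this behavior.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class: forces two threads to be inside the valueFactory at the
// same time, showing that the built-in GetOrAdd does not run the factory under a lock.
public static class GetOrAddRaceDemo
{
    public static (int FactoryCalls, string Result1, string Result2) Run()
    {
        var dict = new ConcurrentDictionary<string, string>();
        using var bothInside = new CountdownEvent(2);
        int calls = 0;

        string Factory(string who)
        {
            Interlocked.Increment(ref calls);
            bothInside.Signal(); // this thread is now inside the factory
            bothInside.Wait();   // block until the other thread is inside too
            return who;
        }

        Task<string> t1 = Task.Run(() => dict.GetOrAdd("key", _ => Factory("A")));
        Task<string> t2 = Task.Run(() => dict.GetOrAdd("key", _ => Factory("B")));
        Task.WaitAll(t1, t2);

        // Both factories ran, but only one value was stored and both calls return it.
        return (calls, t1.Result, t2.Result);
    }
}
```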
Siderite Zackwehdex
  • This implementation of `GetOrAddAsync` does not preserve the order of operations. Scenario: the Workflow-1 invokes `.GetOrAddAsync("Key", GetAsync("A"))`, then the Workflow-2 invokes `.GetOrAddAsync("Key", GetAsync("B"))`, then the Workflow-3 invokes `.TryRemove("Key", out _)`. Finally the dictionary could end up having either the value "A" or "B", or no value at all. This happens because this `GetOrAddAsync` implementation postpones storing anything into the dictionary until the asynchronous delegate completes. – Theodor Zoulias Jan 13 '21 at 04:47
  • You mean if you use it without await? – Siderite Zackwehdex Jan 13 '21 at 06:22
  • Siderite no, I mean if you await the method properly. My scenario involves three independent asynchronous workflows, where each workflow calls the API and `await`s the returned task. In this scenario the task awaited by the Workflow-1 could take longer to complete than the task awaited by the Workflow-2, in which case the Workflow-1 would overwrite the value entered in the dictionary by the Workflow-2. This behavior would be surprising to say the least. – Theodor Zoulias Jan 13 '21 at 06:44
  • I've updated my answer. I appreciate the level of attention in your implementation, but I believe it is overly complicated for the reasons I listed in the answer. The SO question was about using GetOrAdd with an async delegate, which implies accepting the limitations of the original method. – Siderite Zackwehdex Jan 13 '21 at 07:24
  • Siderite I see your point. Your implementation has indeed a similar behavior to the native `GetOrAdd` ([source code](https://referencesource.microsoft.com/mscorlib/system/Collections/Concurrent/ConcurrentDictionary.cs.html#d8b8308343be2763)). I guess that the typical asynchronous work has a longer duration than the typical synchronous work, and this could make the drawbacks of the native behavior more prominent. In any case my downvote was unwarranted and I revoked it. – Theodor Zoulias Jan 13 '21 at 07:40

Probably using a dedicated memory cache with advanced asynchronous capabilities, like the LazyCache by Alastair Crabtree, would be preferable to using a simple ConcurrentDictionary<K,V>. You would get commonly needed functionality like time-based expiration, or automatic eviction of entries that are dependent on other entries that have expired, or are dependent on mutable external resources (like files, databases etc). These features are not trivial to implement manually.

Below is a custom extension method GetOrAddAsync for ConcurrentDictionary instances that have Task<TValue> values. It accepts a factory method, and ensures that the method will be invoked at most once for a given key. It also ensures that failed tasks are removed from the dictionary.

/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, Task<TValue>> valueFactory)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(valueFactory);
    Task<TValue> currentTask;
    if (source.TryGetValue(key, out currentTask))
        return currentTask;

    Task<Task<TValue>> newTaskTask = new(() => valueFactory(key));
    Task<TValue> newTask = null;
    newTask = newTaskTask.Unwrap().ContinueWith(task =>
    {
        if (!task.IsCompletedSuccessfully)
            source.TryRemove(KeyValuePair.Create(key, newTask));
        return task;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default).Unwrap();

    currentTask = source.GetOrAdd(key, newTask);
    if (ReferenceEquals(currentTask, newTask))
        newTaskTask.RunSynchronously(TaskScheduler.Default);

    return currentTask;
}

This method is implemented using the Task constructor to create a cold Task, which is started only if it is added successfully to the dictionary. Otherwise, if another thread wins the race to add the same key, the cold task is discarded. The advantage of this technique over the simpler Lazy<Task> is that if the valueFactory blocks the current thread, it won't also block other threads that are awaiting the same key. The same technique can be used for implementing an AsyncLazy<T> or an AsyncExpiringLazy<T> class.

Usage example:

ConcurrentDictionary<string, Task<JsonDocument>> cache = new();

JsonDocument document = await cache.GetOrAddAsync("https://example.com", async url =>
{
    string content = await _httpClient.GetStringAsync(url);
    return JsonDocument.Parse(content);
});

Overload with synchronous valueFactory delegate:

public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, TValue> valueFactory)
{
    ArgumentNullException.ThrowIfNull(valueFactory);
    return source.GetOrAddAsync(key, key => Task.FromResult<TValue>(valueFactory(key)));
}

Both overloads invoke the valueFactory delegate on the current thread. If you have some reason to prefer invoking the delegate on the ThreadPool, you can just replace the RunSynchronously with the Start.

For a version of the GetOrAddAsync method that compiles on .NET versions older than .NET 6, you can look at the 3rd revision of this answer.

Theodor Zoulias
  • I have some questions about your solution. Is `Unwrap` the same as `await newTaskTask`? Why not just use `source.TryRemove(key, out _)` instead of casting the `source` to `ICollection<>`? I'm a little confused about the `newTaskTask.RunSynchronously(TaskScheduler.Default)`. This looks a little strange. Do we need this? The caller will do an `await`. Will this ensure that the task is called? – Sebastian Schumann Feb 16 '21 at 07:10
  • The question is: Is [this implementation](https://dotnetfiddle.net/GD4mwq) more or less equivalent to your example? Yes I know that the execution is "deferred" (not really) until the first caller awaits the returned task. – Sebastian Schumann Feb 16 '21 at 08:06
  • @SebastianSchumann sure, and thanks for asking. The `newTaskTask.Unwrap()` is indeed the same as `await newTaskTask`, provided that the task is hot (i.e. it has already started). This is not the case here, and a deadlock will ensue if we try to `await` the task. The task is intentionally cold, because we want to start the task only after it has been successfully inserted into the dictionary. Otherwise, in case the race to update the dictionary has been lost, the cold task will just be discarded. – Theodor Zoulias Feb 16 '21 at 08:14
  • The reason that the `source.TryRemove(key, out _)` is not sufficient is because the `GetOrAddAsync` is just an extension method, and it doesn't control entirely the contents of the dictionary. So it is possible that while the task is running, some other code may replace the task with another task. In case our task fails, we want to remove it from the dictionary only if it's still there, and not remove some other task that is not known to us. – Theodor Zoulias Feb 16 '21 at 08:15
  • The `newTaskTask.RunSynchronously(TaskScheduler.Default)` starts the outer task, which invokes the `valueFactory` delegate. Before reaching this point, the `valueFactory` has not been invoked. It is essential that the `valueFactory` is invoked only once, in case that multiple threads are racing to insert this key into the dictionary. The `TaskScheduler.Default` argument ensures that the `valueFactory` will be invoked synchronously by a well known `TaskScheduler`, and that we are not in the mercy of the `TaskScheduler.Current` (the default value of the parameter), whatever it may be. – Theodor Zoulias Feb 16 '21 at 08:16
  • So, no, [that](https://dotnetfiddle.net/GD4mwq) implementation is not equivalent with the implementation of this answer! – Theodor Zoulias Feb 16 '21 at 08:16
  • Oh yes, sorry - I was totally blind. I didn't see the _cold_ task. This is indeed correct and needed. And yes I forgot the possibility to change a value using `dict[key] = value` that changes the value. Sorry for that. – Sebastian Schumann Feb 16 '21 at 08:28
  • @SebastianSchumann no worries. I am happy that someone asked, so that I can explain the nuances of this peculiar-looking method. :-) – Theodor Zoulias Feb 16 '21 at 08:29
  • Yes. I deleted my question after I read the docs about that function. – Sebastian Schumann Feb 16 '21 at 09:05
  • But there is still one confusion left: Why do we need that call to `RunSynchronously`? There is a race condition: Let's assume that one call creates that task, adds it to the dict and a context switch occurs just before the call to `Run..`. Another task gets the already added task and awaits it. This should invoke the `valueFactory`. Would the factory then be invoked a second time by `Run...`? What am I missing here? – Sebastian Schumann Feb 16 '21 at 09:08
  • @SebastianSchumann awaiting the task is not sufficient to invoke the `valueFactory`. Only the current thread knows about the nested `Task<Task<TValue>>`, and has the means to start it. If it omits to start it, both tasks, the nested and the unwrapped, will remain cold forever. Any workflow that attempts to await the task will just deadlock. – Theodor Zoulias Feb 16 '21 at 09:14
  • Okay - I think I got it. Just to be sure: The `newTaskTask` is only known by the current thread - that's obvious. Only this thread is able to call it - clear. A concurrent call gets the `newTask` - also obvious. An await on this `newTask` will block because the `newTaskTask` has not been started. Puh - okay. If this assumption holds the implementation is correct. – Sebastian Schumann Feb 16 '21 at 09:20
  • @SebastianSchumann yeap, there is a lot going on in this little method. :-) – Theodor Zoulias Feb 16 '21 at 09:22
  • @SebastianSchumann btw the same idea has been used [here](https://stackoverflow.com/questions/28340177/enforce-an-async-method-to-be-called-once/65714904#65714904) in order to implement an improved `AsyncLazy` class. Creating cold tasks is rarely a good idea, but when it is, it works wonders! – Theodor Zoulias Feb 16 '21 at 11:18
  • @TheodorZoulias I'm about to use this; is it still relevant? I see that 2 years ago you were talking about an AsyncLazy being improved. All this stuff is above my pay grade, but I do need the ability to use async/await in a ConcurrentDictionary factory. Is this still your suggested way to go? – Terry Jul 26 '23 at 12:34
  • @Terry yes, it is. I've reviewed and updated the answer recently. Personally I would not hesitate to use the `GetOrAddAsync` extension method in production, if all I needed is permanent caching without expiration policy. For what it does, I think that this code is as good as it can be. – Theodor Zoulias Jul 26 '23 at 13:02
  • @TheodorZoulias Ah, I missed the 'edited date'. If I have (soft) expiration policies, you still recommend that library you originally mentioned? Seems like its repo hasn't had commits for a while. – Terry Jul 26 '23 at 14:07
  • @Terry nowadays I am less enthusiastic about Alastair Crabtree's LazyCache, after discovering a thread-safety issue a few months ago, for which I opened a [pull request](https://github.com/alastairtree/LazyCache/pull/187) and got no feedback from the owner of the repository. Nonetheless I am not aware of a better alternative. Microsoft's [`MemoryCache`](https://learn.microsoft.com/en-us/dotnet/api/microsoft.extensions.caching.memory.memorycache) is even worse IMHO. It's missing important features. – Theodor Zoulias Jul 26 '23 at 14:35
  • @TheodorZoulias One last related question since I have a feeling you'll know. For a ConcurrentDictionary (including your implementation above), when the add factory needs to be called, is the entire dictionary locked, or only that 'key entry' is locked? – Terry Jul 26 '23 at 16:51
  • @Terry the `ConcurrentDictionary` employs a granular locking scheme. It maintains a number of locker objects, initially as many as the number of CPU cores, and locks on one of these objects during each `TryAdd`, `TryUpdate` and `TryRemove` operation. So in general threads that update different keys concurrently, don't contend for the same locker. Some global operations, like the `Clear` and `ToArray`, require the acquisition of all the lockers. The greatest feature of the collection is that reading is lock-free. For example the `TryGetValue` doesn't use any locker, and it's super fast. – Theodor Zoulias Jul 26 '23 at 17:20
  • @TheodorZoulias This has almost turned into a chat ;) But hopefully my last q. I thought I was all straight, except that when I declared `ConcurrentDictionary<TKey, Task<TValue>> _cache`, it seems that the built-in `GetOrAdd` is *already* awaitable and seems to compile without using the extension above. I assume I can't just use the built-in one, right? The reason I asked is because I needed to use `AddOrUpdate` as well, which you didn't have in your answer but it seemed already awaitable and that is where I got confused. – Terry Jul 27 '23 at 02:17
  • @Terry the built-in [`GetOrAdd`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.getoradd) invokes the `valueFactory` delegate without entering a `lock`, therefore the invocation is not atomic. It is possible that two contending threads will both invoke the delegate, causing two asynchronous operations to be launched concurrently. The same is true for the `AddOrUpdate`. This is not desirable in general, and this is the problem that my answer is attempting to solve. – Theodor Zoulias Jul 27 '23 at 02:55
  • @Terry as far as I know there has never been a request for using the [`AddOrUpdate`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.addorupdate) with asynchronous `addValueFactory` and `updateValueFactory` delegates. You could consider posting a new question if you have such a case. It might be an interesting question. – Theodor Zoulias Jul 27 '23 at 03:00

I solved this years ago, before ConcurrentDictionary and the TPL were born. I'm in a café and don't have that original code, but it went something like this.

It's not a rigorous answer but may inspire your own solution. The important thing is to return the value that was just added or exists already along with the boolean so you can fork execution.

The design lets you easily fork the race winning logic vs. the losing logic.

public bool TryAddValue(TKey key, TValue value, out TValue contains)
{
    // guards etc.

    while (true)
    {
        if (this.concurrentDic.TryAdd(key, value))
        {
            contains = value;
            return true;
        }
        else if (this.concurrentDic.TryGetValue(key, out var existing))
        {
            contains = existing;
            return false;
        }
        else
        {
            // Slipped down the rare path. The value was removed between the
            // above checks. I think just keep trying because we must have
            // been really unlucky.

            // Note this spinning will cause adds to execute out of
            // order since a very unlucky add on a fast moving collection
            // could in theory be bumped again and again before getting
            // lucky and getting its value added, or locating existing.

            // A tiny random sleep might work. Experiment under load.
        }
    }
}

This could be made into an extension method for ConcurrentDictionary, or a method on your own cache class, or something that uses locks.

Perhaps a GetOrAdd(K,V) could be used with an Object.ReferenceEquals() to check if it was added or not, instead of the spin design.

To be honest, the above code isn't the point of my answer. The power comes in the simple design of the method signature and how it affords the following:

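static readonly ConcurrentDictionary<string, Task<Task<Thing>>> tasks = new();

//

// A cold outer task: GetThingAsync is not called until Start() runs.
var newTask = new Task<Task<Thing>>(() => GetThingAsync(thingId));

// `tasks` is static, so it is accessed without `this.`
if (tasks.TryAddValue(thingId, newTask, out var task))
{
    // We won the race, so we are responsible for starting the work.
    task.Start();
}

var thingTask = await task;   // outer task: yields the inner async operation
var thing = await thingTask;  // inner task: yields the actual Thing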

It's a little quirky how a Task needs to hold a Task (if your work is async), and there are the allocations of unused Tasks to consider.

I think it's a shame Microsoft didn't ship its thread-safe collection with this method, or extract a "concurrent collection" interface.

My real implementation was a cache with sophisticated expiring inner collections and stuff. I guess you could subclass the .NET Task class and add a CreatedAt property to aid with eviction.

Disclaimer: I've not tried this at all, it's off the top of my head, but I used this sort of design in an ultra-high-throughput app in 2009.
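The GetOrAdd-plus-ReferenceEquals alternative mentioned above can be sketched as an extension method, so that a TryAddValue call works directly on a ConcurrentDictionary. This is an untested illustration: the class name is hypothetical, it assumes .NET 6+ (for ArgumentNullException.ThrowIfNull), and it assumes TValue is a reference type, since identity is what tells us whether our own instance was stored. One caveat: if the very same instance was already stored under that key, it reports true even though nothing new was added.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical extension (not part of .NET): GetOrAdd + ReferenceEquals instead of
// the TryAdd/TryGetValue spin loop. Returns true only if *our* instance is stored.
public static class ConcurrentDictionaryTryAddValueExtensions
{
    public static bool TryAddValue<TKey, TValue>(
        this ConcurrentDictionary<TKey, TValue> dict, TKey key, TValue value, out TValue contains)
        where TKey : notnull
        where TValue : class // identity comparison only makes sense for reference types
    {
        ArgumentNullException.ThrowIfNull(dict);

        // GetOrAdd(key, value) either stores `value` or returns the value that is
        // already there, so no retry loop is needed.
        contains = dict.GetOrAdd(key, value);
        return ReferenceEquals(contains, value);
    }
}
```

In the cold-task usage above, newTask is freshly created, so the same-instance caveat does not arise there.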

Luke Puplett