
I have this piece of code where I want to await an ongoing task if that task was created for the same input. Here is a minimal reproduction of what I'm doing.

private static ConcurrentDictionary<int, Task<int>> _tasks = new ConcurrentDictionary<int, Task<int>>();

private readonly ExternalService _service;


public async Task<int> SampleTask() {
  var result = await _service.DoSomething();
  await Task.Delay(1000); // this task takes some time to finish
  return result;
}

public async Task<int> DoTask(int key) {
   var task = _tasks.GetOrAdd(key, _ => SampleTask());
   var taskResult = await task;
   _tasks.TryRemove(key, out task);
   return taskResult;
}

I'm writing a test to ensure the same task is awaited when multiple requests want to perform the task at (roughly) the same time. I'm doing that by mocking _service and counting how many times _service.DoSomething() is called. It should be called only once if the calls to DoTask(int key) were made at roughly the same time.

However, the results show that if I call DoTask(int key) more than once with a delay of less than 1~2 ms between calls, each call creates and executes its own instance of SampleTask(), with the second one replacing the first one in the dictionary.

Considering this, can we say that this method is truly thread-safe? Or isn't my problem a case of thread-safety per se?

Henk Mollema
  • When you call `GetOrAdd()`, the delegate you pass to it can be called multiple times under some circumstances. Is that what you are seeing? – Matthew Watson Dec 17 '18 at 12:14
  • You are also not awaiting `SampleTask()` – Tseng Dec 17 '18 at 12:15
  • @MatthewWatson, what circumstances would those be? – Luís Gabriel de Andrade Dec 17 '18 at 12:19
  • @Tseng, do I need to await it in the factory delegate? I'm awaiting it right after it gets assigned to the `task` variable – Luís Gabriel de Andrade Dec 17 '18 at 12:22
  • @LuísGabrieldeAndrade what are you trying to do in the first place? `ConcurrentDictionary` is thread-safe but that doesn't mean the objects it contains are. It's not a task queue either. If you want to process multiple messages you should probably use `ActionBlock` or the other classes in the TPL DataflowBlock library. Another good option would be to use `System.Threading.Channels` to implement pub/sub workers. – Panagiotis Kanavos Dec 17 '18 at 12:22
  • @LuísGabrieldeAndrade ConcurrentDictionary and ConcurrentQueue are probably used by both Dataflow and Channels to implement their buffers deep down. There's a lot more to creating a pub/sub or async worker queue than the buffer though – Panagiotis Kanavos Dec 17 '18 at 12:25
  • [From the documentation](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.getoradd?view=netframework-4.7.2): `If you call GetOrAdd simultaneously on different threads, valueFactory may be called multiple times, but only one key/value pair will be added to the dictionary.` – Matthew Watson Dec 17 '18 at 12:28
  • @PanagiotisKanavos, I have this computation-intensive task to execute that may be called more than once at roughly the same time for the same input. I don't want to perform this task more than once simultaneously for the same input because it will yield the same result. – Luís Gabriel de Andrade Dec 17 '18 at 12:32
  • @LuísGabrieldeAndrade that's a very different question and still can't be implemented just with ConcurrentDictionary. It's available through Reactive Extensions though, as the `Throttle` operator. You can create an Observable or Subject that receives messages from multiple sources, e.g. multiple form submits, or multiple users submitting the same thing, and put `Throttle(TimeSpan.FromSeconds(60)).Subscribe(...)` on it to ensure that the task called by `Subscribe` is only called once every 60 seconds – Panagiotis Kanavos Dec 17 '18 at 12:36
  • @LuísGabrieldeAndrade when I say different question, it's closer to different problem domain. Concurrent collections deal only with concurrent data modification of collections. You are asking about the behaviour of event streams though. *Maybe* you can use one to implement the other, but you still have to solve that *other* problem. – Panagiotis Kanavos Dec 17 '18 at 12:38
  • @MatthewWatson, thank you for pointing out that part of the docs. I guess I overlooked that part. My bad. – Luís Gabriel de Andrade Dec 17 '18 at 12:44

1 Answer


To quote the documentation (emphasis mine):

For modifications and write operations to the dictionary, ConcurrentDictionary<TKey,TValue> uses fine-grained locking to ensure thread safety. (Read operations on the dictionary are performed in a lock-free manner.) However, the valueFactory delegate is called outside the locks to avoid the problems that can arise from executing unknown code under a lock. Therefore, GetOrAdd is not atomic with regards to all other operations on the ConcurrentDictionary<TKey,TValue> class.

Since a key/value can be inserted by another thread while valueFactory is generating a value, you cannot trust that just because valueFactory executed, its produced value will be inserted into the dictionary and returned. If you call GetOrAdd simultaneously on different threads, valueFactory may be called multiple times, but only one key/value pair will be added to the dictionary.

So while the dictionary is properly thread-safe, calls to the valueFactory, or _ => SampleTask() in your case, are not guaranteed to be unique. So your factory function should be able to live with that fact.
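The race described in the documentation can be forced deterministically. The following is a minimal sketch, not code from the question: the class name `GetOrAddRaceDemo` and the `Barrier` trick are assumptions made for illustration. The barrier holds every factory call until two threads are inside the factory, so neither thread can add its value before the other has also found the key missing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class GetOrAddRaceDemo
{
    // Hypothetical helper: returns how often the factory ran and whether
    // both callers received the same value from GetOrAdd.
    public static (int FactoryCalls, bool SameValue) Run()
    {
        var dictionary = new ConcurrentDictionary<int, int>();
        var factoryCalls = 0;

        // Each factory call blocks here until two threads are inside the factory.
        using var bothInsideFactory = new Barrier(2);

        Func<int, int> factory = key =>
        {
            // Each invocation produces a distinct value (1 or 2).
            int callNumber = Interlocked.Increment(ref factoryCalls);
            bothInsideFactory.SignalAndWait();
            return callNumber;
        };

        int first = 0, second = 0;
        var t1 = new Thread(() => first = dictionary.GetOrAdd(1, factory));
        var t2 = new Thread(() => second = dictionary.GetOrAdd(1, factory));
        t1.Start();
        t2.Start();
        t1.Join();
        t2.Join();

        return (factoryCalls, first == second);
    }

    public static void Main()
    {
        var (calls, same) = Run();
        // prints "factory calls: 2, same value returned: True"
        Console.WriteLine($"factory calls: {calls}, same value returned: {same}");
    }
}
```

Both threads run the factory, yet both get back the single value that won the insert. With `_ => SampleTask()` as the factory, the losing call would still have started its own task, which is why the mocked service can be hit more than once.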

You can confirm this from the source:

public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
{
    if (key == null) throw new ArgumentNullException("key");
    if (valueFactory == null) throw new ArgumentNullException("valueFactory");

    TValue resultingValue;
    if (TryGetValue(key, out resultingValue))
    {
        return resultingValue;
    }
    TryAddInternal(key, valueFactory(key), false, true, out resultingValue);
    return resultingValue;
}

As you can see, valueFactory is called outside of TryAddInternal, which is responsible for locking the dictionary properly.

However, since valueFactory in your case is a lambda that returns a task (_ => SampleTask()), and the dictionary does not await that task itself, the function finishes quickly: it just returns the incomplete Task after encountering the first await (once the async state machine is set up). So unless the calls come in very quick succession, the task should be added to the dictionary almost immediately and subsequent calls will reuse the same task.
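A small sketch of that behaviour, with a stand-in `SampleTask` that only delays (the class name `AsyncFactoryDemo` and the return value 42 are made up for illustration): calling an async method hands back a Task as soon as it reaches an await that actually has to wait, long before the work is done.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class AsyncFactoryDemo
{
    // Stand-in for the question's SampleTask; the delay plays the slow work.
    public static async Task<int> SampleTask()
    {
        await Task.Delay(1000);
        return 42;
    }

    public static async Task Main()
    {
        var stopwatch = Stopwatch.StartNew();

        // No await here: this is what the GetOrAdd factory does.
        Task<int> task = SampleTask();

        Console.WriteLine($"Factory-style call returned after {stopwatch.ElapsedMilliseconds} ms");
        Console.WriteLine($"Task already completed: {task.IsCompleted}");

        // Awaiting the stored task is where the actual waiting happens.
        Console.WriteLine($"Result: {await task}");
    }
}
```

The window in which two callers can both run the factory is therefore only as long as the synchronous part of the async method, not the full duration of the task.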

If you require this to happen just once in all cases, you should consider locking on the task creation yourself. Since it will finish quickly (regardless of how long your task actually takes to resolve), locking will not hurt that much.
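One way this could look is sketched below. It is not the only option and not taken from the question: the class name `SingleFlight`, the `_gate` lock object and the `work` delegate are hypothetical, and a plain `Dictionary` guarded by a lock is swapped in for the `ConcurrentDictionary`. The sketch assumes the synchronous part of the work (everything before its first real await) is short, since that part runs while the lock is held.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class SingleFlight
{
    private readonly object _gate = new object();
    private readonly Dictionary<int, Task<int>> _tasks = new Dictionary<int, Task<int>>();
    private readonly Func<int, Task<int>> _work;

    public SingleFlight(Func<int, Task<int>> work) => _work = work;

    public async Task<int> DoTask(int key)
    {
        Task<int> task;
        lock (_gate)
        {
            // Lookup and creation happen under one lock, so the work
            // is started at most once per key while a task is in flight.
            if (!_tasks.TryGetValue(key, out task))
            {
                task = _work(key);
                _tasks[key] = task;
            }
        }

        try
        {
            // Awaiting happens outside the lock.
            return await task;
        }
        finally
        {
            lock (_gate)
            {
                // Remove the entry only if it is still the task we awaited.
                if (_tasks.TryGetValue(key, out var current) && ReferenceEquals(current, task))
                {
                    _tasks.Remove(key);
                }
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var calls = 0;
        var release = new TaskCompletionSource<bool>();
        var flight = new SingleFlight(async key =>
        {
            Interlocked.Increment(ref calls);
            await release.Task; // stays pending until released below
            return key * 2;
        });

        var pending = new[] { flight.DoTask(21), flight.DoTask(21), flight.DoTask(21) };
        release.SetResult(true);
        int[] results = await Task.WhenAll(pending);

        // prints "calls: 1, results: 42,42,42"
        Console.WriteLine($"calls: {calls}, results: {string.Join(",", results)}");
    }
}
```

A commonly used alternative is to store `Lazy<Task<int>>` values in the `ConcurrentDictionary`, so that a losing factory call only creates a cheap `Lazy` wrapper and never starts the task.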

poke