4

I have a Task which starts a win process, which generates file if its not created yet and returns it. The problem is that the action is called more than once. To be more precisely its src attribute of a <track> element. I have ConcurrentDictionary<Guid, Task<string>> which keeps track of for which Id a process is currently running

public async Task<string> GenerateVTTFile(Guid Id)
{
  if (_currentGenerators.TryGetValue(id, out Task<string> task))
  {
     return await task; // problem is here?
  }

  var t = Run(); // Task
  _currentGenerators.TryAdd(id, t);

  return await t;
}

In the action method of the controller

var path = await _svc.GetSomePath();
if (string.IsNullOrEmpty(path))
{
    var path = await svc.GenerateVTTFile(id);

    return PhysicalFile(path, "text/vtt");
} 

return PhysicalFile(path, "text/vtt");

Run() method is just starting Process and waits it.

process.WaitForExit();

What I want to achieve is to return the result of the same task for the same Id. It seems that if the Id already exists in the dictionary and I await it starts another process (calls Run method again).

Is there a way to achieve that?

João Reis
  • 712
  • 3
  • 9
Expressingx
  • 1,480
  • 1
  • 14
  • 39
  • Can you post the complete function / method? It _should_ work if you omitted the `await`s. But that would mean your Method returns a Task to be awaited outside this call. – Fildor Apr 22 '20 at 13:21
  • @Fildor I did change, but I don't think it will help much. If I set breakpoint I can see the `Run` method started twice for 2 http calls with same `Id` query parameter. – Expressingx Apr 22 '20 at 13:29

3 Answers3

3

You can make the method atomic to protect the "dangerzone":

private SemaphoreSlim _sem = new SemaphoreSlim(1);

public Task<string> GenerateVTTFile(Guid Id)
{
  _sem.Wait();
  try
  {
     if (_currentGenerators.TryGetValue(Id, out Task<string> task))
     {
        return task;
     }

     var t = Run(); // Task
     _currentGenerators.TryAdd(Id, t); // While Thread 1 is here,
                                       // Thread 2 can already be past the check above ...
                                       // unless we make the method atomic like here.

     return t;
   }
   finally
   {
      _sem.Release();
   }
}

Drawback here is, that also calls with different ids have to wait. So that makes for a bottleneck. Of course, you could make an effort but hey: the dotnet guys did it for you:

Preferably, you can use GetOrAdd to do the same with only ConcurrentDictionary's methods:

public Task<string> GenerateVTTFile(Guid Id)
{
     // EDIT: This overload vv is actually NOT atomic!
     // DO NOT USE: 
     //return _currentGenerators.GetOrAdd(Id, () => Run());
     // Instead:
     return _currentGenerators.GetOrAdd(Id, 
                                        _ => new Lazy<Task<string>>(() => Run(id))).Value;
     // Fix "stolen" from Theodore Zoulias' Answer. Link to his answer below.
     // If you find this helped you, please upvote HIS answer.
}

Yes, it's really a "one-liner". Please see this answer: https://stackoverflow.com/a/61372518/982149 from which I took the fix for my flawed answer.

Fildor
  • 14,510
  • 4
  • 35
  • 67
  • Thanks, works as a charm. One question only. What if a request with different `Id` as query parameter comes? If I understand correctly it will wait if there is already `thread` running for some other `Id`? – Expressingx Apr 22 '20 at 13:43
  • @Expressingx Unfortunately, yes. That's why it's probably better to use ConcurrentDictionary's own `GetOrAdd` (see my edited answer). – Fildor Apr 22 '20 at 13:49
  • 1
    Definitely the one-liner is the go-to for my case. Just tested it and it behaves the same. Thanks! – Expressingx Apr 22 '20 at 13:56
  • 3
    @Expressingx I just read the "Remarks" on "GetOrAdd": It is actually also _not atomic_ when using the factory overload ... so, you may want to test this thoroughly. – Fildor Apr 22 '20 at 14:13
3

As pointed out already by João Reis, using simply the GetOrAdd method is not enough to ensure that a Task will be created only once per key. From the documentation:

If you call GetOrAdd simultaneously on different threads, valueFactory may be called multiple times, but only one key/value pair will be added to the dictionary.

The quick and lazy way to deal with this problem is to use the Lazy class. Instead of storing Task objects in the dictionary, you could store Lazy<Task> wrappers. This way even if a wrapper is created multiple times per key, all extraneous wrappers will be discarded without their Value property requested, and so without duplicate tasks created.

private ConcurrentDictionary<Guid, <Lazy<Task<string>>> _currentGenerators;

public Task<string> GenerateVTTFileAsync(Guid id)
{
    return _currentGenerators.GetOrAdd(id,
        _ => new Lazy<Task<string>>(() => Run(id))).Value;
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • 2
    Andrew Lock goes into detail on this approach in his [blog post](https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy) from a few years ago. – Kirk Larkin Apr 22 '20 at 19:51
  • 1
    If the valueFactory is not expensive (Lazy.Value blocks the threads) then using Lazy is a much better approach than the one described by my answer simply because it is so much simpler, +1 – João Reis Apr 27 '20 at 12:38
  • In retrospect I am no longer feeling comfortable with the `Lazy>` combination. It is a dangerous mix of a blocking and an asynchronous component, that may cause threads to be blocked and loss of scalability in some scenarios. Currently I am in favor of implementing the same functionality using nested tasks (`Task`), as shown [here](https://stackoverflow.com/questions/54117652/concurrentdictionary-getoradd-async/65694270#65694270 "ConcurrentDictionary GetOrAdd async"). – Theodor Zoulias Jun 10 '22 at 18:54
2

In order to have multiple concurrent calls of that method but only one for each id, you need to use ConcurrentDictionary.GetOrAdd with SemaphoreSlim.

GetOrAdd is not enough because the factory parameter might be executed more than once, see "Remarks" here https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.getoradd?view=netframework-4.8

Here is an example:

private ConcurrentDictionary<Guid, Generator> _currentGenerators = 
    new ConcurrentDictionary<Guid, Generator>();

public async Task<string> GenerateVTTFile(Guid id)
{
    var generator = _currentGenerators.GetOrAdd(id, _ => new Generator());

    return await generator.RunGenerator().ConfigureAwait(false);
}

public class Generator
{
    private int _started = 0;

    private Task<string> _task;
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

    public async Task<string> RunGenerator()
    {
        if (!IsInitialized())
        {
            await Initialize().ConfigureAwait(false);
        }

        return await Interlocked.CompareExchange(ref _task, null, null).ConfigureAwait(false);
    }

    private async Task Initialize()
    {
        await _semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            // check again after acquiring the lock
            if (IsInitialized())
            {
                return;
            }

            var task = Run();
            _ = Interlocked.Exchange(ref _task, task);
            Interlocked.Exchange(ref _started, 1);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    private bool IsInitialized()
    {
        return Interlocked.CompareExchange(ref _started, 0, 0) == 1;
    }

    private async Task<string> Run()
    {
        // your implementation here
    }
}
João Reis
  • 712
  • 3
  • 9