
Several times, in different applications, I have needed the following behavior from C# Tasks. I implemented it in a particular way, and I would like some insight into whether it's the best way to achieve the desired effect or whether there are better ways.

The issue is that in certain circumstances I would like a specific Task to exist in only one instance. For example, if someone requests, say, a list of products by executing a method like Task GetProductsAsync(), and someone else requests the same thing, it shouldn't fire another task but rather return the already existing one. When GetProductsAsync finishes, all of the callers who requested the result receive the same result. So there should only ever be one GetProductsAsync execution at any given point in time.

After failing to find a similar, well-known design pattern that solves this, I came up with my own implementation. Here it is:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// ITaskManager and TaskManagerException are the application's own types (not shown).
public class TaskManager : ITaskManager
{
    private readonly object _taskLocker = new object();
    private readonly Dictionary<string, Task> _tasks = new Dictionary<string, Task>();
    private readonly Dictionary<string, Task> _continuations = new Dictionary<string, Task>();

    public Task<T> ExecuteOnceAsync<T>(string taskId, Func<Task<T>> taskFactory)
    {
        lock (_taskLocker)
        {
            if (_tasks.TryGetValue(taskId, out Task task))
            {
                // A task with this id is already running: return the same instance,
                // provided it produces the result type the caller expects.
                if (!(task is Task<T> concreteTask))
                {
                    throw new TaskManagerException($"Task with id {taskId} already exists but it has a different type {task.GetType()}. {typeof(Task<T>)} was expected");
                }

                return concreteTask;
            }
            else
            {
                // No task with this id yet: start one and schedule its removal on completion.
                Task<T> concreteTask = taskFactory();
                _tasks.Add(taskId, concreteTask);
                _continuations.Add(taskId, concreteTask.ContinueWith(_ => RemoveTask(taskId)));
                return concreteTask;
            }
        }
    }

    private void RemoveTask(string taskId)
    {
        lock (_taskLocker)
        {
            // Dictionary.Remove simply returns false when the key is absent.
            _tasks.Remove(taskId);
            _continuations.Remove(taskId);
        }
    }
}

The idea is that there is a single instance of TaskManager throughout the application's lifetime. Any async Task request that should execute only once at a given point in time calls ExecuteOnceAsync, providing a factory method that creates the Task and an application-wide unique ID. For any other request that comes in with the same ID, the TaskManager replies with the Task instance created earlier. Only if no Task with that ID exists does the manager call the factory method and start the task. I have added locks around task creation and removal to ensure thread safety. Also, to remove the task from the dictionary after it has completed, I've added a continuation using the ContinueWith method, so once the task completes, both the task itself and its continuation are removed.

To me this seems like a fairly common scenario. I would assume there is a well-established design pattern, or perhaps a C# API, that accomplishes exactly this. Any insights or suggestions would be much appreciated.

kyurkchyan
  • Couldn't this be accomplished using an IoC container such as Autofac? Registering a specific Task with a single instance lifetime scope? Then you'd simply request the desired task through a constructor, with no knowledge or concern over its lifetime management. – olitee Feb 07 '19 at 12:49
  • Thanks for the reply @olitee. That might actually be possible, but I think it'll require almost the same code I wrote: identifying the task, resolving it by ID, then disposing it when it's done. So I would still need the continuation piece to be able to remove the task, and I would need to lock the continuation and task creation parts, as those operations should be atomic. What do you think? – kyurkchyan Feb 07 '19 at 12:53
  • As for an existing API, you can consider using `Lazy`. The downside is that you'll have to keep references in `Lazy` variables for every action you'd like to execute only once. – Dmytro Mukalov Feb 07 '19 at 12:57
  • @DmytroMukalov good point. Using a Lazy task would probably only solve the factory part, wouldn't it? I'd still need to keep track of the references and remove them from the list anyway? – kyurkchyan Feb 07 '19 at 12:59
  • Who calls the RemoveTask method? – Hasan Emrah Süngü Feb 07 '19 at 13:00
  • @HasanEmrahSüngü, I subscribe to Task completion using ContinueWith method on the task that user will provide, so as soon as Task is completed, my code will be called to continue the task execution, which will internally call the RemoveTask. – kyurkchyan Feb 07 '19 at 13:02
  • It actually depends on the life-cycle of your tasks; it's not clear when they are removed. – Dmytro Mukalov Feb 07 '19 at 13:02
  • @DmytroMukalov take a look at the previous comment. To summarize: inside, I subscribe to Task completion using ContinueWith and remove the task when it completes. – kyurkchyan Feb 07 '19 at 13:03
  • OK then, your implementation looks fine. You should consider using a ConcurrentDictionary, using a simple lock can be wasteful if you are creating many tasks and removing them – Hasan Emrah Süngü Feb 07 '19 at 13:05
  • @HasanEmrahSüngü thanks, I was considering using one, but I didn't because I have several conditions and steps to check before each dictionary operation, so I need the locks anyway. With those locks already in place, a concurrent dictionary wouldn't add anything, because I already do the locking. Am I missing something? Do you see it differently regarding the locking and why I didn't use the ConcurrentDictionary? – kyurkchyan Feb 07 '19 at 13:14
  • I'm not seeing what `_continuations` is meant to be doing/helping with here. It's not necessary for e.g. GC purposes and I can't think of why else it might exist. – Damien_The_Unbeliever Feb 07 '19 at 13:41
  • Wouldn't it make more sense to store currently running jobs in a database and check if a specific job is currently running before allowing someone to start a new one? – FCin Feb 07 '19 at 13:43
  • It seems really error-prone and race-condition-y to try to solve this on the task level. If you need result caching, just stick an actual cache in the middle and have the (still distinct) tasks get their results from there. The case where actual, concurrent requests are happening that should be coalesced (and not serialized) is not, I would say, common at all -- this suggests there's a problem on a higher level where unnecessary requests are being issued that should have been coalesced somewhere else, where controlling the degree of parallelism is easier. – Jeroen Mostert Feb 07 '19 at 13:45
  • Oh, and also, if this code doesn't have actual problems and you're just soliciting suggestions for improvements, you should probably take it to https://codereview.stackexchange.com . – Jeroen Mostert Feb 07 '19 at 13:55
  • @Damien_The_Unbeliever it's exactly for that, to prevent the continuation from being GC'd. – kyurkchyan Feb 07 '19 at 15:01
  • @FCin I think that'd be overkill; a singleton in-memory manager class would be enough – kyurkchyan Feb 07 '19 at 15:01
  • @JeroenMostert that's a very good point. But in the scenarios I encountered, it wasn't possible to coalesce the tasks at a higher level. The purpose of this is not caching either; it is coalescing concurrent tasks into one, so rather than the result being cached, it is shared only by the callers of the current execution, and the next request executes the work again rather than taking it from a cache. I used a request as the example because it was simple; in the real-world app, the Task being coalesced is much more complicated. Regarding the Code Review part, you are right, codereview.stackexchange.com would be a better place for this – kyurkchyan Feb 07 '19 at 15:03
  • Like I said, it's not needed for that purpose. The `Task`s that you're calling `ContinueWith` on keeps live references to its continuations itself (it has to - how else could it invoke them at the correct time otherwise) – Damien_The_Unbeliever Feb 07 '19 at 15:43
  • For simplicity I would look at making something like `Lazy` for tasks – Cory Nelson Feb 07 '19 at 17:32
  • Somewhat related: [ConcurrentDictionary GetOrAdd async](https://stackoverflow.com/questions/54117652/concurrentdictionary-getoradd-async) – Theodor Zoulias Jan 28 '22 at 19:10
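Several comments point at `Lazy`. Below is a minimal sketch of one common way to combine it with a ConcurrentDictionary so that the factory runs once per key and the entry disappears when the task finishes, assuming C# 8 / .NET Core 3.0 or later. `AsyncCoalescer<T>` and `RunOnceAsync` are hypothetical names, not a framework API. The `GetOrAdd(key, value)` overload runs no user code, and only the `Lazy` that ends up in the dictionary ever has its `Value` read.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: coalesces concurrent calls that share a key into one running Task<T>.
public sealed class AsyncCoalescer<T>
{
    private readonly ConcurrentDictionary<string, Lazy<Task<T>>> _running =
        new ConcurrentDictionary<string, Lazy<Task<T>>>();

    public Task<T> RunOnceAsync(string key, Func<Task<T>> taskFactory)
    {
        Lazy<Task<T>> candidate = null;
        // Creating a Lazy is cheap and runs no user code; the factory only runs on .Value.
        candidate = new Lazy<Task<T>>(() => RunAndRemoveAsync(key, candidate, taskFactory));

        // GetOrAdd with a value (not a factory delegate) returns either our candidate
        // or the Lazy that another caller already stored under this key.
        Lazy<Task<T>> winner = _running.GetOrAdd(key, candidate);

        // Lazy's default mode (ExecutionAndPublication) runs its factory at most once.
        return winner.Value;
    }

    private async Task<T> RunAndRemoveAsync(string key, Lazy<Task<T>> self, Func<Task<T>> taskFactory)
    {
        try
        {
            return await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            // Remove the entry only if it still maps to this exact Lazy instance,
            // so a newer run under the same key is never evicted by mistake.
            ((ICollection<KeyValuePair<string, Lazy<Task<T>>>>)_running)
                .Remove(new KeyValuePair<string, Lazy<Task<T>>>(key, self));
        }
    }
}
```

Each call allocates a throwaway `Lazy` even when it loses the race; that is cheap, since the factory inside it never runs. Because removal happens in the `finally` block, a request made after the task completes starts a new execution instead of reusing a cached result, which matches the coalescing (not caching) requirement described in the comments.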

2 Answers


I think you can do this with one of the concurrent collection classes.

It begins the work only if it fails to get any existing work, then waits for it to complete. If it has been run before, it'll get the already completed (or in progress) task and wait for it to complete.

// Add this static dictionary to your class

static readonly ConcurrentDictionary<string, Task> tasks = new();

// Add this to the method in that class that should do the work once
// (tasks is static, so it is accessed without "this.")

var work = tasks.GetOrAdd(taskId, _ =>
{
    return client.DoSomethingAsync();
});

await work;

Important

See Theodor's comment below about the delegate running outside the lock. This is unfortunate and goes back to a design for a concurrent dictionary I made well over 10 years ago (and pitched to Microsoft as a better way to do ConcurrentDictionary).

I have my own thread-safe dictionary with this method.

bool TryAdd(K key, T value, out T contains);

This simple design returns the newly-added or existing value and affords some awesome patterns, especially for caching/request de-duping; adding a task to get something vs. waiting on an existing task to get it.
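The built-in `TryAdd` only returns a bool, but the `TryAdd(key, value, out existing)` shape can be approximated on top of ConcurrentDictionary: the `GetOrAdd(key, value)` overload that takes a ready-made value (rather than a factory delegate) runs no user code and returns either the value it just added or the one that was already there. A sketch, assuming reference-type values so that "was added" can be detected by reference identity; `TryAddOrGetExisting` is a hypothetical extension method, not part of the framework.

```csharp
using System.Collections.Concurrent;

public static class ConcurrentDictionaryExtensions
{
    // Hypothetical extension: adds value if key is absent; stored receives whatever value
    // is in the dictionary afterwards (our own value if we added it).
    // Returns true when this call's value was the one added.
    public static bool TryAddOrGetExisting<TKey, TValue>(
        this ConcurrentDictionary<TKey, TValue> dictionary,
        TKey key,
        TValue value,
        out TValue stored)
        where TValue : class
    {
        // GetOrAdd(key, value) is atomic for a precomputed value: it returns the stored value,
        // which is our own instance only if we won the race to add it.
        stored = dictionary.GetOrAdd(key, value);
        return ReferenceEquals(stored, value);
    }
}
```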

I notice ConcurrentDictionary has a TryAdd but it doesn't return the existing value upon failure, which is a shame, but something like this might work:

static readonly ConcurrentDictionary<string, Task<Task>> tasks = new();

// In the method that should do the work once:

var newTask = new Task<Task>(() => DoSomethingAsync());

if (tasks.TryAdd(taskId, newTask))
{
    newTask.Start();
}

var somethingTask = await tasks[taskId];
await somethingTask;

It assumes the tasks are never removed.

Basically there's an outer Task which only gets started by the race winner. There needs to be another Task inside it if the work being done is async, like creating a file or a network resource.
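A self-contained version of the snippet above, as a sketch with hypothetical names (`ColdTaskRegistry<T>`, `RunOnceAsync`) and, as in the answer, no removal of finished entries. `Unwrap()` turns the outer `Task<Task<T>>` into a `Task<T>` that completes when the inner asynchronous work does, so callers need a single `await`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical registry: the first caller for a key starts the work; everyone else awaits it.
// As in the answer, entries are never removed.
public sealed class ColdTaskRegistry<T>
{
    private readonly ConcurrentDictionary<string, Task<Task<T>>> _tasks =
        new ConcurrentDictionary<string, Task<Task<T>>>();

    public Task<T> RunOnceAsync(string taskId, Func<Task<T>> work)
    {
        // A cold (unstarted) outer task: constructing it does not invoke work.
        var newTask = new Task<Task<T>>(work);

        if (_tasks.TryAdd(taskId, newTask))
        {
            // Only the race winner starts its outer task, so work runs once per key.
            newTask.Start(TaskScheduler.Default);
        }

        // Everyone reads the stored outer task; Unwrap() completes when the inner work does.
        return _tasks[taskId].Unwrap();
    }
}
```

Losers allocate an outer task that is never started; it is simply garbage collected. Starting the winner on `TaskScheduler.Default` means the factory's synchronous prefix runs on the thread pool rather than on the caller's thread.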

Fingers crossed this one's good.

Luke Puplett
  • From the [documentation](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.getoradd): *"However, the `valueFactory` delegate is called outside the locks to avoid the problems that can arise from executing unknown code under a lock. Therefore, `GetOrAdd` is not atomic with regards to all other operations on the `ConcurrentDictionary class`."* So there is no guarantee that two threads will not invoke the `client.DoSomethingAsync` method concurrently. – Theodor Zoulias Jan 28 '22 at 19:16
  • @TheodorZoulias Good save, sir! – Luke Puplett Feb 11 '22 at 17:06
  • Luke, your latest edit is somewhat relevant to this question: [ConcurrentDictionary GetOrAdd async](https://stackoverflow.com/questions/54117652/concurrentdictionary-getoradd-async) – Theodor Zoulias Feb 11 '22 at 18:12

If you want to allow multiple tasks with the same taskId but different data types to run at the same time, then you could move the generic type <T> from the method to the class. That way each type <T> has its own dictionary. It also allows you to make the dictionary store Task<T> and avoid typecasting.
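One way to read the "move <T> to the class" suggestion is a static generic class: each closed type such as `TypedTaskManager<int>` and `TypedTaskManager<string>` gets its own copy of the static fields, so the dictionary can hold `Task<T>` directly and no type check or cast is needed. A sketch that otherwise keeps the question's lock-and-ContinueWith design; `TypedTaskManager<T>` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical: static fields of a generic class exist once per closed type argument,
// so TypedTaskManager<int> and TypedTaskManager<string> never share a dictionary.
public static class TypedTaskManager<T>
{
    private static readonly object _taskLocker = new object();
    private static readonly Dictionary<string, Task<T>> _tasks = new Dictionary<string, Task<T>>();

    public static Task<T> ExecuteOnceAsync(string taskId, Func<Task<T>> taskFactory)
    {
        lock (_taskLocker)
        {
            // Already typed as Task<T>: no "is Task<T>" check or exception path needed.
            if (_tasks.TryGetValue(taskId, out Task<T> existing))
            {
                return existing;
            }

            Task<T> task = taskFactory();
            _tasks.Add(taskId, task);
            task.ContinueWith(_ => RemoveTask(taskId), TaskScheduler.Default);
            return task;
        }
    }

    private static void RemoveTask(string taskId)
    {
        lock (_taskLocker)
        {
            _tasks.Remove(taskId);
        }
    }
}
```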

You said that you'll only have a single instance of the TaskManager class, but if you can change that to having a single TaskManager instance per class that calls into it, you can avoid needing a dictionary (and therefore hashing the task's name) by giving each class an instance of

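using System;
using System.Threading.Tasks;

public class TaskManager<T>
{
    private Task<T> _currentTask;
    private readonly object _lock = new object();

    public Task<T> ExecuteOnceAsync(string taskId, Func<Task<T>> taskFactory)
    {
        // Read the field once: RemoveTask runs outside the lock and can reset it to null at any time.
        Task<T> current = _currentTask;
        if (current == null)
        {
            lock (_lock)
            {
                current = _currentTask;
                if (current == null)
                {
                    current = taskFactory();
                    // Publish the task before registering the continuation, so a task that is
                    // already complete cannot run RemoveTask before it has been stored.
                    _currentTask = current;
                    current.ContinueWith(RemoveTask);
                }
            }
        }

        return current;
    }

    // Signature matches Action<Task<T>>, so it can be passed to ContinueWith as a method group.
    private void RemoveTask(Task<T> completedTask)
    {
        _currentTask = null;
    }
}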

If you want to be super correct, then you can use Interlocked.Exchange instead of assigning values directly to _currentTask. That matters most in RemoveTask, the continuation passed to ContinueWith, since it runs outside of the lock. You might also want to look into using an Interlocked method to check whether the value is null. The lock isn't needed, by the way, if you know that calls to ExecuteOnceAsync won't happen in parallel, for example if it gets called by a button click event handler in a WPF or WinForms app (even if a user clicks multiple times, each click is handled sequentially on the UI thread). A web app can't make such guarantees, though.
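Going one step further with Interlocked, the lock can be removed entirely: each caller builds a not-yet-started task, `Interlocked.CompareExchange` publishes it only if the field is still null, and only the winner starts it. A sketch under those assumptions, with a hypothetical `LockFreeTaskManager<T>` class and the unused `taskId` parameter dropped; unlike the code above, it always runs the factory on the thread pool.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical lock-free variant of the single-task manager.
public sealed class LockFreeTaskManager<T>
{
    private Task<T> _currentTask;

    public Task<T> ExecuteOnceAsync(Func<Task<T>> taskFactory)
    {
        while (true)
        {
            Task<T> current = Volatile.Read(ref _currentTask);
            if (current != null)
            {
                return current; // a run is already in progress (or has only just finished)
            }

            // Cold outer task: building it does not call taskFactory.
            var outer = new Task<Task<T>>(taskFactory);
            Task<T> candidate = outer.Unwrap();

            // Publish our candidate only if the field is still null.
            if (Interlocked.CompareExchange(ref _currentTask, candidate, null) == null)
            {
                // Clear the field when done, but only if it still holds our task.
                candidate.ContinueWith(
                    t => { Interlocked.CompareExchange(ref _currentTask, null, t); },
                    TaskScheduler.Default);
                outer.Start(TaskScheduler.Default); // only the winner ever runs taskFactory
                return candidate;
            }
            // Another caller won the race; loop and return its task.
        }
    }
}
```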

You can see I do the if (_currentTask == null) check twice, once outside the lock and once inside it, which lets the common case of a task already existing avoid the cost of acquiring the lock. I don't believe you can use the same trick with the code in your question, because Dictionary is not safe for a read that runs concurrently with a write: if RemoveTask is called at the same time as TryGetValue, you can get into a bad state. You could switch to using a ConcurrentDictionary, and that would probably allow you to avoid using the lock inside RemoveTask and only lock in ExecuteOnceAsync when TryGetValue returns false. But whether that improves performance or not needs to be measured.
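A sketch of the ConcurrentDictionary variant described above, assuming a generic class per result type as suggested earlier: lookups take no lock, the lock is taken only when a task has to be created, and RemoveTask uses ConcurrentDictionary's key-and-value removal (through its `ICollection<KeyValuePair<TKey, TValue>>` implementation) without locking. `KeyedTaskManager<T>` is a hypothetical name, and whether this is actually faster would still need to be measured.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch: lock-free lookups, a lock only when a task has to be created.
public sealed class KeyedTaskManager<T>
{
    private readonly ConcurrentDictionary<string, Task<T>> _tasks =
        new ConcurrentDictionary<string, Task<T>>();
    private readonly object _createLock = new object();

    public Task<T> ExecuteOnceAsync(string taskId, Func<Task<T>> taskFactory)
    {
        // Fast path: no lock when a task for this id is already running.
        if (_tasks.TryGetValue(taskId, out Task<T> existing))
        {
            return existing;
        }

        lock (_createLock)
        {
            // Re-check under the lock: another thread may have created it meanwhile.
            if (_tasks.TryGetValue(taskId, out existing))
            {
                return existing;
            }

            Task<T> task = taskFactory();
            _tasks[taskId] = task;
            task.ContinueWith(t => RemoveTask(taskId, t), TaskScheduler.Default);
            return task;
        }
    }

    private void RemoveTask(string taskId, Task<T> completedTask)
    {
        // No lock needed: removes the entry only if it still maps to this exact task.
        ((ICollection<KeyValuePair<string, Task<T>>>)_tasks)
            .Remove(new KeyValuePair<string, Task<T>>(taskId, completedTask));
    }
}
```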

Another issue with both your code and mine is that when taskFactory is slow, it holds the lock for a long time (remember, when calling an async method, it runs synchronously until the first await on a task that is not already complete). This can be mitigated by using Task<T> concreteTask = Task.Run(() => taskFactory());, but Task.Run has its own overhead, so if taskFactory does indeed await very quickly, it's probably better for performance to keep it as it is. Again, if performance matters, you need to measure.
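A small experiment showing what the Task.Run mitigation buys: the hypothetical `SlowStartAsync` blocks synchronously before its first await, standing in for a slow factory. Called directly, that blocking would happen on the caller's thread (inside the lock); wrapped in Task.Run, the caller gets a Task back immediately. `FactoryOffload`, `SlowStartAsync` and `StartOffloaded` are illustration-only names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FactoryOffload
{
    // Hypothetical factory: blocks synchronously (e.g. expensive setup) before its first await.
    public static async Task<int> SlowStartAsync(ManualResetEventSlim gate)
    {
        gate.Wait();        // synchronous prefix: runs on whichever thread invoked the method
        await Task.Yield(); // first real await
        return 42;
    }

    // Calling SlowStartAsync directly would run gate.Wait() on the caller's thread.
    // Task.Run moves that synchronous prefix to the thread pool, so the caller gets a
    // Task<int> back immediately and could release a lock right away.
    public static Task<int> StartOffloaded(ManualResetEventSlim gate) =>
        Task.Run(() => SlowStartAsync(gate));
}
```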

And while I'm by no means a perf expert, depending on your perf requirements my suggestion has several improvements:

  • While Dictionary lookups are O(1), so is Thread.Sleep(1000); in other words, a dictionary can be consistently slower than an alternative, even if the alternative is O(n) (for small values of n). Since your dictionary key is a string, string.GetHashCode is O(n) in the length of the string, and Dictionary performance can be hurt by hash collisions. My suggestion only has a single task object, so it's also O(1), and it doesn't require hashing anything or searching an array or however the Dictionary is implemented.
  • It's super minor, but since my RemoveTask doesn't need to capture any local variables, I can pass it directly to ContinueWith as a method group (its signature matches the delegate type), whereas you use a lambda that captures taskId. For that, the compiler generates a closure class and allocates an instance of it on the heap every time your code runs, in addition to the delegate itself, which increases memory pressure. By itself this won't have any significant perf impact (unless run in a very tight loop), but it can contribute to "death by 1000 cuts" if you have many other bits of code that create small objects on the heap.
  • As already mentioned above, my code is lock-free when a task already exists.
  • Your code effectively uses a global application lock. This is the most important perf killer. If your TaskManager is used every time a database request is made, then your implementation can only start one database query at a time, even though two tasks with different taskIds can never return the same task. And RemoveTask, regardless of which task completes, blocks all calls to ExecuteOnceAsync. By using a different instance of TaskManager per query type, you can prevent GetProductsAsync from blocking GetCustomerInformation.

Having written all that, if your goal is not to use one TaskManager per application, but instead use one TaskManager per database query method, meaning that taskId is actually a representation of the query parameters of GetProductsAsync, then you should ignore everything I wrote :) Well, almost everything. You can still move the <T> to the class and dictionary definitions, and avoid doing typecasting in your method.

zivkan
  • Thanks for an awesome answer :) I got all of your points, and they make sense. We are not going to use this TaskManager for queries, though; it's going to be invoked very rarely, but we need to make sure that if it ever gets invoked twice, both requests return the same result. I totally agree that in a DB situation this might be overkill. Let me evaluate the answer and do some trials, and I'll get back here. Again, I really appreciate such a detailed answer – kyurkchyan Feb 08 '19 at 07:58
  • One question on the code: in the piece where you call ContinueWith, you don't store the returned value (a task). Won't it be garbage collected after it's done? Also, yes, I do believe we need locking both in Remove and ExecuteOnce; as this method is going to be called very seldom, I don't mind holding a lock for several milliseconds :) – kyurkchyan Feb 08 '19 at 08:11
  • I don't believe storing the continuation task is necessary. Firstly, [the examples in the docs don't do it](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.continuewith?view=netframework-4.7.2). Secondly, as Damien_The_Unbeliever wrote as a comment to your question, [the continuation task is registered](https://referencesource.microsoft.com/#mscorlib/system/threading/Tasks/Task.cs,4124), so that once the first task completes, it has a reference to the continuation task to execute (which itself might have a continuation, so storing only the delegate is insufficient) – zivkan Feb 08 '19 at 13:06
  • Just in case anyone reading this isn't already aware: garbage collection in .NET uses marking to find objects that are still reachable from the application's roots. The application root does not mean Program.Main, by the way. The runtime can keep references to objects that you don't reference in any of your own code. An object going out of scope does not automatically become eligible for garbage collection. – zivkan Feb 08 '19 at 13:33