
I have created a class that allows me to run multiple operations concurrently, with an option to set a maximum concurrency limit. I.e., if I have 100 operations to run and I set maxConcurrency to 10, then at any given time at most 10 operations should be running concurrently, and eventually all of the operations should be executed.

Here's the code:

public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
    using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    var results = new ConcurrentBag<T>();
    var tasks = new List<Task>();
    foreach (var operation in operations)
    {
        await semaphore.WaitAsync(ct).ConfigureAwait(false);

        var task = Task.Factory.StartNew(async () =>
        {
            try
            {
                Debug.WriteLine($"Adding new result");
                var singleResult = await operation(ct).ConfigureAwait(false);
                results.Add(singleResult);
                Debug.WriteLine($"Added {singleResult}");
            }
            finally
            {
                semaphore.Release();
            }
        }, ct);
        tasks.Add(task);
    }
    await Task.WhenAll(tasks).ConfigureAwait(false);

    Debug.WriteLine($"Completed tasks: {tasks.Count(t => t.IsCompleted)}");
    Debug.WriteLine($"Calculated results: {results.Count}");
    
    return results.ToList().AsReadOnly();
}

Here's an example of how I use it:

var operations = Enumerable.Range(1, 10)
    .Select<int, Func<CancellationToken, Task<int>>>(n => async ct =>
    {
        await Task.Delay(100, ct);
        return n;
    });

var data = await _sut.Run(operations, 2, CancellationToken.None);

Every time I execute this, the data collection has just 8 results. I'd expect to have 10 results.

Here's the Debug log:

Adding new
Adding new
Added 1
Added 2
Adding new
Adding new
Added 3
Added 4
Adding new
Adding new
Added 5
Adding new
Added 6
Adding new
Added 7
Adding new
Added 8
Adding new
Completed tasks: 10
Calculated results: 8

As you can see:

  • 10 tasks are Completed
  • "Adding new" is logged 10 times
  • "Added x" is logged 8 times

I do not understand why the last 2 operations never finish. All tasks have IsCompleted set to true, which, as I understand it, should mean that all of them ran to completion.

mnj
  • The issue here, without me seeing exactly why yet, is that you're not really waiting for the tasks to properly complete at all. The whole illusion that you're waiting stems from the fact that the loop won't continue until it can grab the semaphore, and thus that part will wait until one of the tasks completes and releases it. If you move the wait for the semaphore into the tasks that you start, you will see that it doesn't wait at all for the tasks to complete. – Lasse V. Karlsen Feb 08 '21 at 09:44
  • @LasseV.Karlsen I have `await Task.WhenAll(tasks).ConfigureAwait(false);` below the foreach loop. I believe this should guarantee that all my operations are completed. Why doesn't that work? – mnj Feb 08 '21 at 09:46
  • Also, I may be mistaken but I believe the normal thread-related synchronization objects are not necessarily correct to use with tasks. Some of them will be owned by threads when they are grabbed and if the task running on that thread suddenly yields because of a pending I/O or other task completion, and that same thread tries to run another task it may be that it will re-use the lock on the synchronization object and allow more than the number of acquisitions you expected to happen. – Lasse V. Karlsen Feb 08 '21 at 09:46
  • I don't know yet why your code is not waiting for the tasks, all I'm saying is that the wait for the tasks is because of the semaphore acquisition by the main thread, waiting for a secondary task to release the previous hold on it. The wait is not because of WhenAll. I will have to debug further to figure out why. – Lasse V. Karlsen Feb 08 '21 at 09:47
  • Also, there is `Parallel.ForEach` that should work. Any particular reason you want to reimplement this feature yourself? – Lasse V. Karlsen Feb 08 '21 at 09:48
  • `Parallel.ForEach` was not designed for `async` operations as far as I understand. That would mean that I cannot `await` the code in `Parallel.ForEach`. – mnj Feb 08 '21 at 09:49
  • I believe you're right. – Lasse V. Karlsen Feb 08 '21 at 09:51
  • The issue here is that `StartNew(async () => ...)` returns a task that, when awaited, returns the inner task. You will have to do a double await on this, or use Unwrap. – Lasse V. Karlsen Feb 08 '21 at 09:54
  • Instead of the `Task.Factory.StartNew`, the `Task.Run` is preferable. It understands async delegates (no `Unwrap` needed), and is configured with a better default `TaskScheduler`. You can look [here](https://devblogs.microsoft.com/pfxteam/task-run-vs-task-factory-startnew/) for more details. Also, instead of the `ConcurrentBag` you should consider using a `ConcurrentQueue`. It performs better, and better preserves the order of the enqueued items. The `ConcurrentBag` is a [very specialized](https://stackoverflow.com/a/64823123/11178549) collection. – Theodor Zoulias Feb 08 '21 at 10:38
  • @pinkfloydx33 the `TaskScheduler`s [are unable](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await/57702536#57702536) to limit the concurrency of asynchronous operations. They can only limit the concurrency of synchronous code. – Theodor Zoulias Feb 08 '21 at 17:28
  • @TheodorZoulias you're absolutely right. I wasn't paying attention and missed the async/await. – pinkfloydx33 Feb 08 '21 at 23:12
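A minimal sketch of the variant suggested in the comment above — Task.Run instead of Task.Factory.StartNew (so no Unwrap is needed) and a ConcurrentQueue instead of the ConcurrentBag — while keeping the rest of the question's structure; this is illustrative only:

public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
    using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    var results = new ConcurrentQueue<T>();
    var tasks = new List<Task>();
    foreach (var operation in operations)
    {
        await semaphore.WaitAsync(ct).ConfigureAwait(false);

        // Task.Run understands async delegates, so the task it returns completes
        // only when the inner operation has finished (no Unwrap needed).
        var task = Task.Run(async () =>
        {
            try
            {
                results.Enqueue(await operation(ct).ConfigureAwait(false));
            }
            finally
            {
                semaphore.Release();
            }
        }, ct);
        tasks.Add(task);
    }
    await Task.WhenAll(tasks).ConfigureAwait(false);
    return results.ToList().AsReadOnly();
}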

1 Answer


The issue here is that Task.Factory.StartNew, given an async delegate, returns an outer task that, when awaited, merely returns the inner task.

It does not give you a task that will wait for this inner task, hence your problem.
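To make that concrete, here is a minimal, self-contained illustration (the delay and the value 42 are just placeholders):

// StartNew does not understand async delegates: the async lambda produces a
// Task<int>, and StartNew wraps it in an outer task, giving a Task<Task<int>>.
Task<Task<int>> wrapped = Task.Factory.StartNew(async () =>
{
    await Task.Delay(100);
    return 42;
});

// Awaiting the outer task only waits until the lambda has handed back its inner
// task (i.e. until its first pending await); the actual work may still be running.
Task<int> inner = await wrapped;

// Unwrap (or a second await) gives a task that completes with the inner work.
int result = await wrapped.Unwrap();   // equivalent to: await await wrapped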

The easiest way to fix this is to call Unwrap on the tasks you create, which will unwrap the inner task and allow you to wait for that.

This should work:

var task = Task.Factory.StartNew(async () =>
{
    // ... same body as before ...
}, ct).Unwrap();

With this small change, you get this output:

...
Added 9
Added 10
Completed tasks: 10
Calculated results: 10

Note that my comments on your question still stand:

  1. You're still working with the illusion that WhenAll waits for all the tasks, when in reality all tasks except the last N have already completed by the time you reach it, because the loop itself doesn't continue until the previous tasks have completed and released the semaphore. You should thus move the synchronization object acquisition into your inner tasks, so that you can queue them all up before you start waiting for them (see the sketch after this list).
  2. I originally believed that SemaphoreSlim, like other thread-related synchronization objects, might be unsafe to use in task-based work: threads in the thread pool are reused while live tasks are waiting for subtasks to complete, so such a thread might already own a synchronization object from a previous task that has yet to complete, and thus allow more than the 2 operations you wanted to run at the "same time". As it turns out, SemaphoreSlim is OK to use here; it is other, thread-affine synchronization primitives that might not be.
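A minimal sketch of what point 1 could look like, assuming the Task.Run and ConcurrentQueue suggestions from the comments; compared with the sketch shown after the question's comments, the only structural difference is that the semaphore is awaited inside each task (eager) rather than in the loop (lazy):

public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
    using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    var results = new ConcurrentQueue<T>();

    // Eagerly start one task per operation; each task throttles itself on the
    // semaphore, so Task.WhenAll is what actually waits for the work to finish.
    var tasks = operations.Select(operation => Task.Run(async () =>
    {
        await semaphore.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            results.Enqueue(await operation(ct).ConfigureAwait(false));
        }
        finally
        {
            semaphore.Release();
        }
    }, ct)).ToList();

    await Task.WhenAll(tasks).ConfigureAwait(false);
    return results.ToList().AsReadOnly();
}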
Lasse V. Karlsen
  • Great answer, thank you very much for your time. – mnj Feb 08 '21 at 10:01
  • @Loreno Please re-read, as I edited in my comments. The small change with Unwrap might fix your immediate problem, but I would advise you to change the SemaphoreSlim class to use AsyncSemaphore from AsyncEx instead. – Lasse V. Karlsen Feb 08 '21 at 10:04
  • Stephen noted in his repo that `SemaphoreSlim` is also good to use: https://github.com/StephenCleary/AsyncEx/blob/master/doc/AsyncSemaphore.md – mnj Feb 08 '21 at 10:20
  • @Loreno Ok, really good to know. I would still advise to move the acquisition line into the inner task though, but then you can safely stay with SemaphoreSlim. – Lasse V. Karlsen Feb 08 '21 at 10:24
  • The OP's approach is similar to Theo Yaung's solution from [this](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations/10810730#10810730) answer. IMHO it is a pretty good technique. It is more memory-efficient than having every work item await the semaphore immediately. As for the advice to switch from the `SemaphoreSlim` to the `Nito.AsyncEx.AsyncSemaphore`, I don't think that it'll offer any benefit. AFAIK Stephen Cleary made this class at a time when the `SemaphoreSlim` was not yet equipped with the `WaitAsync` method. – Theodor Zoulias Feb 08 '21 at 10:28
  • The difference between awaiting the semaphore during the enumeration (the OP's and Theo Yaung's approach) and awaiting the semaphore inside the projected tasks (your suggestion) is that the first approach is lazy and the second is eager. The first approach is the "LINQy" one. Enumerating lazily can make a difference (in memory usage) if you have a million items to process. – Theodor Zoulias Feb 08 '21 at 18:46
  • @TheodorZoulias I agree, changed the answer. I was thinking of a different scenario where you might have initial init-code that could be allowed to run until it came to a pending I/O task, but it's a different scenario. – Lasse V. Karlsen Feb 09 '21 at 08:32