
I have a list of IDs, and I want to fetch the data for each ID from the database in parallel. My ExecuteAsync method below is called at very high throughput, and each request has around 500 IDs for which I need to extract data.

So I have the code below, where I loop over the list of IDs and make an async call for each ID in parallel, and it works fine.

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
    Func<CancellationToken, int, Task<T>> mapper) where T : class
{
    var tasks = new List<Task<T>>(ids.Count);
    // invoking multiple id in parallel to get data for each id from database
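    for (int i = 0; i < ids.Count; i++)
    {
        // copy the id: a lambda capturing the loop variable i would read a later
        // value of i if the policy re-invoked it (for example on a retry)
        var id = ids[i];
        tasks.Add(Execute(policy, ct => mapper(ct, id)));
    }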

    // wait for all id response to come back
    var responses = await Task.WhenAll(tasks);

    var excludeNull = new List<T>(ids.Count);
    for (int i = 0; i < responses.Length; i++)
    {
        var response = responses[i];
        if (response != null)
        {
            excludeNull.Add(response);
        }
    }
    return excludeNull;
}

private async Task<T> Execute<T>(IPollyPolicy policy,
    Func<CancellationToken, Task<T>> requestExecuter) where T : class
{
    var response = await policy.Policy.ExecuteAndCaptureAsync(
        ct => requestExecuter(ct), CancellationToken.None);
    if (response.Outcome == OutcomeType.Failure)
    {
        if (response.FinalException != null)
        {
            // log error
            throw response.FinalException;
        }
    }

    return response?.Result;
}

Question:

Now, as you can see, I am looping over all the IDs and making a bunch of async calls to the database in parallel, one per ID, which can put a lot of load on the database (depending on how many requests are coming in). So I want to limit the number of async calls we make to the database. I modified ExecuteAsync to use a semaphore as shown below, but it doesn't look like it does what I want:

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
    Func<CancellationToken, int, Task<T>> mapper) where T : class
{
    var throttler = new SemaphoreSlim(250);
    var tasks = new List<Task<T>>(ids.Count);
    // invoking multiple id in parallel to get data for each id from database
    for (int i = 0; i < ids.Count; i++)
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            var id = ids[i]; // copy, so the lambda does not capture the shared loop variable i
            tasks.Add(Execute(policy, ct => mapper(ct, id)));
        }
        finally
        {
            throttler.Release();
        }
    }

    // wait for all id response to come back
    var responses = await Task.WhenAll(tasks);

    // same excludeNull code check here

    return excludeNull;
}

Does Semaphore work on threads or tasks? Reading it here, it looks like Semaphore is for threads and SemaphoreSlim is for tasks.

Is this correct? If so, what's the best way to fix this and limit the number of async I/O calls we make to the database here?

dragons
  • Related: [How to execute multiple async calls in parallel efficiently in C#?](https://stackoverflow.com/questions/61961349/how-to-execute-multiple-async-calls-in-parallel-efficiently-in-c/61964471) – Theodor Zoulias Jun 10 '20 at 08:04
  • As a side note, according to the [guidelines](https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap#naming-parameters-and-return-types) the asynchronous method `private async Task Execute(` should have the suffix `Async`. – Theodor Zoulias Jun 11 '20 at 13:40

4 Answers


> Task is an abstraction on threads, and doesn't necessarily create a new thread. Semaphore limits the number of threads that can access that for loop. Execute returns a Task, which isn't a thread. If there's only 1 request, there will be only 1 thread inside that for loop, even if it is asking for 500 ids. The 1 thread sends off all the async IO tasks itself.

Sort of. I would not say that tasks are related to threads at all. There are actually two kinds of tasks: a delegate task (which is kind of an abstraction of a thread), and a promise task (which has nothing to do with threads).

Regarding the SemaphoreSlim, it does limit the concurrency of a block of code (not threads).
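To make the distinction concrete, here is a minimal sketch (the class and method names are hypothetical). `Task.Run` produces a delegate task, which occupies a thread-pool thread while its delegate runs. A `TaskCompletionSource<T>` produces a promise task, which no thread executes; it simply completes when `SetResult` is called. Asynchronous I/O methods, such as a database driver's async query methods, typically return promise-style tasks.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class TaskKinds
{
    // Delegate task: wraps a delegate that the thread pool executes on some thread.
    public static Task<int> StartDelegateTask() =>
        Task.Run(() => Thread.CurrentThread.ManagedThreadId);

    // Promise task: represents a result that arrives later. No thread runs it;
    // it completes only when someone calls SetResult on the TaskCompletionSource.
    public static (Task<string> Pending, TaskCompletionSource<string> Completer) CreatePromiseTask()
    {
        var tcs = new TaskCompletionSource<string>();
        return (tcs.Task, tcs);
    }
}
```

The promise task stays pending indefinitely without consuming any thread until `Completer.SetResult(...)` is called.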

> I recently started playing with C#, so it looks like my understanding of threads and tasks is not right.

I recommend reading my async intro and best practices. Follow up with There Is No Thread if you're interested more about how threads aren't really involved.

> I modified ExecuteAsync to use Semaphore as shown below but it doesn't look like it does what I want it to do

The current code is only throttling the adding of the tasks to the list, which is only done one at a time anyway. What you want to do is throttle the execution itself:

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy, Func<CancellationToken, int, Task<T>> mapper) where T : class
{
  var throttler = new SemaphoreSlim(250);
  var tasks = new List<Task<T>>(ids.Count);

  // invoking multiple id in parallel to get data for each id from database
  for (int i = 0; i < ids.Count; i++)
    tasks.Add(ThrottledExecute(ids[i]));

  // wait for all id response to come back
  var responses = await Task.WhenAll(tasks);

  // same excludeNull code check here
  return excludeNull;

  async Task<T> ThrottledExecute(int id)
  {
    await throttler.WaitAsync().ConfigureAwait(false);
    try {
      return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
    } finally {
      throttler.Release();
    }
  }
}
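One self-contained way to check that this pattern really caps concurrency is to replace the Polly policy and the database call with a simulated query and record the peak number of calls in flight. This is a sketch under assumptions: `ThrottleDemo`, `RunAsync` and `FakeQueryAsync` are hypothetical names, and `Task.Delay` stands in for the real I/O.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs one simulated query per id with at most maxConcurrency queries in flight.
    // Returns the non-null results plus the highest concurrency actually observed.
    public static async Task<(List<string> Results, int Peak)> RunAsync(
        IList<int> ids, int maxConcurrency)
    {
        var throttler = new SemaphoreSlim(maxConcurrency);
        var gate = new object();
        int inFlight = 0, peak = 0;

        var tasks = new List<Task<string>>(ids.Count);
        for (int i = 0; i < ids.Count; i++)
            tasks.Add(ThrottledExecute(ids[i]));   // id passed by value: no closure over i

        string[] responses = await Task.WhenAll(tasks);
        return (responses.Where(r => r != null).ToList(), peak);

        // Local function: the semaphore is held for the whole query,
        // not just while the task is being added to the list.
        async Task<string> ThrottledExecute(int id)
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                lock (gate) { inFlight++; if (inFlight > peak) peak = inFlight; }
                return await FakeQueryAsync(id).ConfigureAwait(false);
            }
            finally
            {
                lock (gate) { inFlight--; }
                throttler.Release();
            }
        }
    }

    // Hypothetical stand-in for the database call; returns null for odd ids
    // so that the null-filtering step has something to remove.
    private static async Task<string> FakeQueryAsync(int id)
    {
        await Task.Delay(10).ConfigureAwait(false);
        return id % 2 == 0 ? $"row {id}" : null;
    }
}
```

Calling `RunAsync` with 100 ids and a limit of 5 should report a peak of at most 5. The list-insertion throttling from the question does not cap the peak at all, because each `Execute` call starts its query before the semaphore is released.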
Stephen Cleary
  • Good detailed suggestion. I will go through all those links to get more clarity. Thanks for your help! Just to make sure: is the `async Task ThrottledExecute(int id)` line part of the same `ExecuteAsync` method? That is pretty new to me as well; I come from a different language background, so I was confused. So with your `ExecuteAsync` suggestion the calls are throttled, and at most 250 calls (one per `id`) will be made to the database at a time, correct? – dragons Jun 10 '20 at 03:03
  • `ThrottledExecute` is a local method. So it's like a regular method but it can also access local variables. The code I posted will throttle the `Execute` methods so only 250 calls are made simultaneously. – Stephen Cleary Jun 10 '20 at 03:21
  • thanks Stephen. Quick question: reading [here](https://docs.microsoft.com/en-us/dotnet/standard/async-in-depth) it says that I/O tasks don't get a dedicated thread, because dedicating a thread to waiting on an I/O-bound task is inefficient. So in this case, will using SemaphoreSlim still do exactly what I want, i.e. limit the number of async per-`id` calls to the database? In my case the database call is an I/O operation. The `ExecuteAsync` method will be called at some throughput with a list of ids, and I want to limit how many of those `ids` we query the database for concurrently. – dragons Jun 11 '20 at 15:36
  • Yes, as noted in the answer below, you can think of `SemaphoreSlim` as limiting "asynchronous workflows". I.e., asynchronous code blocks (that do not require threads). – Stephen Cleary Jun 11 '20 at 16:42
  • @StephenCleary is that ExecuteAsync better than Parallel.ForEachAsync for IO operations? What is the difference? – Ssss Feb 24 '23 at 12:36
  • @Ssss: `ExecuteAsync` works with asynchronous code; `ForEachAsync` works with a mixture of asynchronous and synchronous code. I tend to use a `WhenAll`-based approach for asynchronous code, but I'm not opposed to using `ForEachAsync` in that scenario. – Stephen Cleary Feb 24 '23 at 17:52
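For comparison, a minimal `Parallel.ForEachAsync` version (available from .NET 6) might look like the sketch below, with `MaxDegreeOfParallelism` playing the role of the semaphore's initial count. The `ForEachAsyncDemo` class is hypothetical, the Polly policy is omitted, and writing each result into its own array slot is just one way to collect results while keeping the order of `ids`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Requires .NET 6 or later. mapper is the per-id database call.
    public static async Task<List<T>> ExecuteAsync<T>(
        IList<int> ids, Func<CancellationToken, int, Task<T>> mapper,
        int maxConcurrency, CancellationToken cancellationToken = default) where T : class
    {
        var results = new T[ids.Count];
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxConcurrency,   // at most this many bodies at once
            CancellationToken = cancellationToken
        };

        // Iterate over indexes so each result lands in its own slot (no locking needed).
        await Parallel.ForEachAsync(Enumerable.Range(0, ids.Count), options,
            async (index, ct) => results[index] = await mapper(ct, ids[index]));

        // Same null filtering as the question's excludeNull loop.
        return results.Where(r => r != null).ToList();
    }
}
```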

Your colleague probably has in mind the Semaphore class, which is indeed a thread-centric throttler, with no asynchronous capabilities.

> Limits the number of threads that can access a resource or pool of resources concurrently.

The SemaphoreSlim class is a lightweight alternative to Semaphore, which includes the asynchronous method WaitAsync, that makes all the difference in the world. The WaitAsync doesn't block a thread, it blocks an asynchronous workflow. Asynchronous workflows are cheap (usually less than 1000 bytes each). You can have millions of them "running" concurrently at any given moment. This is not the case with threads, because of the 1 MB of memory that each thread reserves for its stack.
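A small experiment illustrates this. The names are hypothetical and the count is arbitrary. Ten thousand workflows can wait on a closed `SemaphoreSlim` at the same time, because a pending `WaitAsync` is just an incomplete task, not a blocked thread.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncWaitDemo
{
    // Starts `count` asynchronous workflows that all wait on a closed SemaphoreSlim,
    // then releases them all and returns how many ran to completion.
    public static async Task<int> RunAsync(int count)
    {
        var gate = new SemaphoreSlim(0);        // initially closed: every WaitAsync must wait
        int finished = 0;

        Task[] waiters = Enumerable.Range(0, count).Select(async _ =>
        {
            await gate.WaitAsync();             // asynchronous wait: no thread is blocked
            Interlocked.Increment(ref finished);
        }).ToArray();

        // All workflows are suspended here. With Semaphore.WaitOne (thread-based)
        // the same experiment would need `count` blocked threads.
        if (waiters.Any(t => t.IsCompleted))
            throw new InvalidOperationException("No waiter should have been released yet.");

        gate.Release(count);                    // open the gate for everyone
        await Task.WhenAll(waiters);
        return finished;
    }
}
```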

As for the ExecuteAsync method, here is how you could refactor it by using the LINQ methods Select, Where, ToArray and ToList:


Update: The Polly library supports capturing and continuing on the current synchronization context, so I added a bool executeOnCurrentContext argument to the API. I also renamed the asynchronous Execute method to ExecuteAsync, to be on par with the guidelines.

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
    Func<CancellationToken, int, Task<T>> mapper,
    int concurrencyLevel = 1, bool executeOnCurrentContext = false) where T : class
{
    var throttler = new SemaphoreSlim(concurrencyLevel);
    Task<T>[] tasks = ids.Select(async id =>
    {
        await throttler.WaitAsync().ConfigureAwait(executeOnCurrentContext);
        try
        {
            return await ExecuteAsync(policy, ct => mapper(ct, id),
                executeOnCurrentContext).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    }).ToArray();

    T[] results = await Task.WhenAll(tasks).ConfigureAwait(false);

    return results.Where(r => r != null).ToList();
}

private async Task<T> ExecuteAsync<T>(IPollyPolicy policy,
    Func<CancellationToken, Task<T>> function,
    bool executeOnCurrentContext = false) where T : class
{
    var response = await policy.Policy.ExecuteAndCaptureAsync(
        ct => executeOnCurrentContext ? function(ct) : Task.Run(() => function(ct)),
        CancellationToken.None, continueOnCapturedContext: executeOnCurrentContext)
        .ConfigureAwait(executeOnCurrentContext);
    if (response.Outcome == OutcomeType.Failure)
    {
        if (response.FinalException != null)
        {
            ExceptionDispatchInfo.Throw(response.FinalException);
        }
    }
    return response?.Result;
}
Theodor Zoulias
  • @dragons I removed the `ConfigureAwait(false)` from my answer, because your intention may be to run the `mapper` lambda in the current synchronization context (to be able to access UI elements for example). If this is not required, then enclosing each async lambda into a `Task.Run` (`...Select(id => Task.Run(async () =>...`) is a safer way to ensure that the `mapper` will run in a background thread, instead of appending `ConfigureAwait(false)` after each `await`. – Theodor Zoulias Jun 11 '20 at 13:48
  • In my case another service calls our service for the data, so what is the recommendation in that case? Should I keep `ConfigureAwait(false)` or leave it out? – dragons Jun 11 '20 at 15:23
  • Also, reading [here](https://learn.microsoft.com/en-us/dotnet/standard/async-in-depth) it says that I/O tasks don't get a dedicated thread, because dedicating a thread to waiting on an I/O-bound task is inefficient. So in this case, will using SemaphoreSlim still do exactly what I want, i.e. limit the number of async per-id calls to the database? In my case the database call is an I/O operation. – dragons Jun 11 '20 at 15:27
  • @dragons if you have no idea how and where the `ExecuteAsync` will be used, I would suggest to add an optional `bool executeOnCurrentContext = false` argument, and wrap the call with `Task.Run` if the argument is `false`. – Theodor Zoulias Jun 11 '20 at 15:44
  • As for your colleague, he may be confused about the relation between tasks and threads. This is not a trivial topic, and there is no substitute for studying it, and for experimenting with trivial examples in order to understand what async-await is and how it works. I would recommend as first reading the [primary resource](https://learn.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/) in learn.microsoft.com, and then the [introductory material](https://blog.stephencleary.com/2012/02/async-and-await.html) in Stephen Cleary's site. – Theodor Zoulias Jun 11 '20 at 15:53
  • @dragons I updated my answer again so that it uses the `continueOnCapturedContext` argument of the `Policy.ExecuteAndCaptureAsync` method. To be clear, I have not tested this code, nor the previous versions I posted. – Theodor Zoulias Jun 12 '20 at 10:54
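The `Task.Run` alternative mentioned in the first comment above could look roughly like this sketch. It leaves the Polly policy out, so it is not a drop-in replacement, and `TaskRunThrottle` is a hypothetical name. Wrapping each per-id workflow in `Task.Run` means the mapper starts on a thread-pool thread, where there is no synchronization context to capture, so no `ConfigureAwait(false)` is needed inside the lambda.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskRunThrottle
{
    public static async Task<List<T>> ExecuteAsync<T>(IList<int> ids,
        Func<CancellationToken, int, Task<T>> mapper, int concurrencyLevel,
        CancellationToken cancellationToken = default) where T : class
    {
        using var throttler = new SemaphoreSlim(concurrencyLevel);

        // Each workflow runs on the thread pool; the semaphore limits how many
        // mapper calls are in progress at the same time.
        Task<T>[] tasks = ids.Select(id => Task.Run(async () =>
        {
            await throttler.WaitAsync(cancellationToken);
            try
            {
                return await mapper(cancellationToken, id);
            }
            finally
            {
                throttler.Release();
            }
        })).ToArray();

        // WhenAll keeps the results in the same order as ids.
        T[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
        return results.Where(r => r != null).ToList();
    }
}
```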

You are throttling the rate at which you add tasks to the list. You are not throttling the rate at which tasks are executed. To do that, you'd probably have to implement your semaphore calls inside the Execute method itself.
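A minimal sketch of moving the semaphore into `Execute`, assuming the caller creates one `SemaphoreSlim` per `ExecuteAsync` call and passes it in. The Polly policy is left out so the example compiles on its own. `ThrottledExecuteDemo` and the extra `throttler` parameter are hypothetical, not part of the question's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExecuteDemo
{
    // Variant of the question's Execute: it holds a semaphore slot for the whole
    // duration of the request, not just while the task is created.
    public static async Task<T> Execute<T>(SemaphoreSlim throttler,
        Func<CancellationToken, Task<T>> requestExecuter,
        CancellationToken cancellationToken = default) where T : class
    {
        await throttler.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            return await requestExecuter(cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    }

    public static async Task<List<T>> ExecuteAsync<T>(IList<int> ids,
        Func<CancellationToken, int, Task<T>> mapper, int maxConcurrency) where T : class
    {
        using var throttler = new SemaphoreSlim(maxConcurrency);
        // The loop never waits; each Execute call waits for its own slot.
        Task<T>[] tasks = ids.Select(id => Execute(throttler, ct => mapper(ct, id))).ToArray();
        T[] responses = await Task.WhenAll(tasks).ConfigureAwait(false);
        return responses.Where(r => r != null).ToList();
    }
}
```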

If you can't modify Execute, another way to do it is to poll for completed tasks, sort of like this:

for (int i = 0; i < ids.Count; i++)
{
    var pendingCount = tasks.Count( t => !t.IsCompleted );
    while (pendingCount >= 500) await Task.Yield();
    tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
await Task.WhenAll( tasks );
John Wu
  • I can modify the `Execute` method as well. What would be the best way to achieve this, then? Move it into the `Execute` method? And is my colleague right about what he said regarding Semaphore working on tasks vs. threads? – dragons Jun 09 '20 at 17:53
  • Pass the `throttler` into the task. Within the task, await the throttler before executing the rest of the code, then release it when the rest of the code has finished. – John Wu Jun 09 '20 at 17:54
  • If I am not mistaken, the line `while (pendingCount >= 500) await Task.Yield();` will have no effect until `pendingCount >= 500` is evaluated to `true`, in which case an infinite loop will occur. – Theodor Zoulias Jun 10 '20 at 07:14
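If you do want the polling approach, a corrected sketch re-evaluates the pending set on every pass and awaits `Task.WhenAny` instead of spinning on `Task.Yield`. `PollingThrottle` and the `startTask` delegate (standing in for `Execute(policy, ct => mapper(ct, id))`) are hypothetical. The `SemaphoreSlim` approach is simpler and avoids rescanning the task list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PollingThrottle
{
    // Starts one task per id but never lets more than maxPending be incomplete at once.
    public static async Task<T[]> RunAsync<T>(IList<int> ids, Func<int, Task<T>> startTask,
        int maxPending)
    {
        if (maxPending < 1) throw new ArgumentOutOfRangeException(nameof(maxPending));

        var tasks = new List<Task<T>>(ids.Count);
        for (int i = 0; i < ids.Count; i++)
        {
            // Re-evaluate the pending set on every pass; computing it only once
            // is what makes the original loop spin forever.
            List<Task<T>> pending;
            while ((pending = tasks.Where(t => !t.IsCompleted).ToList()).Count >= maxPending)
                await Task.WhenAny(pending);     // wait (without spinning) until one finishes

            tasks.Add(startTask(ids[i]));        // ids[i] is evaluated now, not later
        }
        return await Task.WhenAll(tasks);
    }
}
```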

Actually, the TPL is capable of controlling task execution and limiting concurrency. You can test how many parallel tasks suit your use case. There is no need to think about threads; the TPL will manage everything for you.

To use limited concurrency see this answer, credits to @panagiotis-kanavos

.Net TPL: Limited Concurrency Level Task scheduler with task priority?

Here is the example code (it also uses different priorities, which you can strip out):

QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default,4);
TaskScheduler pri0 = qts.ActivateNewQueue(priority: 0);
TaskScheduler pri1 = qts.ActivateNewQueue(priority: 1);

Task.Factory.StartNew(()=>{ }, 
                  CancellationToken.None, 
                  TaskCreationOptions.None, 
                  pri0);

Just throw all your tasks to the queue and with Task.WhenAll you can wait till everything is done.

ZoolWay
  • AFAIK there is no way to throttle asynchronous operations by [using custom task schedulers](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await). These can only be used to throttle the *continuations* of the async operations, not the operations themselves. – Theodor Zoulias Jun 10 '20 at 08:11
  • It depends on what one understands under "throttle". If you schedule 500 tasks, everyone with one or more SQL statement and only 2 or 3 or 10 of them run in parallel this has an impact. But it might not what OP is looking for nor might it be predictable. – ZoolWay Jun 10 '20 at 09:33
  • `TaskScheduler`s are only relevant with what Stephen Cleary calls [*delegate tasks*](https://blog.stephencleary.com/2015/03/a-tour-of-task-part-9-delegate-tasks.html). These are tasks that represent some action that runs on a thread. `TaskScheduler`s are not relevant with promise-style tasks. You can't throttle for example `Task.Delay()` tasks with a custom `TaskScheduler`, because these tasks are not running [anywhere](https://blog.stephencleary.com/2013/11/there-is-no-thread.html). And practically all built-in/third-party async methods return promise-style tasks. – Theodor Zoulias Jun 10 '20 at 09:54
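The point in these comments can be checked with a limited-concurrency scheduler. The sketch below uses the built-in `ConcurrentExclusiveSchedulerPair` in place of `QueuedTaskScheduler`, which is not part of the BCL. It uses `Task.Delay` as simulated I/O, and `SchedulerDemo` is a hypothetical name. The scheduler limits how many delegates run at once. However, as soon as a delegate reaches `await`, it returns and frees its slot, so the number of operations waiting on I/O is not limited.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerDemo
{
    // Starts `count` async operations on a scheduler limited to 2 concurrent delegates
    // and reports how many operations were awaiting (simulated) I/O at the same time.
    public static async Task<int> PeakInFlightAsync(int count)
    {
        var limited = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 2)
            .ConcurrentScheduler;
        int inFlight = 0, peak = 0;
        var gate = new object();

        Task[] operations = Enumerable.Range(0, count).Select(_ =>
            Task.Factory.StartNew(async () =>
            {
                lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
                await Task.Delay(200);           // no delegate occupies the scheduler here
                lock (gate) { inFlight--; }
            }, CancellationToken.None, TaskCreationOptions.None, limited).Unwrap())
            .ToArray();

        await Task.WhenAll(operations);
        return peak;
    }
}
```

With 20 operations this typically reports a peak well above 2 (the scheduler's limit), since all 20 delays end up pending at the same time.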