-2

I have an ASP.NET 5 Web API application which contains a method that takes objects from a List<T> and makes HTTP requests to a server, 5 at a time, until all requests have completed. This is accomplished using a SemaphoreSlim, a List<Task>(), and awaiting on Task.WhenAll(), similar to the example snippet below:

public async Task<ResponseObj[]> DoStuff(List<Input> inputData)
{
  const int maxDegreeOfParallelism = 5;
  var tasks = new List<Task<ResponseObj>>();

  using var throttler = new SemaphoreSlim(maxDegreeOfParallelism);
  foreach (var input in inputData)
  {
    tasks.Add(ExecHttpRequestAsync(input, throttler));
  }

  List<ResponseObj> resposnes = await Task.WhenAll(tasks).ConfigureAwait(false);

  return responses;
}

private async Task<ResponseObj> ExecHttpRequestAsync(Input input, SemaphoreSlim throttler)
{
  await throttler.WaitAsync().ConfigureAwait(false);
  
  try
  {
    using var request = new HttpRequestMessage(HttpMethod.Post, "https://foo.bar/api");
    request.Content = new StringContent(JsonConvert.SerializeObject(input, Encoding.UTF8, "application/json");

    var response = await HttpClientWrapper.SendAsync(request).ConfigureAwait(false);
    var responseBody = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
    var responseObject = JsonConvert.DeserializeObject<ResponseObj>(responseBody);

    return responseObject;
  }
  finally
  {
    throttler.Release();
  }
}

This works well, however I am looking to limit the total number of Tasks that are being executed in parallel globally throughout the application, so as to allow scaling up of this application. For example, if 50 requests to my API came in at the same time, this would start at most 250 tasks running parallel. If I wanted to limit the total number of Tasks that are being executed at any given time to say 100, is it possible to accomplish this? Perhaps via a Queue<T>? Would the framework automatically prevent too many tasks from being executed? Or am I approaching this problem in the wrong way, and would I instead need to Queue the incoming requests to my application?

mfcallahan
  • 165
  • 4
  • 17
  • 2
    A question first: why are you wrapping a task based I/O operation in a call to `Task.Run`? It takes a thread from the threadpool for no reason. It terms of scalability that is a bad thing. See [this still relevant blogpost](https://blog.stephencleary.com/2013/10/taskrun-etiquette-and-proper-usage.html). Get rid of it and your api will already be much more scalable. – Peter Bons Feb 26 '21 at 19:12
  • 1
    Second question: why do you want to limit this? Is it due to the external service that you are calling, can't it handle many requests or is it rate limited? If not, then the question of why remains. – Peter Bons Feb 26 '21 at 19:15
  • Let's say that 100 requests are currently processed, using 100 x 1 = 100 tasks in total (100 is the total tasks limit). What should happen with the next 101st request? Should it wait until one of the 100 previous requests completes? – Theodor Zoulias Feb 26 '21 at 19:46
  • @PeterBons I think that you are overstating the effect of the `Task.Run` in this code. The `Task.Run(async` takes a thread from the `ThreadPool` only until the first `await` of an incomplete awaitable occurs inside the async lambda. Which should happen pretty fast. Also the current thread will be returned to the `ThreadPool` when it hits the `await Task.WhenAll(tasks)`, or some other `await` that is currently missing from the code. It could still cause the saturation of the `ThreadPool` for small time-spans, but I would not expect it to have an impactful effect in scalability. – Theodor Zoulias Feb 26 '21 at 20:01
  • @PeterBons Thanks for the link. I see now how my code as currently written is not ideal; I am simply waiting for the responses of the HTTP requests I am making, not executing anything which is CPU-intensive. If I'm understanding correctly, I can remove the call to `Task.Run()` and simple create a `new Task()`, passing in the same Action delegate. To answer your second question, yes, the external service I am calling is a vendor product we have running on prem, but it does not handle many requests very well, thus the need to handle the rate limiting in my application. – mfcallahan Feb 26 '21 at 20:17
  • @TheodorZoulias thanks for pointing this out, I did forget to include that line in my sample code and have updated the original question. – mfcallahan Feb 26 '21 at 20:18
  • Matthew you are releasing the `throttler` at the wrong scope. Currently the throttler is doing no throttling at all, because it is released immediately after creating a task and adding it to the list. You should release the throttler when the task completes. – Theodor Zoulias Feb 26 '21 at 20:24
  • @TheodorZoulias regarding your first question, the answer is yes - if 100 Tasks is the max limit, the 101st Task would wait to begin until one of the first 100 Tasks has completed so that at any given time there would be no more than 100 Tasks in progress. Regarding the `throttler` from your last comment, my sample code was incorrect, I have updated the original question to correctly reflect where I am calling `SemaphoreSlim.Release()` – mfcallahan Feb 26 '21 at 20:55
  • Another possibility for enforcing the 100-tasks-max policy would be to allow the 101st request to be processed concurrently with the others, by sharing a pool of 100 available task-slots. This way at any given moment one request would be idle, and the other 100 would be progressing. This is actually easier to implement than putting the 101st request in a waiting list. You'd just need to add another `SemaphoreSlim` in the mix. Are you sure that you prefer the "101st should wait" policy? – Theodor Zoulias Feb 26 '21 at 21:13
  • 1
    Matthew the `Task` constructor creates a cold task, that will never complete unless you `Start` it first. This is not what you want. Instead of `Task.Run` you could use this simple method to start your tasks synchronously on the current thread: `static Task Run(Func action) => action();`. Or, for tasks with return value, this: `static Task Run(Func> action) => action();`. – Theodor Zoulias Feb 26 '21 at 21:40
  • Thanks for all the comments - I've updated the code snippet above with my new solution. My app now creates a `List>`, loops thru the `inputData` param, and will `await Task.WhenAll()` to return a `Task` containing all the response objects from all the HTTP requests being made. – mfcallahan Mar 01 '21 at 20:12

3 Answers3

3

I'm going to assume the code is fixed, i.e., Task.Run is removed and the WaitAsync / Release are adjusted to throttle the HTTP calls instead of List<T>.Add.

I am looking to limit the total number of Tasks that are being executed in parallel globally throughout the application, so as to allow scaling up of this application.

This does not make sense to me. Limiting your tasks limits your scaling up.

For example, if 50 requests to my API came in at the same time, this would start at most 250 tasks running parallel.

Concurrently, sure, but not in parallel. It's important to note that these aren't 250 threads, and that they're not 250 CPU-bound operations waiting for free thread pool threads to run on, either. These are Promise Tasks, not Delegate Tasks, so they don't "run" on a thread at all. It's just 250 objects in memory.

If I wanted to limit the total number of Tasks that are being executed at any given time to say 100, is it possible to accomplish this?

Since (these kinds of) tasks are just in-memory objects, there should be no need to limit them, any more than you would need to limit the number of strings or List<T>s. Apply throttling where you do need it; e.g., number of HTTP calls done simultaneously per request. Or per host.

Would the framework automatically prevent too many tasks from being executed?

The framework has nothing like this built-in.

Perhaps via a Queue? Or am I approaching this problem in the wrong way, and would I instead need to Queue the incoming requests to my application?

There's already a queue of requests. It's handled by IIS (or whatever your host is). If your server gets too busy (or gets busy very suddenly), the requests will queue up without you having to do anything.

Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810
  • Thank you Stephen for addressing each of my questions in this answer. Once again, your knowledge and insight into asynchronous programming in .NET once again proves invaluable. I do see now that I was approaching the problem incorrectly, and limiting the Tasks in my application limits scaling up. – mfcallahan Feb 27 '21 at 13:50
  • Hi Stephen, after some extensive reading on many of your blog posts on the topic of Tasks and async in C#, I made some changes in my app which I believe follow best practices for I/O bound work my app is doing. Would you mind taking a quick look at the updated code sample above and letting me know if anything could be improved? – mfcallahan Mar 01 '21 at 19:52
  • 2
    @mfcallahan: I prefer having `WaitAsync` and `Release` in the same method, but the way you have it would work and it looks ok to me. If you want more code review, [there's a site for that](https://codereview.stackexchange.com/). – Stephen Cleary Mar 01 '21 at 21:00
1

If I wanted to limit the total number of Tasks that are being executed at any given time to say 100, is it possible to accomplish this?

What you are looking for is to limit the MaximumConcurrencyLevel of what's called the Task Scheduler. You can create your own task scheduler that regulates the MaximumCongruencyLevel of the tasks it manages. I would recommend implementing a queue-like object that tracks incoming requests and currently working requests and waits for the current requests to finish before consuming more. The below information may still be relevant.

The task scheduler is in charge of how Tasks are prioritized, and in charge of tracking the tasks and ensuring that their work is completed, at least eventually.

The way it does this is actually very similar to what you mentioned, in general the way the Task Scheduler handles tasks is in a FIFO (First in first out) model very similar to how a ConcurrentQueue<T> works (at least starting in .NET 4).

Would the framework automatically prevent too many tasks from being executed?

By default the TaskScheduler that is created with most applications appears to default to a MaximumConcurrencyLevel of int.MaxValue. So theoretically yes.

The fact that there practically is no limit to the amount of tasks(at least with the default TaskScheduler) might not be that big of a deal for your case scenario.

Tasks are separated into two types, at least when it comes to how they are assigned to the available thread pools. They're separated into Local and Global queues.

Without going too far into detail, the way it works is if a task creates other tasks, those new tasks are part of the parent tasks queue (a local queue). Tasks spawned by a parent task are limited to the parent's thread pool.(Unless the task scheduler takes it upon itself to move queues around)

If a task isn't created by another task, it's a top-level task and is placed into the Global Queue. These would normally be assigned their own thread(if available) and if one isn't available it's treated in a FIFO model, as mentioned above, until it's work can be completed.

This is important because although you can limit the amount of concurrency that happens with the TaskScheduler, it may not necessarily be important - if for say you have a top-level task that's marked as long running and is in-charge of processing your incoming requests. This would be helpful since all the tasks spawned by this top-level task will be part of that task's local queue and therefor won't spam all your available threads in your thread pool.

DekuDesu
  • 2,224
  • 1
  • 5
  • 19
  • 1
    Take a look at this: [How to run a Task on a custom TaskScheduler using await?](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await). The `TaskScheduler`s are incompatible with async/await. They can only schedule delegate-based tasks, not promise-style tasks. Async methods produce promise-style tasks, which are decomposed by the async state machine to a series of smaller alternant delegate-based and promise-style tasks. In sort a `TaskScheduler` cannot view a task produced by an async method as a single unit. So it cannot throttle such tasks. – Theodor Zoulias Feb 26 '21 at 20:14
1

When you have a bunch of items and you want to process them asynchronously and with limited concurrency, the SemaphoreSlim is a great tool for this job. There are two ways that it can be used. One way is to create all the tasks immediately and have each task acquire the semaphore before doing it's main work, and the other is to throttle the creation of the tasks while the source is enumerated. The first technique is eager, and so it consumes more RAM, but it's more maintainable because it is easier to understand and implement. The second technique is lazy, and it's more efficient if you have millions of items to process.

The technique that you have used in your sample code is the second (lazy) one.

Here is an example of using two SemaphoreSlims in order to impose two maximum concurrency policies, one per request and one globally. First the eager approach:

private const int maxConcurrencyGlobal = 100;
private static SemaphoreSlim globalThrottler
    = new SemaphoreSlim(maxConcurrencyGlobal, maxConcurrencyGlobal);

public async Task<ResponseObj[]> DoStuffAsync(IEnumerable<Input> inputData)
{
    const int maxConcurrencyPerRequest = 5;
    var perRequestThrottler
        = new SemaphoreSlim(maxConcurrencyPerRequest, maxConcurrencyPerRequest);

    Task<ResponseObj>[] tasks = inputData.Select(async input =>
    {
        await perRequestThrottler.WaitAsync();
        try
        {
            await globalThrottler.WaitAsync();
            try
            {
                return await ExecHttpRequestAsync(input);
            }
            finally { globalThrottler.Release(); }
        }
        finally { perRequestThrottler.Release(); }
    }).ToArray();
    return await Task.WhenAll(tasks);
}

The Select LINQ operator provides an easy and intuitive way to project items to tasks.

And here is the lazy approach for doing exactly the same thing:

private const int maxConcurrencyGlobal = 100;
private static SemaphoreSlim globalThrottler
    = new SemaphoreSlim(maxConcurrencyGlobal, maxConcurrencyGlobal);

public async Task<ResponseObj[]> DoStuffAsync(IEnumerable<Input> inputData)
{
    const int maxConcurrencyPerRequest = 5;
    var perRequestThrottler
        = new SemaphoreSlim(maxConcurrencyPerRequest, maxConcurrencyPerRequest);

    var tasks = new List<Task<ResponseObj>>();
    foreach (var input in inputData)
    {
        await perRequestThrottler.WaitAsync();
        await globalThrottler.WaitAsync();
        Task<ResponseObj> task = Run(async () =>
        {
            try
            {
                return await ExecHttpRequestAsync(input);
            }
            finally
            {
                try { globalThrottler.Release(); }
                finally { perRequestThrottler.Release(); }
            }
        });
        tasks.Add(task);
    }
    return await Task.WhenAll(tasks);

    static async Task<T> Run<T>(Func<Task<T>> action) => await action();
}

This implementation assumes that the await globalThrottler.WaitAsync() will never throw, which is a given according to the documentation. This will no longer be the case if you decide later to add support for cancellation, and you pass a CancellationToken to the method. In that case you would need one more try/finally wrapper around the task-creation logic. The first (eager) approach could be enhanced with cancellation support without such considerations. Its existing try/finally infrastructure is already sufficient.

It is also important that the internal helper Run method is implemented with async/await. Eliding the async/await would be an easy mistake to make, because in that case any exception thrown synchronously by the ExecHttpRequestAsync method would be rethrown immediately, and it would not be encapsulated in a Task<ResponseObj>. Then the task returned by the DoStuffAsync method would fail without releasing the acquired semaphores, and also without awaiting the completion of the already started operations. That's another argument for preferring the eager approach. The lazy approach has too many gotchas to watch for.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104