149
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Here is the problem: it starts 1000+ simultaneous web requests. Is there an easy way to limit the number of concurrent async HTTP requests, so that no more than 20 web pages are downloaded at any given time? How can it be done in the most efficient manner?

Theodor Zoulias
Grief Coder
  • 3
    How is this different from [your previous question](http://stackoverflow.com/questions/10801328/how-to-properly-run-multiple-async-tasks-in-parallel)? – svick May 29 '12 at 21:40
  • 1
    http://stackoverflow.com/questions/9290498/how-can-i-limit-parallel-foreach With a ParallelOptions parameter. – Chris Disley May 29 '12 at 21:46
  • 4
    @ChrisDisley, this will only parallelize the launching of the requests. – spender May 29 '12 at 21:48
  • @svick is right, how is it different? btw, I love the answer there http://stackoverflow.com/a/10802883/66372 – eglasius Jan 30 '14 at 08:23
  • 3
    Besides `HttpClient` is `IDisposable`, and you should dispose it, especially when you're going to use 1000+ of them. `HttpClient` can be used as a singleton for multiple requests. – Shimmy Weitzhandler Aug 19 '15 at 06:21
  • 3
    @Shimmy you should never dispose `HttpClient`: https://stackoverflow.com/a/15708633/1246870 – avs099 Jul 26 '18 at 20:21
  • As Google gives this as the first result for a similar problem in Java, see this: https://stackoverflow.com/a/69234939/1220560 – morgwai Sep 19 '21 at 02:53
  • ​As a side note, the `HttpClient` class is intended to be instantiated [once](https://learn.microsoft.com/en-us/aspnet/web-api/overview/advanced/calling-a-web-api-from-a-net-client#create-and-initialize-httpclient), and reused throughout the life of an application. – Theodor Zoulias Oct 03 '22 at 09:23

12 Answers

216

You can definitely do this in the latest versions of async for .NET, using .NET 4.5 Beta. The previous post from 'usr' points to a good article written by Stephen Toub, but the less-announced news is that the async semaphore actually made it into the Beta release of .NET 4.5.

If you look at our beloved SemaphoreSlim class (which you should be using since it's more performant than the original Semaphore), it now boasts the WaitAsync(...) series of overloads, with all of the expected arguments - timeout intervals, cancellation tokens, all of your usual scheduling friends :)

Stephen has also written a more recent blog post about the new .NET 4.5 goodies that came out with the Beta; see What’s New for Parallelism in .NET 4.5 Beta.

Here's some sample code showing how to use SemaphoreSlim for async method throttling:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    string[] urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Finally, a solution that uses TPL-based scheduling is probably worth a mention. You can create delegate-bound tasks on the TPL that have not yet been started, and allow a custom task scheduler to limit the concurrency. In fact, there's an MSDN sample for it; see also the TaskScheduler documentation.
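
As a rough, hypothetical sketch of that idea (not the MSDN sample itself), the built-in ConcurrentExclusiveSchedulerPair can serve as a concurrency-limited scheduler. Keep in mind that a TaskScheduler only throttles the queued (synchronous) work; an async delegate leaves the scheduler at its first await, so this fits blocking or CPU-bound bodies rather than awaited HTTP calls:

// A scheduler that runs at most 20 queued work items concurrently.
var limitedScheduler = new ConcurrentExclusiveSchedulerPair(
    TaskScheduler.Default, maxConcurrencyLevel: 20).ConcurrentScheduler;

// DownloadAndProcess is a hypothetical synchronous method (e.g. a blocking download plus parsing).
var tasks = urls.Select(url => Task.Factory.StartNew(
    () => DownloadAndProcess(url),
    CancellationToken.None,
    TaskCreationOptions.DenyChildAttach,
    limitedScheduler));

await Task.WhenAll(tasks);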

Simon_Weaver
Theo Yaung
  • Doesn't this code end up creating a list containing as many task objects as there are urls? Is there any way to avoid this? – GreyCloud Mar 19 '13 at 11:54
  • 3
    isn't a parallel.foreach with a limited degree of parallelism a nicer approach? http://msdn.microsoft.com/en-us/library/system.threading.tasks.paralleloptions.maxdegreeofparallelism.aspx – GreyCloud Mar 19 '13 at 11:59
  • 1
    Please note that `WaitAsync` will implicitly increase the internal counter. I've run into an issue when not starting a task for every element, but only for some of the elements in the source collection. Make sure you only call `WaitAsync` when you're scheduling a task. – GameScripting Nov 06 '13 at 15:28
  • 2
    Why don't you dispose your `HttpClient`? – Shimmy Weitzhandler Aug 19 '15 at 06:52
  • 7
    @GreyCloud: `Parallel.ForEach` works with synchronous code. This allows you to call asynchronous code. – Josh Noe Jun 06 '16 at 02:46
  • 1
    @Shimmy, although HttpClient technically inherits from IDisposable, it's not actually doing anything. There is actually no benefit to disposing HttpClient whatsoever. – TheMonarch Aug 01 '17 at 23:29
  • 2
    @TheMonarch [you're wrong](https://source.dot.net/#System.Net.Http/System/Net/Http/HttpClient.cs,556). Besides it's always a good habit to wrap all `IDisposable`s in `using` or `try-finally` statements, and assure their disposal. – Shimmy Weitzhandler Aug 02 '17 at 04:33
  • 62
    Given how popular this answer is, it's worth pointing out that HttpClient can and should be a single common instance rather than an instance per request. – Rupert Rawnsley Oct 04 '17 at 14:20
  • 2
    @RupertRawnsley +1, and of course there is a proof for that on our beloved SO: https://stackoverflow.com/a/15708633/1246870 – avs099 Jul 18 '18 at 20:45
  • 1
    What is the benefit of using Task.Run here? I know it's usually used to not block the UI thread, but in here it's hard for me to understand the difference between adding this without Task.Run since, when running this, it seems to be doing the same type of thing. – Dinerdo Nov 11 '18 at 19:43
  • @Dinerdo I was wondering the same thing. As far as I understand, you should only use Task.Run when you have a CPU-intensive task. It seems like you should simply await these calls, so I think you're right. However I'd like someone to triple check my logic. – Slothario May 02 '19 at 19:11
  • 5
    `Task.Run()` is necessary here because if you await normally then the requests will be processed one at a time (since it's waiting for the request to finish before continuing the rest of the loop) instead of in parallel. However, if you don't await the request then you will release the semaphore as soon as the task is scheduled (allowing all requests to run at the same time), which defeats the purpose of using it in the first place. The context created by Task.Run is just a place to hold onto the semaphore resource. – Nick Aug 12 '19 at 18:02
  • 1
    `Task.Run()` is needed here because the code doesn't create all Tasks at once, but rather schedules 20 active tasks at once. After the loop has completed, there can be 0 to 20 tasks still active (created / running / waiting to run). – Bouke Dec 09 '19 at 09:34
  • 2
    @Dinerdo there is hardly any benefit by using `Task.Run` here, but there is hardly any harm using it either (because the `Task.Run` method understands async delegates). The alternative would be to use a [local function](https://learn.microsoft.com/en-us/dotnet/csharp/programming-guide/classes-and-structs/local-functions) that accepts a `url` and returns a `Task`, but local functions were not available at the time this answer was written (C# 7 was released at March 2017). – Theodor Zoulias Feb 26 '20 at 06:10
  • would the task.run be necessary if it was an async lambda in `urls.select()`? – Chris DaMour Jul 02 '20 at 17:36
26

If you have an IEnumerable (i.e. a list of URL strings) and you want to do an I/O-bound operation with each of these (i.e. make an async HTTP request) concurrently, and optionally you also want to set the maximum number of concurrent I/O requests in real time, here is how you can do that. This way you do not use the thread pool et al.; the method uses SemaphoreSlim to control the maximum number of concurrent I/O requests, similar to a sliding window pattern: one request completes, leaves the semaphore, and the next one gets in.

usage:

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    // DefaultMaxDegreeOfParallelism is assumed to be a constant defined elsewhere in the containing class.
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    return Task.WhenAll(tasks);
}
Liam
Dogu Arslan
  • [Do I need to Dispose a SemaphoreSlim?](https://stackoverflow.com/questions/32033416/do-i-need-to-dispose-a-semaphoreslim) – AgentFire Oct 21 '17 at 12:13
  • No, you should not need to explicitly dispose SemaphoreSlim in this implementation and usage, as it is used internally inside the method and the method does not access its AvailableWaitHandle property, in which case we would have needed to either dispose it or wrap it within a using block. – Dogu Arslan Oct 21 '17 at 19:53
  • 3
    Just thinking of the best practices and lessons we teach other people. A `using` would be nice. – AgentFire Oct 21 '17 at 20:43
  • Well, this example I can follow, but I'm trying to work out the best way to do this: basically have a throttler, but my Func would return a list, which I ultimately want in a final list of all results when done... which may require a lock on the list. Do you have suggestions? – Seabizkit Apr 26 '20 at 09:53
  • 1
    you can slightly update the method so it returns the list of actual tasks and you await Task.WhenAll from inside your calling code. Once Task.WhenAll is complete, you can enumerate over each task in the list and add its list to the final list. Change the method signature to `public static IEnumerable<Task<TOut>> ForEachAsync<TIn, TOut>(IEnumerable<TIn> inputEnumerable, Func<TIn, Task<TOut>> asyncProcessor, int? maxDegreeOfParallelism = null)` – Dogu Arslan Apr 26 '20 at 21:00
  • Two remarks about the `ForEachAsync` method: 1. The `asyncProcessor` delegate is invoked invariably for all inputs, even in case an invocation has failed. On the contrary the .NET 6 `Parallel.ForEachAsync` completes ASAP in case of an error, which arguably is a more reasonable/desirable behavior. 2. This implementation essentially creates a number of workers equal to the number of items, and all workers are waiting to acquire asynchronously the same semaphore. It allocates more memory than the .NET 6 `Parallel.ForEachAsync`. For huge source sequences, the memory-overhead might be significant. – Theodor Zoulias Mar 06 '23 at 16:20
17

After the release of .NET 6 (in November 2021), and for all applications except ASP.NET, the recommended way of limiting the number of concurrent asynchronous I/O operations is the Parallel.ForEachAsync API, with the MaxDegreeOfParallelism configuration. Here is how it can be used in practice:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };
var client = new HttpClient();
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };

// now let's send HTTP requests to each of these URLs in parallel
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

In the above example the Parallel.ForEachAsync task is awaited asynchronously. You can also Wait it synchronously if you need to, which will block the current thread until the completion of all asynchronous operations. The synchronous Wait has the advantage that in case of errors, all exceptions will be propagated. On the contrary the await operator propagates by design only the first exception. In case this is a problem, you can find solutions here.
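
For example, here is a minimal sketch of the synchronous option, catching the whole AggregateException:

Task whenAll = Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

try
{
    whenAll.Wait(); // blocks until all operations complete; throws on failure
}
catch (AggregateException aex)
{
    // All collected errors are available here, not just the first one.
    foreach (Exception ex in aex.InnerExceptions)
        Console.WriteLine(ex.Message);
}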


Note about ASP.NET: The Parallel.ForEachAsync API works by launching many workers (tasks) on the ThreadPool, and all the workers are invoking the body delegate in parallel. This goes against the advice offered in the MSDN article Async Programming : Introduction to Async/Await on ASP.NET:

You can kick off some background work by awaiting Task.Run, but there’s no point in doing so. In fact, that will actually hurt your scalability by interfering with the ASP.NET thread pool heuristics. If you have CPU-bound work to do on ASP.NET, your best bet is to just execute it directly on the request thread. As a general rule, don’t queue work to the thread pool on ASP.NET.

So using the Parallel.ForEachAsync in an ASP.NET application could harm the scalability of the application. In ASP.NET applications concurrency is OK, but parallelism should be avoided.

From the currently submitted answers, only Dogu Arslan's answer is suitable for ASP.NET applications, although it doesn't have ideal behavior in case of exceptions (in case of an error the Task might not complete fast enough).

Theodor Zoulias
11

There are a lot of pitfalls, and direct use of a semaphore can be tricky in error cases, so I would suggest using the AsyncEnumerator NuGet package instead of re-inventing the wheel:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);
0xced
Serge Semenov
  • 9
    As noted in prior posts you should not be creating new HttpClients in any kind of loop unless you actually enjoy socket exhaustion issues in production. – CajunCoding Feb 15 '21 at 19:48
7

Unfortunately, the .NET Framework is missing most important combinators for orchestrating parallel async tasks. There is no such thing built-in.

Look at the AsyncSemaphore class built by the most respectable Stephen Toub. What you want is called a semaphore, and you need an async version of it.
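
To give a rough idea of the concept (a simplified sketch for illustration, not Stephen Toub's actual AsyncSemaphore), the core trick is that WaitAsync returns a Task for the caller to await instead of blocking a thread:

// Requires System.Collections.Generic and System.Threading.Tasks.
public sealed class SimpleAsyncSemaphore
{
    private readonly Queue<TaskCompletionSource<bool>> _waiters = new Queue<TaskCompletionSource<bool>>();
    private int _count;

    public SimpleAsyncSemaphore(int initialCount) { _count = initialCount; }

    public Task WaitAsync()
    {
        lock (_waiters)
        {
            if (_count > 0) { _count--; return Task.FromResult(true); } // a slot is free, complete synchronously
            var waiter = new TaskCompletionSource<bool>();
            _waiters.Enqueue(waiter);
            return waiter.Task; // completes later, when a slot is released
        }
    }

    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (_waiters)
        {
            if (_waiters.Count > 0) toRelease = _waiters.Dequeue();
            else _count++;
        }
        // SetResult is called outside the lock so continuations don't run while holding it.
        if (toRelease != null) toRelease.SetResult(true);
    }
}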

Pang
usr
  • 15
    Note that "Unfortunately, the .NET Framework is missing most important combinators for orchestrating parallel async tasks. There is no such thing built-in." is no longer correct as of .NET 4.5 Beta. SemaphoreSlim now offers WaitAsync(...) functionality :) – Theo Yaung May 30 '12 at 06:03
  • Should SemaphoreSlim (with its new async methods) be preferred over AsyncSemphore, or does Toub's implementation still have some advantage? – Todd Menier Apr 18 '13 at 17:53
  • In my opinion, the built-in type should be preferred because it is likely to be well-tested and well-designed. – usr Apr 18 '13 at 18:02
  • 6
    Stephen added a comment in response to a question on his blog post confirming that using SemaphoreSlim for .NET 4.5 would generally be the way to go. – jdasilva Jun 15 '13 at 20:52
3

The SemaphoreSlim can be very helpful here. Here's the extension method I've created:

/// <summary>Concurrently executes async actions for each item of
/// <see cref="IEnumerable{T}"/>.</summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
/// <param name="action">an async <see cref="Action"/> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, an integer that represents the
/// maximum degree of parallelism. Must be greater than 0.</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxDegreeOfParallelism
/// is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxDegreeOfParallelism = null)
{
    if (maxDegreeOfParallelism.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
        {
            var tasksWithThrottler = new List<Task>();

            foreach (var item in enumerable)
            {
                // Increment the number of currently running tasks and wait if they
                // are more than limit.
                await semaphoreSlim.WaitAsync();

                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    await action(item).ContinueWith(res =>
                    {
                        // action is completed, so decrement the number of
                        // currently running tasks
                        semaphoreSlim.Release();
                    }, TaskScheduler.Default);
                }));
            }

            // Wait for all tasks to complete.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}
    

Sample usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Theodor Zoulias
Jay Shah
  • Is there still nothing built into the framework that does this? – Simon_Weaver Apr 07 '21 at 19:21
  • Did you ever make a `SelectAsyncConcurrent` version of this? – Simon_Weaver Apr 07 '21 at 19:26
  • @Simon_Weaver I don't think framework has any built-in mechanism for this as of now. – Jay Shah Apr 19 '21 at 22:35
  • 1
    @Simon_Weaver No, I have not built SelectAsyncConcurrent version, but that would be an interesting implementation. – Jay Shah Apr 19 '21 at 22:37
  • 1
    I just made a very clumsy one that simply calls ForEachAsyncConcurrent. I only needed it in one place so it was fine. I just created a `ConcurrentStack` and added items to it inside a call to your function. The ordering wasn't important for me, but if anyone else attempts it don't use a List because a) it's not thread safe and b) the results may not come back in the same order anyway. – Simon_Weaver Apr 20 '21 at 05:32
  • The same answer has been posted [here](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach/50263098#50263098). – Theodor Zoulias Mar 21 '23 at 04:25
0

Although 1000 tasks might be queued very quickly, the Parallel Tasks library can only handle concurrent tasks equal to the amount of CPU cores in the machine. That means that if you have a four-core machine, only 4 tasks will be executing at a given time (unless you lower the MaxDegreeOfParallelism).

scottm
  • 10
    Yep, but that doesn't relate to async I/O operations. The code above will fire up 1000+ simultaneous downloads even if it is running on a single thread. – Grief Coder May 29 '12 at 21:36
  • Didn't see the `await` keyword in there. Removing that should solve the problem, correct? – scottm May 29 '12 at 21:37
  • 2
    The library certainly can handle more tasks running (with the `Running` status) concurrently than the amount of cores. This will be especially the case with a I/O bound Tasks. – svick May 29 '12 at 21:42
  • @svick: yep. Do you know how to efficiently control the max concurrent TPL tasks (not threads)? – Grief Coder May 29 '12 at 21:48
0

In newer versions of .NET (Core 1.0 or higher), you can use the built-in TPL Dataflow.

using System.Threading.Tasks.Dataflow;

var client = new HttpClient();

var block = new TransformBlock<string, string>(
    client.GetStringAsync,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);
foreach (string url in urls) {
    block.Post(url);
}
block.Complete();

string[] htmls = await block.ReceiveAllAsync().ToArrayAsync();

This assumes you actually need the received contents; with Dataflow you can do far more complex jobs than this.

Note that you need to install the System.Linq.Async package for ToArrayAsync.


As mentioned in the comments, ReceiveAllAsync is possibly hazardous if GetStringAsync fails. If you want to stop the pipeline and propagate any exceptions that happen, do not use ReceiveAllAsync:

var htmls = new List<string>();
while (await block.OutputAvailableAsync())
{
    while (block.TryReceive(out string result))
    {
        htmls.Add(result);
    }
}
await block.Completion; // This propagates exceptions

Or if you want to proceed, but record all the exceptions:

var block = new TransformBlock<string, (string? html, Exception? exception)>(
    async url =>
    {
        try
        {
            return (await client.GetStringAsync(url), null);
        }
        catch (Exception e)
        {
            return (null, e);
        }
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);

(string? html, Exception? exception)[] results =
    await block.ReceiveAllAsync().ToArrayAsync();
RcINS
  • There are a few problems with this. All URLs are posted in the block upfront, so it's not suitable for huge (iterator-generated) input sequences. `OperationCanceledException`s thrown by the `GetStringAsync` are ignored [by design](https://github.com/dotnet/runtime/issues/29619 "Dataflow TransformBlock silently fails if TaskCanceledException is thrown"). All exceptions are ignored because of [a bug](https://github.com/dotnet/runtime/issues/79535 "ReceiveAllAsync, ReadAllAsync and propagation of errors") in the `ReceiveAllAsync`. For these reasons I don't agree with the *"you should"*. – Theodor Zoulias Apr 05 '23 at 06:55
  • Now it's better, but I still don't agree with the *"you should"*, because it implies that the TPL Dataflow is clearly the best solution for this problem, by a wide margin. IMHO the [`Parallel.ForEachAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync) API is at least equal, if not better as a solution. – Theodor Zoulias Apr 11 '23 at 10:01
  • @TheodorZoulias I don't agree `Parallel.ForEachAsync` is an equivalent. You need a thread-safe container (say ConcurrentBag) to store the results, and this does not preserve order. PLINQ (`urls.AsParallel().WithDegreeOfParallelism(8).Select...`) is more of a equivalent but it does block. Anyway, to clarify the answer, I change the phrasing to "you can". – RcINS Apr 12 '23 at 03:23
  • Sure, the `Parallel.ForEachAsync` doesn't collect the results, but the question makes no mention about results. The OP just wants to know how to limit the concurrency of asynchronous I/O operations. A question that might be more relevant to your answer is this: [ForEachAsync with Result](https://stackoverflow.com/questions/30907650/foreachasync-with-result). – Theodor Zoulias Apr 12 '23 at 08:20
-1

This is not good practice, as it changes a global variable. It is also not a general solution for async. But it is easy for all instances of HttpClient, if that's all you're after. You can simply try:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;
symbiont
-1

Here is a handy Extension Method you can create to wrap a list of tasks such that they will be executed with a maximum degree of concurrency:

/// <summary>Allows to do any async operation in bulk while limiting the system to a number of concurrent items being processed.</summary>
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(this IEnumerable<Task<T>> tasks, int maxParallelism)
{
    SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
    // Each task gets wrapped in a new task that must first acquire the semaphore,
    // and releases it only after the wrapped task has completed (otherwise nothing is throttled).
    // Note: this can only throttle work whose tasks are created lazily by the source enumerable;
    // tasks that have already been started cannot be throttled after the fact.
    return tasks.Select(async task =>
    {
        await maxOperations.WaitAsync();
        try { return await task; }
        finally { maxOperations.Release(); }
    });
}

Now instead of:

await Task.WhenAll(someTasks);

You can go

await Task.WhenAll(someTasks.WithMaxConcurrency(20));
Alain
-2

Parallel computations should be used for speeding up CPU-bound operations. Here we are talking about I/O-bound operations. Your implementation should be purely async, unless the per-item processing is heavy enough to overwhelm a single core on your multi-core CPU.

EDIT: I like the suggestion made by usr to use an "async semaphore" here.

GregC
  • Good point! Though each task here will contain async and sync code (page downloaded asynchronously then processed in a sync manner). I am trying to distribute the sync portion of the code across CPUs and at the same time limit the amount of concurrent async I/O operations. – Grief Coder May 29 '12 at 21:39
  • Why? Because launching 1000+ http requests simultaneously might not be a task well suited to the user's network capacity. – spender May 29 '12 at 21:44
  • Parallel extensions can also be used as a way to multiplex I/O operations without having to manually implement a pure async solution. Which I agree could be considered sloppy, but as long as you keep a tight limit on the number of concurrent operations it probably won't strain the threadpool too much. – Sean U May 29 '12 at 21:48
  • **Don't run long running/blocking operations in the ThreadPool**. @SeanU Your suggestion is bad practice and can cause many unintended and nasty side-effects. – spender May 29 '12 at 21:50
  • 3
    I don't think this answer is providing an answer. Being purely async is not enough here: We really want to throttle the physical IOs in a non-blocking manner. – usr May 29 '12 at 21:50
  • @spender Aside from consuming the entire pool if you don't keep a limit on how many threads you consume, what other unintended or nasty side-effects are there I should be worried about? – Sean U May 29 '12 at 22:02
  • Well, in ideal circumstances, the "entire pool" should really only represent the # processors in the system. Anything larger represents a strained ThreadPool. Because the ThreadPool is reluctant to spin up extra threads and will only do so under sustained stress, other operations that rely on a fluid ThreadPool will now be affected by this implicit latency. For instance: System.Threading.Timer fires its callbacks on the ThreadPool. Now, with only a few long-lived tasks in the ThreadPool, they're not coming in on time. – spender May 29 '12 at 22:12
  • How bad can that really get? For example, is it going to introduce worse delays than a collection of generation 2? I ask because I learned the idiom from Microsoft sample code on how to use TPL, which would seem to imply that it's not the worst practice in the world. – Sean U May 29 '12 at 22:43
  • It can get pretty bad. http://stackoverflow.com/questions/10781853/timer-more-reliable-than-system-threading-timer – spender May 29 '12 at 23:45
  • Thread pool starvation is rather more extreme than what you originally describe. And was addressed in my original comment. – Sean U May 30 '12 at 00:49
  • 1
    Hmm.. not sure I agree... when working on a large project, if one too many developers takes this view, you'll get starvation even though each developer's contribution in isolation is not enough to tip things over the edge. Given that there is only *one* ThreadPool, even if you're treating it semi-respectfully... if everyone else is doing the same, trouble can follow. As such I *always* advise against running long stuff in the ThreadPool. – spender May 30 '12 at 15:15
-2

Essentially you're going to want to create an Action or Task for each URL that you want to hit, put them in a List, and then process that list, limiting the number that can be processed in parallel.

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 20 of them in parallel.

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Action here, but do not invoke it.
    listOfActions.Add(() => CallUrl(localUrl));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

With Tasks there is no built-in function. However, you can use the one that I provide on my blog.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

Then, to create your list of Tasks and call the function to have them run with, say, a maximum of 20 simultaneous tasks at a time, you could do this:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
deadlydog
  • I think you are just specifying initialCount for SemaphoreSlim and you need to specify 2nd parameter i.e. maxCount in the constructor of SemaphoreSlim. – Jay Shah May 09 '18 at 13:17
  • I want each response from each task collected into a List. How can I get the returned Result or response? – venkat Jan 21 '20 at 16:21