
I have an enumeration of items (RunData.Demand), each representing some work involving calling an API over HTTP. It works great if I just foreach through it all and call the API during each iteration. However, each iteration takes a second or two so I'd like to run 2-3 threads and divide up the work between them. Here's what I'm doing:

ThreadPool.SetMaxThreads(2, 5); // Trying to limit the number of threads
var tasks = RunData.Demand
   .Select(service => Task.Run(async delegate
   {
      var availabilityResponse = await client.QueryAvailability(service);
      // Do some other stuff, not really important
   }));

await Task.WhenAll(tasks);

The client.QueryAvailability call basically calls an API using the HttpClient class:

public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request)
{
   var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request);

   if (response.IsSuccessStatusCode)
   {
      return await response.Content.ReadAsAsync<QueryAvailabilityResponse>();
   }

   throw new HttpException((int) response.StatusCode, response.ReasonPhrase);
}

This works great for a while, but eventually things start timing out. If I set the HttpClient Timeout to an hour, then I start getting weird internal server errors.

What I started doing was setting a Stopwatch within the QueryAvailability method to see what was going on.

What's happening is all 1200 items in RunData.Demand are being created at once and all 1200 await client.PostAsJsonAsync methods are being called. It appears it then uses the 2 threads to slowly check back on the tasks, so towards the end I have tasks that have been waiting for 9 or 10 minutes.

Here's the behavior I would like:

I'd like to create the 1,200 tasks, then run them 3-4 at a time as threads become available. I do not want to queue up 1,200 HTTP calls immediately.

Is there a good way to go about doing this?

Mike Christensen
  • You don't appear to create a new `client` for each call. You know that `System.Net.Http.HttpClient` is not thread-safe for instance calls? A new instance should be created for (and disposed after) each call. – Enigmativity Nov 25 '15 at 23:54
  • The `QueryAvailability` method is actually in a class that creates the `HttpClient`, which is a private member of that instance. I didn't know it was not thread-safe though, I could definitely create it before each call. I'll look into that more, thanks! – Mike Christensen Nov 26 '15 at 00:02
  • 1
    Hmm, I did a bit of research and it seems what I'm doing is thread safe. See [here](http://stackoverflow.com/questions/11178220/is-httpclient-safe-to-use-concurrently) and [here](http://www.tomdupont.net/2014/11/net-45-httpclient-is-thread-safe.html) – Mike Christensen Nov 26 '15 at 00:07

4 Answers


As I always recommend, what you need is TPL Dataflow (to install: `Install-Package System.Threading.Tasks.Dataflow`).

You create an ActionBlock with an action to perform on each item. Set MaxDegreeOfParallelism for throttling. Start posting into it and await its completion:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service => 
{
    var availabilityResponse = await client.QueryAvailability(service);
    // ...
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

foreach (var service in RunData.Demand)
{
    block.Post(service);
}

block.Complete();
await block.Completion;
i3arnon

Old question, but I would like to propose an alternative lightweight solution using the `SemaphoreSlim` class from the `System.Threading` namespace.

var sem = new SemaphoreSlim(4, 4);
var tasks = new List<Task>();

foreach (var service in RunData.Demand)
{
    await sem.WaitAsync();
    var t = Task.Run(async () =>
    {
        var availabilityResponse = await client.QueryAvailability(service);
        // do your other stuff here with the result of QueryAvailability
    });
    tasks.Add(t);
    t.ContinueWith(_ => sem.Release());
}

await Task.WhenAll(tasks);

The semaphore acts as a throttling mechanism. You can only enter the semaphore by calling `Wait` (or `WaitAsync`), which subtracts one from its count; calling `Release` adds one back. When the count reaches zero, `WaitAsync` pauses until another task releases a slot, so at most four tasks are ever in flight.
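As a self-contained illustration of this pattern, here is a sketch that can run as a console app. The HTTP call is replaced by a simulated `Task.Delay`, and a counter is added purely to observe the concurrency; `RunData.Demand` is stood in for by a plain range of integers:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class SemaphoreThrottleDemo
{
    public static int MaxObserved;          // highest number of tasks seen in flight
    private static int _inFlight;
    private static readonly object Gate = new object();

    public static async Task Main()
    {
        var sem = new SemaphoreSlim(4, 4);
        var tasks = new List<Task>();

        foreach (var item in Enumerable.Range(0, 20))    // stand-in for RunData.Demand
        {
            await sem.WaitAsync();                       // loop pauses once 4 are in flight
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    lock (Gate)
                    {
                        _inFlight++;
                        if (_inFlight > MaxObserved) MaxObserved = _inFlight;
                    }
                    await Task.Delay(50);                // stand-in for the HTTP call
                    lock (Gate) { _inFlight--; }
                }
                finally
                {
                    sem.Release();                       // free the slot even if the work throws
                }
            }));
        }

        await Task.WhenAll(tasks);
        Console.WriteLine($"max concurrency observed: {MaxObserved}");
    }
}
```

Running this prints the observed maximum, which never exceeds the semaphore's limit of 4 even though 20 items are queued.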

SeanOB
  • If I understand correctly, depending on how one is using this, it might work fine with `sem.Wait()` rather than `await sem.WaitAsync()`. Doing so would block the calling thread, so it shouldn't be done on the UI thread, but on any other thread it might be the easiest way to manage the work to be done. Specifically, if one of the 4 slots is available, it will proceed immediately. If not, it waits until one is available. – ToolmakerSteve Mar 04 '18 at 18:54

You're using async HTTP calls, so limiting the number of threads will not help (nor will ParallelOptions.MaxDegreeOfParallelism in Parallel.ForEach as one of the answers suggests). Even a single thread can initiate all requests and process the results as they arrive.

One way to solve it is to use TPL Dataflow.

Another nice solution is to divide the source IEnumerable into partitions and process items in each partition sequentially as described in this blog post:

// Partitioner lives in System.Collections.Concurrent
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
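Applied to the question's scenario, the call site would look something like the following. This is a sketch with a simulated workload in place of `client.QueryAvailability`, and the extension method above is repeated so the snippet compiles on its own:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ForEachAsyncDemo
{
    public static int Processed;

    // The extension method from the answer, reproduced for self-containment.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    public static async Task Main()
    {
        // Hypothetical stand-ins for RunData.Demand and the real HTTP call.
        await Enumerable.Range(0, 12).ForEachAsync(3, async item =>
        {
            await Task.Delay(10);                        // simulated HTTP call
            Interlocked.Increment(ref Processed);
        });

        Console.WriteLine($"processed {Processed} items"); // processed 12 items
    }
}
```

Each of the 3 partitions walks its share of the items sequentially, so at most 3 calls are in flight at once.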
Jakub Lortz

While the Dataflow library is great, I think it's a bit heavy when not using block composition. I would tend to use something like the extension method below.

Also, unlike the Partitioner method, this runs the async methods on the calling context. The caveat is that if your code is not truly async, or takes a 'fast path', it will effectively run synchronously, since no threads are explicitly created.

public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel)
{
    var tasks = new List<Task>();

    foreach (var item in items)
    {
        tasks.Add(asyncAction(item));

        if (tasks.Count < maxParallel)
            continue;

        var notCompleted = tasks.Where(t => !t.IsCompleted).ToList();

        if (notCompleted.Count >= maxParallel)
            await Task.WhenAny(notCompleted);
    }

    await Task.WhenAll(tasks);
}
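Usage against the question's data would look roughly like this. Here is a self-contained sketch with a made-up item range and a simulated async action standing in for `RunData.Demand` and `client.QueryAvailability`; the extension method is repeated so the snippet compiles on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class RunParallelDemo
{
    public static int Processed;

    // The extension method from the answer, reproduced for self-containment.
    public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel)
    {
        var tasks = new List<Task>();

        foreach (var item in items)
        {
            tasks.Add(asyncAction(item));

            if (tasks.Count < maxParallel)
                continue;

            var notCompleted = tasks.Where(t => !t.IsCompleted).ToList();

            if (notCompleted.Count >= maxParallel)
                await Task.WhenAny(notCompleted);
        }

        await Task.WhenAll(tasks);
    }

    public static async Task Main()
    {
        await Enumerable.Range(0, 10).RunParallelAsync(async item =>
        {
            await Task.Delay(10);                        // stand-in for the real HTTP call
            Interlocked.Increment(ref Processed);
        }, maxParallel: 3);

        Console.WriteLine($"processed {Processed} items"); // processed 10 items
    }
}
```

The loop only pauses (via `Task.WhenAny`) once `maxParallel` tasks are pending, and the final `Task.WhenAll` ensures everything has completed before the method returns.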
Andrew Hanlon
  • Creating unnecessary lists on every iteration of the foreach loop. A ManualResetEvent or a SemaphoreSlim for throttling would involve far fewer allocations and better performance than counting the non-completed tasks on every iteration. – Snak Sep 13 '19 at 13:34
  • 1
    Thanks @Snak - you are absolutely correct, and actually today I use a much different helper method without any specific task tracking. But reader be warned, a naive approach using a simple `SemaphoreSlim` is prone to deadlocks - it takes a lot of care to ensure that task exceptions propagate correctly for an early exit! – Andrew Hanlon Sep 13 '19 at 14:15