18

How do I turn the following into a Parallel.ForEach?

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    foreach (String url in threads)
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }
        progressBar1.PerformStep();
    }
}

I coded it on the assumption that asynchronous and parallel processing would be the same, and I just realized they aren't. I looked through all the questions I could find on this, and I can't find an example that works for me. Most of them lack readable variable names. Using single-letter variable names that don't explain what they contain is a horrible way to write an example.

I normally have between 300 and 2000 entries in the array named threads (it contains URLs of forum threads), and it seems that parallel processing would speed up execution, given the many HTTP requests.

Do I have to remove all the asynchrony (I have nothing async outside the foreach, only variable definitions) before I can use Parallel.ForEach? How should I go about doing this? Can I do this without blocking the main thread?

I am using .NET 4.5 by the way.

svick
Steen Schütt

4 Answers

16

I coded it in the assumption that asynchronous and parallel processing would be the same

Asynchronous processing and parallel processing are quite different. If you don't understand the difference, I think you should first read more about it (for example, the question "What is the relation between asynchronous and parallel programming in C#?").

Now, what you want to do is actually not that simple, because you want to process a big collection asynchronously, with a specific degree of parallelism (8). With synchronous processing, you could use Parallel.ForEach() (along with ParallelOptions to configure the degree of parallelism), but there is no simple alternative that would work with async.
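For comparison, this is roughly what the synchronous `Parallel.ForEach` version could look like. It is a sketch, not code from the question: `downloadAndExtract` is a hypothetical synchronous function (for example a blocking download followed by the regex) that returns the usernames found on one page, and a `ConcurrentDictionary` replaces the `List` because several threads add to it at the same time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SynchronousScan
{
    // Blocks the calling thread until every URL is processed, so call it via
    // Task.Run (or a background thread) rather than directly from the UI thread.
    // 'downloadAndExtract' is a hypothetical synchronous function that downloads
    // one page and returns the usernames found on it.
    public static ICollection<string> CollectUsernames(
        IEnumerable<string> urls,
        Func<string, IEnumerable<string>> downloadAndExtract,
        int maxDegreeOfParallelism)
    {
        // Several threads add at once, so a thread-safe collection is required.
        var usernames = new ConcurrentDictionary<string, bool>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        Parallel.ForEach(urls, options, url =>
        {
            foreach (string user in downloadAndExtract(url))
                usernames.TryAdd(user, true); // duplicates are ignored
        });

        return usernames.Keys;
    }
}
```

Each worker thread sits blocked for the duration of its HTTP request, which is the waste the async approaches avoid.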

In your code, this is complicated by the fact that you expect everything to execute on the UI thread. (Though ideally, you shouldn't access the UI directly from your computation. Instead, you should use IProgress, which would mean the code no longer has to execute on the UI thread.)
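A minimal sketch of the `IProgress<T>` idea, assuming a hypothetical `processItem` delegate that does the per-URL work. The loop only reports a count and never touches UI controls; it is kept sequential for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ProgressReporting
{
    // Reports "how many URLs are finished" and never touches UI controls.
    // 'processItem' is a hypothetical async method doing the per-URL work.
    public static async Task ProcessAllAsync(
        IEnumerable<string> urls, Func<string, Task> processItem, IProgress<int> progress)
    {
        int completed = 0;
        foreach (string url in urls)
        {
            await processItem(url);
            completed++;
            if (progress != null)
                progress.Report(completed); // Progress<T> forwards this to its captured context
        }
    }
}
```

On the UI thread you would create the reporter with `new Progress<int>(done => { progressLabel.Text = "Scanning thread " + done + "/" + threads.Length; progressBar1.PerformStep(); })`. `Progress<T>` captures the current `SynchronizationContext` when it is constructed and invokes the handler there, so the controls are only touched from the UI thread. If items are processed concurrently, increment the counter with `Interlocked.Increment` instead of `++`.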

Probably the best way to do this in .NET 4.5 is to use TPL Dataflow. Its ActionBlock does exactly what you want, but it can be quite verbose (because it's more flexible than what you need). So it makes sense to create a helper method:

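using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the Microsoft.Tpl.Dataflow NuGet package

public static Task AsyncParallelForEach<T>(
    IEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    // Limit how many items are processed at the same time.
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    // Optionally run the body on a specific scheduler (e.g. the UI thread).
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    // With a Func<T, Task> body, an item counts as done only when its Task completes.
    var block = new ActionBlock<T>(body, options);

    // The input buffer is unbounded by default, so Post accepts every item.
    foreach (var item in source)
        block.Post(item);

    block.Complete();        // signal that no more items will be posted
    return block.Completion; // completes after every item has been processed
}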

In your case, you would use it like this:

await AsyncParallelForEach(
    threads, async url => await DownloadUrl(url), 8,
    TaskScheduler.FromCurrentSynchronizationContext());

Here, DownloadUrl() is an async Task method that processes a single URL (the body of your loop), 8 is the degree of parallelism (probably shouldn't be a literal constant in real code) and FromCurrentSynchronizationContext() makes sure the code executes on the UI thread.
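`DownloadUrl()` itself is not shown in the answer. One possible shape for it, as a sketch: `ThreadScanner`, `AddUsersFrom` and the constructor-injected `regex` are hypothetical names, and group 1 of the question's regex is assumed to hold the username. Because the helper is called with `FromCurrentSynchronizationContext()`, the body and its continuations all run on the UI thread, so a plain `HashSet<string>` is safe here.

```csharp
using System.Collections.Generic;
using System.Net.Http;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

// Hypothetical container for the state the original loop used.
public class ThreadScanner
{
    private readonly HttpClient client = new HttpClient();
    private readonly Regex regex;
    public readonly HashSet<string> Usernames = new HashSet<string>();

    // 'regex' is the question's pattern; group 1 is assumed to capture the username.
    public ThreadScanner(Regex regex)
    {
        this.regex = regex;
    }

    // The body of the original loop, for a single URL.
    public async Task DownloadUrl(string url)
    {
        string content = await client.GetStringAsync(url);
        AddUsersFrom(content);
        // UI updates such as progressBar1.PerformStep() would also go here.
    }

    // HashSet.Add ignores duplicates, replacing the List.Find check.
    public void AddUsersFrom(string content)
    {
        foreach (Match match in regex.Matches(content))
            Usernames.Add(match.Groups[1].Value);
    }
}
```

With this class, the body passed to the helper would be `url => scanner.DownloadUrl(url)`.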

Community
svick
  • In which namespace do I find `DataflowBlockOptions`, `ExecutionDataflowBlockOptions` and `ActionBlock`? I looked them up and MSDN says System.Threading.Tasks.Dataflow, but I can't use that one because it says the namespace does not exist – Steen Schütt Feb 03 '13 at 16:17
  • You need to [install it into your project](https://nuget.org/packages/Microsoft.Tpl.Dataflow). – Stephen Cleary Feb 03 '13 at 21:33
  • I did download it from the website, but it worked when I got it from the NuGet manager. Anyway, thanks for the help :) – Steen Schütt Feb 05 '13 at 18:13
  • I tried to adapt something you did very well [here](https://scatteredcode.net/parallel-foreach-async-in-c/) to two nested awaitable foreach loops with `IAsyncEnumerable`, but I'm getting weird results. Am I missing something? Maybe I shouldn't use the same `TaskScheduler.FromCurrentSynchronizationContext()` in both awaitable `AsyncParallelForEach` calls? What do you think? – Florin Vîrdol Mar 04 '21 at 12:45
10

Stephen Toub has a good blog post on implementing a ForEachAsync. Svick's answer is quite good for platforms on which Dataflow is available.

Here's an alternative, using the partitioner from the TPL:

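using System;
using System.Collections.Concurrent; // Partitioner
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class EnumerableExtensions
{
  public static Task ForEachAsync<T>(this IEnumerable<T> source,
      int degreeOfParallelism, Func<T, Task> body)
  {
    // Split the source into degreeOfParallelism partitions.
    var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism);
    // Each partition is drained by its own loop, awaiting one item at a time.
    var tasks = partitions.Select(async partition =>
    {
      using (partition)
        while (partition.MoveNext())
          await body(partition.Current);
    });
    // Completes when every partition has been drained.
    return Task.WhenAll(tasks);
  }
}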

You can then use it like this:

public async Task getThreadContentsAsync(String[] threads)
{
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await threads.ForEachAsync(8, async url =>
  {
    HttpResponseMessage response = await client.GetAsync(url);
    String content = await response.Content.ReadAsStringAsync();
    String user;
    foreach (Match match in regex.Matches(content))
    {
      user = match.Groups[1].ToString();
      usernames.TryAdd(user, null);
    }
    progressBar1.PerformStep();
  });
}
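If you can target .NET 6 or later, the framework has since gained `Parallel.ForEachAsync`, which covers this throttled async loop without a custom helper (it is not available on .NET 4.5). A sketch under that assumption: `ModernScan` and `fetchPage` are hypothetical names, `fetchPage` stands for something like `(url, token) => client.GetStringAsync(url, token)`, and the body runs on thread-pool threads, so it must not touch UI controls directly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

public static class ModernScan
{
    // Requires .NET 6 or later.
    // 'fetchPage' is a hypothetical async download delegate.
    public static async Task<ICollection<string>> CollectUsernamesAsync(
        IEnumerable<string> urls,
        Func<string, CancellationToken, Task<string>> fetchPage,
        Regex regex,
        int maxDegreeOfParallelism)
    {
        var usernames = new ConcurrentDictionary<string, bool>();
        // Without an explicit limit, Environment.ProcessorCount is used.
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        await Parallel.ForEachAsync(urls, options, async (url, token) =>
        {
            string content = await fetchPage(url, token);
            foreach (Match match in regex.Matches(content))
                usernames.TryAdd(match.Groups[1].Value, true); // thread-safe de-duplication
        });

        return usernames.Keys;
    }
}
```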
Stephen Cleary
  • It would be really nice to have a Parallel.ForEachAsync feature in .NET vNext. What are the chances? – rudimenter Dec 13 '13 at 15:45
  • @rudimenter: I'd suspect it's not high on the list. Most of the time, you either need to do parallel *or* asynchronous work. For those extremely rare situations when you have asynchronous work to be done in parallel, `Task.Run` with `Task.WhenAll` gives you a basic parallelism that works unless you need throttling. In that case, TPL Dataflow has throttling support built-in, and if your use case is really that complex, you should be using Dataflow anyway. So I don't see a good use case for `ForEachAsync` (i.e., usually when people ask for this, there's already a better alternative). – Stephen Cleary Dec 13 '13 at 16:47
  • I agree with you that most of the work is either parallel or asynchronous, but the thing with the new async feature is that it's viral. Whole applications are now using async from the root to the branches. Iterating over async Tasks (in parallel or sequentially) will become very common. It would be nice to have something in the BCL without pulling in an extra library like Dataflow and without a custom throttling implementation. Thanks anyway. – rudimenter Dec 13 '13 at 17:08
  • @rudimenter: My point was that you do have parallel and sequential iteration already: `Task.WhenAll` and `await`. The rare case is when you have a large number of tasks that are both asynchronous *and* CPU-bound, and you need to throttle them. Every other situation can be handled cleanly without using TPL Dataflow. – Stephen Cleary Dec 13 '13 at 18:00
  • Hi Stephen, is there a way to throttle async tasks that keeps the overall thread count below a threshold, but allows those threads to be released back to the 'internal' pool during an await? In other words, allow more than DOP Tasks to be started as long as the number of active threads remains below DOP. The ForEachAsync above from Stephen Toub (while still releasing threads to the overall thread pool during an await) only allows the threshold number of Tasks to be started in parallel. I imagine a custom scheduler would be needed. – Andrew Hanlon Oct 08 '14 at 18:58
  • @AndrewHanlon: Scheduling is the key, and the built-in `ConcurrentExclusiveSchedulerPair` should suffice. Just specify 8 as the max concurrency level, and use the `ConcurrentScheduler` property (ignore the other one). Remember you'll need to `Unwrap` asynchronous code executed by a task scheduler. – Stephen Cleary Oct 08 '14 at 19:17
  • Thank you kindly Stephen, hadn't seen the `ConcurrentExclusiveSchedulerPair` before - very interesting! I will read up on it now. Thanks again for all your help. P.S. Very happy with your book. – Andrew Hanlon Oct 08 '14 at 19:31
  • So for an interleaved version of the ForEachAsync above you could change the function body to something like: `var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, dop).ConcurrentScheduler;` `return Task.WhenAll(source.Select(t => Task.Factory.StartNew(() => body(t), cancellationToken, TaskCreationOptions.None, scheduler).Unwrap()));` Very interesting, thanks for the info Stephen. – Andrew Hanlon Oct 08 '14 at 20:00
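The interleaved variant from the last comment, expanded into a complete sketch. `InterleavedForEach`, `ForEachInterleavedAsync` and `maxConcurrency` are hypothetical names; the calls are the ones named in the comments (`ConcurrentExclusiveSchedulerPair`, `ConcurrentScheduler`, `Task.Factory.StartNew`, `Unwrap`).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class InterleavedForEach
{
    // At most maxConcurrency synchronous segments of 'body' run at once, but an
    // item that is awaiting (e.g. an HTTP request in flight) frees its slot, so
    // more than maxConcurrency items can be in progress overall.
    public static Task ForEachInterleavedAsync<T>(
        this IEnumerable<T> source, int maxConcurrency, Func<T, Task> body,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        // Only the ConcurrentScheduler half of the pair is used.
        TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrency).ConcurrentScheduler;

        return Task.WhenAll(source.Select(item =>
            Task.Factory.StartNew(() => body(item), cancellationToken,
                                  TaskCreationOptions.None, scheduler)
                // StartNew returns Task<Task>; Unwrap gives a Task that finishes
                // when the async body itself finishes.
                .Unwrap()));
    }
}
```

The limit holds only while the body's continuations stay on that scheduler. An `await` with no `SynchronizationContext` resumes on `TaskScheduler.Current`, which here is the concurrent scheduler, but `ConfigureAwait(false)` inside the body would let continuations escape it.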
3

Yet another alternative is using SemaphoreSlim or AsyncSemaphore (which is included in my AsyncEx library and supports many more platforms than SemaphoreSlim):

public async Task getThreadContentsAsync(String[] threads)
{
  SemaphoreSlim semaphore = new SemaphoreSlim(8);
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await Task.WhenAll(threads.Select(async url =>
  {
    await semaphore.WaitAsync();
    try
    {
      HttpResponseMessage response = await client.GetAsync(url);
      String content = await response.Content.ReadAsStringAsync();
      String user;
      foreach (Match match in regex.Matches(content))
      {
        user = match.Groups[1].ToString();
        usernames.TryAdd(user, null);
      }
      progressBar1.PerformStep();
    }
    finally
    {
      semaphore.Release();
    }
  }));
}
Stephen Cleary
0

You can try the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:

using System.Collections.Async;      // AsyncEnumerator NuGet package
using System.Collections.Concurrent;
using System.Threading;

public async Task getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    // Items run concurrently, so shared state must be thread-safe.
    ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();
    int started = 0;

    await threads.ParallelForEachAsync(async url =>
    {
        int i = Interlocked.Increment(ref started);
        // THESE UI CALLS MUST BE THREAD-SAFE! Items may run on thread-pool threads,
        // so marshal them to the UI thread (e.g. Control.BeginInvoke or IProgress<T>).
        progressLabel.Text = "Scanning thread " + i + "/" + threads.Length;

        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        foreach (Match match in regex.Matches(content))
        {
            usernames.TryAdd(match.Groups[1].ToString(), null); // duplicates are ignored
        }

        progressBar1.PerformStep(); // must be thread-safe as well
    },
    maxDegreeOfParallelism: 8);
}
Serge Semenov