2

Below is sample console app and output is

enter image description here

Output is different each time and is fine but it needs to complete all tasks before I print result. It seems that Parallel.ForEachAsync is not waiting for all tasks to be completed. Am I missing anything here ?

internal class Program
{
    private async static Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        await TestParallel();
        sw.Stop();
        Console.WriteLine("Elapsed={0}", sw.Elapsed);
        Console.ReadLine();
    }

    private static async Task TestParallel()
    {
        var tests = new List<int>() { 1, 2, 3, 4, 5, 6 };
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5,
            CancellationToken = CancellationToken.None };
        var responses = new List<string>();
        await Parallel.ForEachAsync(tests, options, async (testno, cancellationToken) =>
        {
            var response = await TestTask(testno);
            responses.Add(response);
        });
        foreach (var response in responses)
        {
            Console.WriteLine(response);
        }
    }
    private static Task<string> TestTask(int testno)
    {
        System.Threading.Thread.Sleep(1000);
        return Task.FromResult($"Test{testno}");
    }
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
user3838575
  • 115
  • 1
  • 6
  • in terms of 'Output is different each time', that is expected behaviour. When you go parallel, you're not doing a series of tasks in a certain order, you are spinning them all up effectively at the same time and they'll each finish when they finish. Could be different every time. – Jonathan Jan 13 '22 at 19:42
  • 1
    In terms of 'it seems that Parallel.ForEachAsync is not waiting for all tasks to be completed', what makes you say this? – Jonathan Jan 13 '22 at 19:43
  • updated my question - Output is different each time and is fine but it needs to complete all tasks before I print result. It seems that Parallel.ForEachAsync is not waiting for all tasks to be completed. Am I missing anything here ? – user3838575 Jan 13 '22 at 19:45
  • 1
    is not waiting for all tasks to be completed because I am expecting it print all results (Order does not matter)Test1 Test2 Test3 Test4 Test5 Test6 – user3838575 Jan 13 '22 at 19:47
  • 3
    `List` is not thread safe. Maybe use `ConcurrentBag` and see if that helps. – John Wu Jan 13 '22 at 19:48
  • 1
    How come it waited 2 seconds then? 6 one second tasks with a MaxDOP of 5 would take 2 seconds for all to complete, no? – Caius Jard Jan 13 '22 at 19:50
  • 2
    Thank you @JohnWu, I changed var responses = new List(); to var responses = new ConcurrentBag(); and is working as per my expectation – user3838575 Jan 13 '22 at 19:51

3 Answers3

2

Answer for versions before .NET 6.


I think your example is a bit confusing. That's because you use an async callback. Mostly async is used for IO purposes.

Either go for: (this would be CPU-bound, doing some heavy calculations)

var responses = new List<string>();
var tests = new List<int>() { 1, 2, 3, 4 ,5,6};

Parallel.ForEach(tests, options, (testno) =>
{
    // no async here...
    var response = TestTask(testno);
    // lock the shared resource.
    lock(responses)
        responses.Add(response);
});

foreach (var response in responses)
{
    Console.WriteLine(response);
}

private static string TestTask(int testno)
{
    // calculations done here
    System.Threading.Thread.Sleep(1000);
    return $"Test{testno}";
}

Or go for: (this is IO-bound, for example getting content from external sources)

var tests = new List<int>() { 1, 2, 3, 4 ,5,6};

var tasks = new List<Task<string>>();

// just add the tasks to a list, so you can await them later.
// the first part (till the first await) will be completed synchronous. 
// If any async/await is used, the Task.WhenAll will wait for it. 
// Multiple tasks can be running simultaneously.
foreach(var t in tests)
    tasks.Add(TestTask(t));

await Task.WhenAll(tasks);

foreach (var task in tasks)
{
    // the current thread won't be blocked by calling the .Result here
    // All tasks are already completed.
    Console.WriteLine(task.Result);
}

private static async Task<string> TestTask(int testno)
{
    // Getting information from external resources.
    await Task.Delay(1000);
    return $"Test{testno}";
}

(there might be some typo's, haven't written in VS)

Jeroen van Langen
  • 21,446
  • 3
  • 42
  • 57
  • This code doesn't compile. The [`Parallel.ForEachAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync) takes an asynchronous delegate (`Func`). If you are going to do synchronous work, you should probably use the `Parallel.ForEach` instead. – Theodor Zoulias Jan 13 '22 at 23:45
  • _(there might be some typo's, haven't written in VS)_ I'll change it. – Jeroen van Langen Jan 14 '22 at 08:01
  • 1
    Hmm, after some reading the `Parallel.ForEachAsync` is also used for IO-Bound. That means that my answer mainly applies to the old framework.. I see that `Parallel.ForEachAsync` is .NET 6 _(working in .NET 5 atm)_ – Jeroen van Langen Jan 14 '22 at 08:08
1

Answer is below - changed line var responses = new ConcurrentBag();

internal class Program
{
    private async static Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        await TestParallel();
        sw.Stop();
        Console.WriteLine("Elapsed={0}", sw.Elapsed);
        Console.ReadLine();
    }

    private static async Task TestParallel()
    {
        var tests = new List<int>() { 1, 2, 3, 4 ,5,6};
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5, CancellationToken = CancellationToken.None };
        var responses = new ConcurrentBag<string>();
        await Parallel.ForEachAsync(tests, options, async (testno, cancellationToken) =>
        {
            var response = await TestTask(testno);
            responses.Add(response);
        });
        foreach (var response in responses)
        {
            Console.WriteLine(response);
        }
    }
    private static Task<string> TestTask(int testno)
    {
        System.Threading.Thread.Sleep(1000);
        return Task.FromResult($"Test{testno}");
    }
}
  • Nice answer. But instead of using the [highly specialized](https://stackoverflow.com/questions/15400133/when-to-use-blockingcollection-and-when-concurrentbag-instead-of-listt/64823123#64823123) `ConcurrentBag` class, I would strongly suggest to use the `ConcurrentQueue` class instead, which preserves the order of the added items. Even when order is not important, why choose a shuffling collection, when an order-preserving collection exists? If nothing else, it makes debugging more difficult. – Theodor Zoulias Jan 13 '22 at 23:50
  • 2
    @Theodor no, this is a terrible answer, because it doesn't even answer the question. I can't understand why it's upvoted; pretty sure it came along after John Wu's comment and doesn't even give as much info as that comment.. If it explained why using List in this context is a bad idea and is causing the results shown I could get on with it, but this looks like it's just taking someone else's inspiration (plagiarism) and posting it as a "try this" (no learning opportunity for the OP, "what do I do to make it work?" wasn't the question). Poor show all round – Caius Jard Jan 14 '22 at 01:41
  • 1
    @CaiusJard you are right. I was just trying to be nice. :-) This answer could be improved quite a lot. – Theodor Zoulias Jan 14 '22 at 02:06
-1

I tested your code. The code written gives different results and it seems Parallel.Async is not waiting. But real issue is List object. The List object is shared object and not thread safe.

If you use ConcurrentBag instead of List, then you get consistent result. I have change the code as below in Progam.cs and tested

using System.Collections.Concurrent; 
using System.Diagnostics;
Console.WriteLine("Hello, World!");
Stopwatch sw = new Stopwatch();   
sw.Start(); 
var tests = new  List<int>() { 1, 2, 3, 4, 5, 6 }; 
var options = new ParallelOptions {
   MaxDegreeOfParallelism = 2, CancellationToken = CancellationToken.None }; 
var responses = new ConcurrentBag<string>();

await Parallel.ForEachAsync(tests, options, async (testno, cancellationToken) => 
{
   var response = await TestTask(testno);
   responses.Add(response);
   Console.WriteLine("Executed... " + response); 
});
Console.WriteLine("Waiting.....");  

await Task.Delay(1000);

foreach(var response in responses) 
{
   Console.WriteLine("After Task Response:" + response); 
}

sw.Stop();
Console.WriteLine("Elapsed={0}", sw.Elapsed);
Console.ReadLine();

static Task<string> TestTask(int testno) 
{   System.Threading.Thread.Sleep(1000);
    return Task.FromResult($"Test{testno}"); 
}
  • The code is messy and lacks basic formatting. The answer would benefit from splitting the code into different sections clarifying each part. As it is right now, it is a long code read, with a short textual summary. – M.Nar Apr 20 '23 at 03:36