
I am trying to query MongoDB in parallel using Parallel.ForEach(), but I am not getting any results. When I run the same code in a regular foreach loop, it performs the expected tasks.

var exceptions = new ConcurrentQueue<Exception>();
var secondaryObjectsDictionaryCollection = new Dictionary<string, List<JObject>>();

// This works
foreach(var info in infos)
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        secondaryObjectsDictionaryCollection.Add(info.PrimaryId, list.ToList());
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
}

// This does not
Parallel.ForEach(infos, async info =>
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        secondaryObjectsDictionaryCollection.Add(info.PrimaryId, list.ToList());
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
});

I want to run these tasks in parallel because different MongoDB collections are involved, and to reduce the response time.

I cannot figure out what is going wrong in my parallel loop. Any other approach that performs these tasks in parallel would also work.

Code-47
  • Maybe removing all the `catch` statements would reveal an exception and help you solve this? – Uwe Keim Nov 16 '18 at 06:16
  • @UweKeim Doing that gives me some expected exceptions from MongoDB, and after some time the application goes into break mode. – Code-47 Nov 16 '18 at 06:23

2 Answers


Let's take a look at a simpler example that illustrates the same problems.

You have code similar to this:

var results = new Dictionary<int, int>();

Parallel.ForEach(Enumerable.Range(0, 5), async index =>
{
  var result = await DoAsyncJob(index);
  results.TryAdd(index, result);
});

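Your code does not work because Parallel.ForEach expects an Action<T>, so the expression

async index => {...}

is compiled as an async void delegate. Nothing awaits it: each iteration returns to Parallel.ForEach at its first await, so the loop can finish before the asynchronous work has completed.

In effect, it behaves roughly like this fire-and-forget loop:

Parallel.ForEach(Enumerable.Range(0, 5), index => { _ = DoAsyncJob(index); });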
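A minimal, self-contained sketch that makes the async void behavior visible. The gate (a TaskCompletionSource) and the counter are illustration-only assumptions, not part of the original code: every iteration waits on the gate, so when Parallel.ForEach returns, none of the bodies can have finished yet.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// A gate that stays closed until we open it, so no iteration can finish early.
var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
int completed = 0;

// Parallel.ForEach expects an Action<int>, so this async lambda becomes an
// "async void" delegate: each call returns to Parallel.ForEach at its first await.
Parallel.ForEach(Enumerable.Range(0, 5), async index =>
{
    await gate.Task;
    Interlocked.Increment(ref completed);
});

// Parallel.ForEach has already returned, but no iteration has done its work yet.
int completedWhenForEachReturned = Volatile.Read(ref completed);
Console.WriteLine($"Finished when Parallel.ForEach returned: {completedWhenForEachReturned}");

// Open the gate; the suspended bodies now resume on thread-pool threads.
gate.SetResult(true);
bool allFinishedLater = SpinWait.SpinUntil(
    () => Volatile.Read(ref completed) == 5, TimeSpan.FromSeconds(5));
Console.WriteLine($"All 5 finished afterwards: {allFinishedLater}");
```

The first line prints 0: Parallel.ForEach returned while all five bodies were still suspended at their await. The work only completes later, on threads that nothing was waiting for, which is why the original loop appears to produce no results.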

By the way, when several threads update a collection in parallel, as in your example, you should use ConcurrentDictionary instead of Dictionary. Dictionary is not thread-safe, and concurrent writes can corrupt it, which shows up as exceptions or hangs.
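A short sketch of the thread-safe version, using made-up data (the squares of 0..999) purely for illustration. The body here is synchronous, so Parallel.ForEach really does wait for every iteration, and ConcurrentDictionary.TryAdd can safely be called from several threads at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var results = new ConcurrentDictionary<int, int>();

// Synchronous body: Parallel.ForEach blocks until every iteration is done.
// TryAdd is safe to call concurrently; Dictionary.Add is not.
Parallel.ForEach(Enumerable.Range(0, 1000), index =>
{
    results.TryAdd(index, index * index);
});

Console.WriteLine(results.Count);
```

This prints 1000. With a plain Dictionary and Add, the same loop can throw or leave the dictionary in a corrupted state.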

The best solution here is not to use a Parallel loop at all, but to use Task.WhenAll instead:

var results = new ConcurrentDictionary<int, int>(); // continuations may run on different threads

var tasks = Enumerable.Range(0, 5).Select(async index =>
{
  var result = await DoAsyncJob(index);
  results.TryAdd(index, result);
});

await Task.WhenAll(tasks);
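A complete, runnable variant of the Task.WhenAll approach. DoAsyncJob is a hypothetical stand-in (a Task.Delay plus a calculation) for the real asynchronous call. Unlike the snippet above, this sketch takes the results from the array that Task.WhenAll returns instead of writing to a shared dictionary, which sidesteps thread safety entirely; that is a design choice of this sketch, not something the answer prescribes.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the real asynchronous call (for example a MongoDB query).
async Task<int> DoAsyncJob(int index)
{
    await Task.Delay(50);   // simulate I/O latency
    return index * 10;
}

// Select starts one job per index; ToArray makes sure each job starts exactly once.
Task<int>[] tasks = Enumerable.Range(0, 5).Select(index => DoAsyncJob(index)).ToArray();

// Task.WhenAll completes when every job has finished and returns the results
// in the same order as the tasks, so no shared collection is needed.
int[] values = await Task.WhenAll(tasks);

Console.WriteLine(string.Join(", ", values));
```

This prints 0, 10, 20, 30, 40. Note that Task.WhenAll does not limit concurrency: every job starts at once, which may matter if there are many items hitting the same database.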
Vadim Vnukov

Parallel.ForEach is not compatible with passing in an async method. If you want something similar to Parallel.ForEach, you can use TPL Dataflow and its ActionBlock<T>.

var workerBlock = new ActionBlock<Info>(async info => 
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        // Dictionary is not thread-safe, so guard the write with a lock.
        lock (secondaryObjectsDictionaryCollection) 
        {
            secondaryObjectsDictionaryCollection.Add(info.PrimaryId, list.ToList());
        }
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
foreach(var info in infos)
{
    workerBlock.Post(info);
}
workerBlock.Complete();
await workerBlock.Completion; // wait until every posted item has been processed
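TPL Dataflow ships as the System.Threading.Tasks.Dataflow NuGet package. If you would rather not add that dependency, a common alternative (a different technique from the ActionBlock above) is to cap concurrency with SemaphoreSlim around Task.WhenAll. In this sketch QueryAsync is a hypothetical stand-in for one database call, the limit of 3 is an arbitrary assumption, and the peak counter exists only to show that no more than three calls run at the same time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for one database query per item.
async Task<int> QueryAsync(int id)
{
    await Task.Delay(20);   // simulate I/O latency
    return id * 2;
}

const int maxConcurrency = 3;               // assumption: tune for your server
var throttle = new SemaphoreSlim(maxConcurrency);
var results = new ConcurrentDictionary<int, int>();
int inFlight = 0;
int peakInFlight = 0;

// Lock-free "max" update, used only to show that the throttle works.
void RecordPeak(int candidate)
{
    int seen;
    while (candidate > (seen = Volatile.Read(ref peakInFlight))
           && Interlocked.CompareExchange(ref peakInFlight, candidate, seen) != seen)
    {
        // Another thread raised the peak first; re-read and try again.
    }
}

Task[] tasks = Enumerable.Range(0, 10).Select(async id =>
{
    await throttle.WaitAsync();             // wait for a free slot
    try
    {
        RecordPeak(Interlocked.Increment(ref inFlight));
        results[id] = await QueryAsync(id);
    }
    finally
    {
        Interlocked.Decrement(ref inFlight);
        throttle.Release();                 // free the slot even if the query throws
    }
}).ToArray();

await Task.WhenAll(tasks);
Console.WriteLine($"{results.Count} results, peak concurrency {peakInFlight}");
```

It should report 10 results and a peak concurrency of at most 3. Releasing in finally matters: without it, a failed query would hold its slot forever.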
Scott Chamberlain
  • Could you explain why "Parallel.ForEach is not compatible with passing in an async method", please? – Z.R.T. Nov 16 '18 at 06:41
  • @Z.R.T. Sure. The delegate for the second parameter is an `Action<T>`, but for async to work properly it needs to be a `Func<T, Task>`, because otherwise it cannot tell when the async job is done (basically you are passing ForEach an "async void" function). Because of that, as soon as the first await is hit, it thinks the work for that item is complete. Dataflow's ActionBlock has an overload that takes a `Func<T, Task>`. – Scott Chamberlain Nov 16 '18 at 06:44
  • @Z.R.T. .NET 6 has added `Parallel.ForEachAsync()`; see https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-6.0 – Dave Black Oct 21 '21 at 04:07
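A minimal sketch of Parallel.ForEachAsync (requires .NET 6 or later). QueryAsync is a hypothetical stand-in for the per-item MongoDB call, and MaxDegreeOfParallelism = 4 is an arbitrary choice. Because the body is a Func<T, CancellationToken, ValueTask>, the returned Task completes only after every iteration has finished, which is exactly what the async lambda passed to Parallel.ForEach could not provide.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the per-item database query.
async Task<int> QueryAsync(int id, CancellationToken ct)
{
    await Task.Delay(20, ct);   // simulate I/O latency, honoring cancellation
    return id * 3;
}

var results = new ConcurrentDictionary<int, int>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

// The async body returns a ValueTask that ForEachAsync awaits,
// so this await only completes once all eight items are processed.
await Parallel.ForEachAsync(Enumerable.Range(0, 8), options, async (id, ct) =>
{
    results[id] = await QueryAsync(id, ct);
});

Console.WriteLine(results.Count);
```

This prints 8. Bodies still run concurrently, so shared state should stay in a thread-safe collection such as ConcurrentDictionary, as above.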