
I want to run 1000+ tasks using Parallel.ForEach. The code below sends email notifications. The problem is that it works for only around 150 to 200 notifications, which I do receive, but after that the code freezes and no more emails arrive.

Can someone please point me in the right direction?

    var exceptions = new ConcurrentQueue<Exception>();

    try
    {
        List<ParallelWorker_EmailNotification> workers = new List<ParallelWorker_EmailNotification>();

        foreach (Email mail in listEmails)
        {
            workers.Add(new ParallelWorker_EmailNotification(mail));
        }

        Parallel.ForEach(workers, async worker =>
        {
            try
            {
                await worker.SendNotification();
            }
            catch (Exception ex)
            {
                exceptions.Enqueue(ex);
            }
        });
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }

Bender Bending

1 Answer


Parallel.ForEach does not work with async functions passed in. It accepts an Action<T>, so the lambda async worker => ... compiles to an async void method, and that is likely the source of your problems. Parallel.ForEach stops blocking as soon as each lambda reaches its first incomplete await, because it thinks the work is done, but the work is still processing in the background. This is why you don't see all the items processed.
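
To see the effect in isolation, here is a minimal sketch. `FakeSendAsync` is a hypothetical stand-in for `SendNotification` (it just awaits `Task.Delay`), and the count of 20 is arbitrary. The loop returns long before the simulated sends finish, so the counter read right after it is typically still 0:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    private static int _completed;

    // Hypothetical stand-in for worker.SendNotification().
    private static async Task FakeSendAsync()
    {
        await Task.Delay(2000);
        Interlocked.Increment(ref _completed);
    }

    // Returns how many sends had finished at the moment Parallel.ForEach returned.
    public static int CompletedWhenForEachReturns()
    {
        _completed = 0;

        // The async lambda is converted to Action<int>, i.e. async void,
        // so Parallel.ForEach has no Task to wait on.
        Parallel.ForEach(Enumerable.Range(0, 20), async i =>
        {
            await FakeSendAsync();
        });

        return Volatile.Read(ref _completed);
    }

    public static void Main()
    {
        int done = CompletedWhenForEachReturns();
        Console.WriteLine($"Completed when ForEach returned: {done} of 20");
    }
}
```

If the process exits right after the loop, as this sketch's `Main` does, the sends still in flight are simply abandoned.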

The easiest solution (if SendNotification is a proper async function) is to project the items into an IEnumerable<Task> with Select and await them all with Task.WhenAll.

    var exceptions = new ConcurrentQueue<Exception>();

    try
    {
        var tasks = listEmails.Select(mail => new ParallelWorker_EmailNotification(mail))
                              .Select(async worker =>
            {
                try
                {
                    await worker.SendNotification();
                }
                catch (Exception ex)
                {
                    exceptions.Enqueue(ex);
                }
            });
        await Task.WhenAll(tasks);
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
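
For anyone who wants to try the pattern without the mail classes, here is a self-contained sketch of the same idea. `FakeSendAsync` is a hypothetical stand-in for `SendNotification` that fails on every fifth id, and `SendAllAsync` is an illustrative name, not part of the original code. Failures end up in the queue while the remaining sends still complete:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical stand-in for worker.SendNotification(): every fifth send fails.
    private static async Task FakeSendAsync(int id)
    {
        await Task.Delay(50);
        if (id % 5 == 0)
            throw new InvalidOperationException($"Send {id} failed");
    }

    public static async Task<(int Sent, int Failed)> SendAllAsync(int count)
    {
        var exceptions = new ConcurrentQueue<Exception>();

        // Each element becomes a Task<bool>: true on success, false on failure.
        var tasks = Enumerable.Range(0, count).Select(async id =>
        {
            try
            {
                await FakeSendAsync(id);
                return true;
            }
            catch (Exception ex)
            {
                exceptions.Enqueue(ex);
                return false;
            }
        });

        // WhenAll enumerates the sequence (starting every send) and waits for all of them.
        bool[] results = await Task.WhenAll(tasks);
        return (results.Count(ok => ok), exceptions.Count);
    }

    public static async Task Main()
    {
        var (sent, failed) = await SendAllAsync(20);
        Console.WriteLine($"sent={sent}, failed={failed}"); // prints "sent=16, failed=4"
    }
}
```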

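If SendNotification takes some time before it yields control back to the caller (that is, it does significant synchronous work before its first await), Select would run that synchronous part for each item one after another on the calling thread. In that case the best solution is to use TPL Dataflow for the processing.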

    var exceptions = new ConcurrentQueue<Exception>();

    try
    {
        var actionBlock = new ActionBlock<ParallelWorker_EmailNotification>(async worker =>
            {
                try
                {
                    await worker.SendNotification();
                }
                catch (Exception ex)
                {
                    exceptions.Enqueue(ex);
                }
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        foreach (Email mail in listEmails)
        {
            actionBlock.Post(new ParallelWorker_EmailNotification(mail));
        }
        actionBlock.Complete();
        actionBlock.Completion.Wait();
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
Scott Chamberlain
  • TPL Dataflow example is good. Is there a way for the code to avoid waiting on `actionBlock.Completion.Wait();` and move on to the next batch, while still making sure the previous batch completes? – Application Manager Aug 02 '17 at 15:58
  • At some point you have to "wait and check" to see that the work is done; where you put that check is up to you. If you return the Task from `actionBlock.Completion`, the caller can call your function once per batch and, once all the batches have been started, do a `Task.WaitAll(batchTasks)` or an `await Task.WhenAll(batchTasks)` on all the returned tasks. – Scott Chamberlain Aug 02 '17 at 16:01
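
The per-batch idea from the last comment could look roughly like the sketch below. It is only an illustration under assumptions: `FakeSendAsync`, `StartBatch` and `RunBatchesAsync` are hypothetical names, integers stand in for the email workers, and on .NET Framework the `System.Threading.Tasks.Dataflow` NuGet package has to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchDemo
{
    private static int _processed;

    // Hypothetical stand-in for worker.SendNotification().
    private static async Task FakeSendAsync(int id)
    {
        await Task.Delay(20);
        Interlocked.Increment(ref _processed);
    }

    // Posts one batch and returns without waiting;
    // the returned Task completes once every item in the batch is processed.
    public static Task StartBatch(IEnumerable<int> batch)
    {
        var block = new ActionBlock<int>(
            async id => await FakeSendAsync(id),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        foreach (int id in batch)
        {
            block.Post(id);
        }
        block.Complete();
        return block.Completion;
    }

    // Starts all batches first, then waits once for all of them.
    public static async Task<int> RunBatchesAsync(int batches, int size)
    {
        _processed = 0;
        var batchTasks = new List<Task>();
        for (int b = 0; b < batches; b++)
        {
            batchTasks.Add(StartBatch(Enumerable.Range(b * size, size)));
        }

        await Task.WhenAll(batchTasks);
        return Volatile.Read(ref _processed);
    }

    public static async Task Main()
    {
        int processed = await RunBatchesAsync(3, 10);
        Console.WriteLine($"processed={processed}"); // prints "processed=30"
    }
}
```

Here the single "wait and check" happens at `Task.WhenAll`, after all batches have been started, so no batch blocks the next one from being posted.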