2

I have a method that runs multiple async methods within it. I have to iterate over a list of devices, and pass the device to this method. I am noticing that this is taking a long time to complete so I am thinking of using Parallel.ForEach so it can run this process against multiple devices at the same time.

Let's say this is my method.

public async Task ProcessDevice(Device device) {
    var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);

    var result = await DoSomething(dev);
    await DoSomething2(dev);
}

Then DoSomething2 also calls an async method.

public async Task DoSomething2(Device dev) {
    foreach(var obj in dev.Objects) {
        await DoSomething3(obj);
    }
}

The list of devices continuously gets larger over time, so the more this list grows, the longer it takes the program to finish running ProcessDevice() against each device. I would like to process more than one device at a time. So I have been looking into using Parallel.ForEach.

Parallel.ForEach(devices, async device => {
    try {
        await ProcessDevice(device);
    } catch (Exception ex) {
        throw ex;
    }
})

It appears that the program is finishing before the device is fully processed. I have also tried creating a list of tasks, and then foreach device, add a new task running ProcessDevice to that list and then awaiting Task.WhenAll(listOfTasks);

var listOfTasks = new List<Task>();
foreach(var device in devices) {
    var task = Task.Run(async () => await ProcessDevice(device));
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

But it appears that the task is marked as completed before ProcessDevice() is actually finished running.

Please excuse my ignorance on this issue as I am new to parallel processing and not sure what is going on. What is happening to cause this behavior and is there any documentation that you could provide that could help me better understand what to do?

Theodor Zoulias

4 Answers

7

You can't mix async with Parallel.ForEach. Since your underlying operation is asynchronous, you'd want to use asynchronous concurrency, not parallelism. Asynchronous concurrency is most easily expressed with WhenAll:

var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);
Stephen Cleary
1

In your last example there are a few problems:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    await  Task.Run(async () => await ProcessDevice(device));
}
await Task.WhenAll(listOfTasks);

Doing await Task.Run(async () => await ProcessDevice(device)); means you are not moving to the next iteration of the foreach loop until the previous one is done. Essentially, you're still doing them one at a time.

Additionally, you aren't adding any tasks to listOfTasks, so it remains empty. Task.WhenAll(listOfTasks) therefore completes instantly because there are no tasks to await.

Try this:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    var task = Task.Run(async () => await ProcessDevice(device));
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);
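
To see the difference between awaiting inside the loop and collecting the tasks first, here is a minimal sketch. `FakeProcessDevice` is a hypothetical stand-in for `ProcessDevice` that only simulates 200 ms of I/O with `Task.Delay`, and the class and method names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class AwaitOrderDemo
{
    // Hypothetical stand-in for ProcessDevice: simulates 200 ms of I/O.
    public static Task FakeProcessDevice(int deviceId) => Task.Delay(200);

    // Awaits inside the loop, so each call finishes before the next one starts.
    public static async Task<TimeSpan> RunSequentiallyAsync(IEnumerable<int> deviceIds)
    {
        var stopwatch = Stopwatch.StartNew();
        foreach (var id in deviceIds)
        {
            await FakeProcessDevice(id);
        }
        return stopwatch.Elapsed;
    }

    // Starts every call first, then awaits all of them together.
    public static async Task<TimeSpan> RunConcurrentlyAsync(IEnumerable<int> deviceIds)
    {
        var stopwatch = Stopwatch.StartNew();
        var tasks = new List<Task>();
        foreach (var id in deviceIds)
        {
            tasks.Add(FakeProcessDevice(id));
        }
        await Task.WhenAll(tasks);
        return stopwatch.Elapsed;
    }

    public static async Task Main()
    {
        var ids = Enumerable.Range(1, 5).ToList();
        var sequential = await RunSequentiallyAsync(ids);
        var concurrent = await RunConcurrentlyAsync(ids);
        Console.WriteLine($"Sequential: {sequential.TotalMilliseconds:F0} ms");
        Console.WriteLine($"Concurrent: {concurrent.TotalMilliseconds:F0} ms");

        // WhenAll over an empty list has nothing to wait for, so it is already complete.
        Console.WriteLine(Task.WhenAll(new List<Task>()).IsCompleted); // prints True
    }
}
```

With five simulated devices, the sequential version should take roughly five times as long as the concurrent one, since the delays no longer overlap.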
WSC
  • I don't know if it's a good idea to add all devices to a list of tasks and run them all at the same time. If there are too many, this will be slower than all the other solutions. – Thibaut Jun 08 '20 at 14:51
  • It will depend greatly on what `ProcessDevice()` is really doing and how many devices there are, but yes, this should be tested to see how it performs. A different but similar approach is to split `device` into batches and run, say, 5 batches in parallel, processing each batch sequentially. – WSC Jun 08 '20 at 14:56
  • @WSC - Sorry, I updated my question. I did have listOfTasks.Add(task); but forgot it in the question. – Jaron Johnson Jun 08 '20 at 15:00
  • @JaronJohnson Your updated code doesn't make sense. You're not setting `task` to anything. Look at the code in this answer more carefully. – Gabriel Luci Jun 08 '20 at 15:03
  • @GabrielLuci I should have just copied and pasted my code instead of writing it out again in the question. Updated it again. – Jaron Johnson Jun 08 '20 at 15:09
1

I can explain the problem with Parallel.ForEach. An important thing to understand is that when the await keyword acts on an incomplete Task, the enclosing method returns. It returns its own incomplete Task if the method signature allows (that is, if it's not void). Then it is up to the caller to use that Task object to wait for the job to finish.

But the second parameter of Parallel.ForEach is an Action<T>, which returns void, so your async lambda is compiled as async void. No Task can be returned, which means the caller (Parallel.ForEach in this case) has no way to wait until the job has finished.

So in your case, as soon as it hits await ProcessDevice(device), it returns and nothing waits for it to finish so it starts the next iteration. By the time Parallel.ForEach is finished, all it has done is started all the tasks, but not waited for them.

So don't use Parallel.ForEach with asynchronous code.
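
The behavior described above can be reproduced with a small sketch. The class and method names are made up for illustration, and `Task.Delay(500)` is an assumed stand-in for the real I/O inside `ProcessDevice`. The method counts how many items had actually finished at the moment `Parallel.ForEach` returned.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns how many items had finished when Parallel.ForEach returned.
    public static int CountFinishedWhenForEachReturns(int itemCount)
    {
        int finished = 0;

        Parallel.ForEach(Enumerable.Range(0, itemCount), async item =>
        {
            // The lambda is converted to Action<int>, so it is async void:
            // it returns to Parallel.ForEach at this await and nothing
            // observes the rest of the work.
            await Task.Delay(500); // simulated I/O (assumption)
            Interlocked.Increment(ref finished);
        });

        // Parallel.ForEach has returned, but the delays are still pending.
        return Volatile.Read(ref finished);
    }

    public static void Main()
    {
        int finished = CountFinishedWhenForEachReturns(10);
        Console.WriteLine($"Finished when Parallel.ForEach returned: {finished} of 10");
    }
}
```

The count is expected to be well below the number of items, because the loop only starts each async lambda and never waits for it.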

Stephen's answer is more appropriate. You can also use WSC's answer, but that can be dangerous with larger lists. Task.Run queues each call to the thread pool, and starting hundreds or thousands of them all at once will not help your performance.
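
If the list grows large enough that starting everything at once becomes a concern, one common way to cap the number of operations in flight is a SemaphoreSlim combined with Task.WhenAll. The following is only a sketch under assumptions: `ForEachThrottledAsync` and `MeasurePeakAsync` are hypothetical helpers, and `Task.Delay(50)` stands in for the real work done per device.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledDemo
{
    // Runs processAsync for every item, with at most maxConcurrency calls in flight.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> processAsync)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync(); // wait for a free slot
                try
                {
                    await processAsync(item);
                }
                finally
                {
                    gate.Release(); // free the slot even if processAsync throws
                }
            }).ToList();

            await Task.WhenAll(tasks);
        }
    }

    // Measures the highest number of simultaneous calls observed (for illustration).
    public static async Task<int> MeasurePeakAsync(int itemCount, int maxConcurrency)
    {
        var sync = new object();
        int inFlight = 0, peak = 0;

        await ForEachThrottledAsync(Enumerable.Range(0, itemCount), maxConcurrency, async item =>
        {
            lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(50); // simulated I/O (assumption)
            lock (sync) { inFlight--; }
        });

        return peak;
    }

    public static async Task Main()
    {
        int peak = await MeasurePeakAsync(itemCount: 20, maxConcurrency: 3);
        Console.WriteLine($"Peak concurrency: {peak}");
    }
}
```

In the question's terms, the call would look something like `await ForEachThrottledAsync(devices, 10, ProcessDevice);`, where the limit of 10 is an arbitrary value to tune by measurement.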

Gabriel Luci
  • Thank you for the explanation, that makes sense. – Jaron Johnson Jun 08 '20 at 15:12
  • Would using a SemaphoreSlim to limit the level of concurrency, in combination with either of these answers, help maintain a balance? Again, sorry for the ignorance, I am new to this subject. I really need to educate myself on the topic but am not sure where to start. – Jaron Johnson Jun 08 '20 at 15:23
  • You *could*, but it might be over-complicating things for your situation. Is `ProcessDevice()` doing something processor-intensive? Or is it making I/O requests (reading files, making network requests, etc)? – Gabriel Luci Jun 08 '20 at 15:25
  • It is more I/O than processor-intensive. There are a few calculations that happen, but they are pretty light. It makes a few web requests and updates database records. – Jaron Johnson Jun 08 '20 at 15:32
  • Then Stephen's answer is the best way to do it. Once the I/O request is sent, `await` will return and the next one in the list will be started. So they all get started at once, then at `Task.WhenAll`, it will wait for them all to finish. It may even all happen on the same thread. – Gabriel Luci Jun 08 '20 at 15:36
  • Microsoft has a series of articles that explains asynchronous programming very well. This is the first one in that series: [Asynchronous programming with async and await](https://learn.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/). You will find the rest of that series in the table of contents on the left. – Gabriel Luci Jun 08 '20 at 15:37
  • @JaronJohnson for limiting the amount of concurrent async I/O operations take a look at [this](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations). – Theodor Zoulias Jun 08 '20 at 15:41
-1

I'm not sure if this is what you are asking for, but I can give an example of how we start an async process:

private readonly Func<Worker> _worker;

private void StartWorkers(IEnumerable<Props> props) {
    Parallel.ForEach(props, timestamp => { _worker.Invoke().Consume(timestamp); });
}

I would recommend reading about Parallel.ForEach, as it will handle part of the work for you.

DevyDev