
I am trying to improve the performance of some code that performs a shopping function by calling a number of different vendors. The 3rd-party vendor call is async, and the results are processed to generate a result. The structure of the code is as follows:

public async Task<List<ShopResult>> DoShopping(IEnumerable<Vendor> vendors)
{
    var res = vendors.Select(async s => await DoShopAndProcessResultAsync(s));
    await Task.WhenAll(res);
    ...
}

Since DoShopAndProcessResultAsync is both IO-bound and CPU-bound, and each vendor iteration is independent, I think Task.Run can be used to do something like below:

public async Task<List<ShopResult>> DoShopping(IEnumerable<Vendor> vendors)
{
    var res = vendors.Select(s => Task.Run(() => DoShopAndProcessResultAsync(s)));
    await Task.WhenAll(res);
    ...
}

Using Task.Run like this gives a performance gain, and I can see from the order of execution of the calls that multiple threads are involved. It runs without any issue locally on my machine. However, it is a task-of-tasks scenario, and I am wondering whether there are any pitfalls, or whether it is deadlock-prone, in a high-traffic prod environment.

What are your opinions on the approach of using Task.Run to parallelize async calls?

hasianjana
    In the first snippet, how about `var res = vendors.Select(DoShopAndProcessResultAsync);` to get a bunch of tasks, then use `await Task.WhenAll(res);`? – Lzh Jun 26 '22 at 18:33
  • I'm doing something similar using a `SemaphoreSlim` in this post: https://blog.elmah.io/bulk-download-from-azure-blob-storage-with-c/ – ThomasArdal Jun 27 '22 at 08:03
  • Parallelism isn't Concurrency isn't Asynchronous execution. .NET can already handle *concurrent* execution using high-level libraries like Dataflow blocks, or System.Threading.Channel+IAsyncEnumerable – Panagiotis Kanavos Jun 27 '22 at 08:33
  • @PanagiotisKanavos *"Parallelism isn't Concurrency isn't Asynchronous execution"* -- How is this relevant to the question asked? More specifically, does your comment suggest an improvement for the question asked? If yes, what exactly is this suggested improvement? – Theodor Zoulias Jun 27 '22 at 09:11
  • I know you know and that I've already explained that dozens of times in duplicate questions, but I'll repeat it for those that haven't seen the duplicates. Parallelism in .NET means specifically CPU-bound, in-memory data parallelism: processing lots of in-memory data using all available cores. Concurrency means executing lots of jobs, possibly related and often IO-bound, at the same time. The two different paradigms require different optimizations, which is why `Parallel.ForEach` is terrible for concurrent work – Panagiotis Kanavos Jun 27 '22 at 09:19
  • @PanagiotisKanavos I asked you how is your comment relevant to the question, and how it contributes in the improvement of the question. I didn't ask for your definition of parallelism. Which is a controversial definition btw, as you'll probably find out if you post it as an answer to a relevant question, so that people can evaluate (vote) it accordingly. – Theodor Zoulias Jun 27 '22 at 20:18

2 Answers


What is alarming with the Task.Run approach in your question, is that it depletes the ThreadPool of available worker threads in a non-controlled manner. It doesn't offer any configuration option that would allow you to reduce the parallelism of each individual request, in favor of preserving the scalability of the whole service. That's something that might bite you in the long run.

Ideally you would like to control both the parallelism and the concurrency, and control them independently. For example you might want to limit the maximum concurrency of the I/O-bound work to 10, and the maximum parallelism of the CPU-bound work to 2. Regarding the former, you could take a look at this question: How to limit the amount of concurrent async I/O operations?
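For illustration, here is a minimal sketch of limiting the I/O-bound concurrency to 10 with a `SemaphoreSlim`, as discussed in the linked question. The `DoShoppingThrottled` name is made up; `DoShopAndProcessResultAsync`, `Vendor` and `ShopResult` are the types from the question:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public async Task<ShopResult[]> DoShoppingThrottled(IEnumerable<Vendor> vendors)
{
    // At most 10 operations in flight at any moment.
    using var semaphore = new SemaphoreSlim(10);

    var tasks = vendors.Select(async vendor =>
    {
        await semaphore.WaitAsync();
        try { return await DoShopAndProcessResultAsync(vendor); }
        finally { semaphore.Release(); }
    });
    return await Task.WhenAll(tasks);
}
```

Note that this limits only the concurrency of the async operations; it does nothing to limit how many ThreadPool threads the CPU-bound parts of each operation may occupy.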

Regarding the latter, you could use a TaskScheduler with limited concurrency. The ConcurrentExclusiveSchedulerPair is a handy class for this purpose. Here is an example of how you could rewrite your DoShopping method in a way that limits the ThreadPool usage to two threads at maximum (per request), without limiting the concurrency of the I/O-bound work at all:

public async Task<ShopResult[]> DoShopping(IEnumerable<Vendor> vendors)
{
    var scheduler = new ConcurrentExclusiveSchedulerPair(
        TaskScheduler.Default, maxConcurrencyLevel: 2).ConcurrentScheduler;

    var tasks = vendors.Select(vendor =>
    {
        return Task.Factory.StartNew(() => DoShopAndProcessResultAsync(vendor),
            default, TaskCreationOptions.DenyChildAttach, scheduler).Unwrap();
    });

    return await Task.WhenAll(tasks);
}

Important: In order for this to work, the DoShopAndProcessResultAsync method should be implemented internally without .ConfigureAwait(false) at the await points. Otherwise the continuations after the await will not run on our preferred scheduler, and the goal of limiting the ThreadPool utilization will be defeated.
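To make that concrete, here is a sketch of what the method's await points should look like. The `httpClient`, `vendor.Url` and `ProcessResult` names are purely illustrative:

```csharp
async Task<ShopResult> DoShopAndProcessResultAsync(Vendor vendor)
{
    // I/O-bound part. No .ConfigureAwait(false): the awaiter captures
    // TaskScheduler.Current (our limited-concurrency scheduler), so the
    // continuation below resumes on it.
    string response = await httpClient.GetStringAsync(vendor.Url);

    // CPU-bound part, now throttled to the scheduler's two threads.
    return ProcessResult(response);
}
```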

My personal preference though would be to use instead the new (.NET 6) Parallel.ForEachAsync API. Apart from making it easy to control the concurrency through the MaxDegreeOfParallelism option, it also comes with a better behavior in case of exceptions. Instead of invariably launching all the async operations, it stops launching new operations as soon as a previously launched operation has failed. This can make a big difference in the responsiveness of your service, in case for example that all individual async operations are failing with a timeout exception. You can find here a synopsis of the main differences between the Parallel.ForEachAsync and the Task.WhenAll APIs.
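A bare Parallel.ForEachAsync version of your method could look like the sketch below (assuming .NET 6+), with the results collected in a `ConcurrentBag` as a side effect, so the order of the results is not preserved:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public async Task<ShopResult[]> DoShopping(IEnumerable<Vendor> vendors)
{
    var results = new ConcurrentBag<ShopResult>(); // order not preserved
    var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };

    await Parallel.ForEachAsync(vendors, options, async (vendor, ct) =>
    {
        results.Add(await DoShopAndProcessResultAsync(vendor));
    });
    return results.ToArray();
}
```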

Unfortunately the Parallel.ForEachAsync has the disadvantage that it doesn't return the results of the async operations. Which means that you have to collect the results manually as a side-effect of each async operation. I've posted here a ForEachAsync variant that returns results, that combines the best aspects of the Parallel.ForEachAsync and the Task.WhenAll APIs. You could use it like this:

public async Task<ShopResult[]> DoShopping(IEnumerable<Vendor> vendors)
{
    var scheduler = new ConcurrentExclusiveSchedulerPair(
        TaskScheduler.Default, maxConcurrencyLevel: 2).ConcurrentScheduler;

    ParallelOptions options = new() { MaxDegreeOfParallelism = 10 };

    return await ForEachAsync(vendors, options, async (vendor, ct) =>
    {
        return await Task.Factory.StartNew(() => DoShopAndProcessResultAsync(vendor),
            ct, TaskCreationOptions.DenyChildAttach, scheduler).Unwrap();
    });
}

Note: In my initial answer (revision 1) I had suggested erroneously to pass the scheduler through the ParallelOptions.TaskScheduler property. I just found out that this doesn't work as I expected. The ParallelOptions class has an internal property EffectiveMaxConcurrencyLevel that represents the minimum of the MaxDegreeOfParallelism and the TaskScheduler.MaximumConcurrencyLevel. The implementation of the Parallel.ForEachAsync method uses this property, instead of reading the MaxDegreeOfParallelism directly. So the MaxDegreeOfParallelism, by being larger than the MaximumConcurrencyLevel, was effectively ignored.

You've probably also noticed by now that the names of these two settings are confusing. We use the MaximumConcurrencyLevel in order to control the number of threads (aka the parallelization), and we use the MaxDegreeOfParallelism in order to control the amount of concurrent async operations (aka the concurrency). The reason for this confusing terminology can be traced to the historic origins of these APIs. The ParallelOptions class was introduced before the async-await era, and the designers of the new Parallel.ForEachAsync API aimed at making it compatible with the older non-asynchronous members of the Parallel class.

Theodor Zoulias
  • After thinking about it, I now believe that the behavior of the `Parallel.ForEachAsync` that invalidated my initial answer is a bug. I've posted a [bug report](https://github.com/dotnet/runtime/issues/71415 "Parallel.ForEachAsync the TaskScheduler and MaxDegreeOfParallelism options interfere with each other") on GitHub about it. – Theodor Zoulias Jun 29 '22 at 03:57

Tasks are .NET's low-level building blocks. .NET almost always has a better high-level abstraction for specific concurrency paradigms.


To paraphrase Rob Pike (slides): Concurrency is not parallelism is not asynchronous execution. What you ask for is concurrent execution, with a specific degree of parallelism. .NET already offers high-level classes that can do that, without resorting to low-level task handling.

At the end, I explain why these distinctions matter and how they're implemented using different .NET classes or libraries.

Dataflow blocks

At the highest level, the Dataflow classes allow creating a pipeline of processing blocks similar to a PowerShell or Bash pipeline, where each block can use one or more tasks to process input. Dataflow blocks preserve message order, ensuring results are emitted in the order the input messages were received.

You'll often see combinations of blocks called meshes, not pipelines. Dataflow grew out of the Microsoft Robotics Framework and can be used to create a network of independent processing blocks. Most programmers just use it to build a pipeline of steps, though.

In your case, you could use a TransformBlock to execute DoShopAndProcessResultAsync and feed the output either to another processing block, or to a BufferBlock you can read after processing all results. You could even split Shop and Process into separate blocks, each with its own logic and degree of parallelism.

E.g.:

var buffer = new BufferBlock<ShopResult>();
var blockOptions = new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 3,
    BoundedCapacity = 1
};
var shop = new TransformBlock<Vendor, ShopResult>(DoShopAndProcessResultAsync,
                                                  blockOptions);

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
shop.LinkTo(buffer, linkOptions);

foreach (var v in vendors)
{
    await shop.SendAsync(v);
}

shop.Complete();
await shop.Completion;

buffer.TryReceiveAll(out IList<ShopResult> results);

You can use two separate blocks to shop and process :

var results = new BufferBlock<ShopResult>();
var shop = new TransformBlock<Vendor, ShopResponse>(DoShopAsync, shopOptions);
var process = new TransformBlock<ShopResponse, ShopResult>(DoProcessAsync, processOptions);

shop.LinkTo(process, linkOptions);
process.LinkTo(results, linkOptions);

foreach(var v in vendors)
{
    await shop.SendAsync(v);
}

shop.Complete();
await process.Completion;

In this case we await the completion of the last block in the chain before reading the results.

Instead of reading from a buffer block, we could use an ActionBlock at the end to do whatever we want to do with the results, e.g. store them to a database. The results can be batched using a BatchBlock to reduce the number of storage operations.

...
var batch = new BatchBlock<ShopResult>(100);
var store = new ActionBlock<ShopResult[]>(DoStoreAsync);

shop.LinkTo(process, linkOptions);
process.LinkTo(batch, linkOptions);
batch.LinkTo(store, linkOptions);

...
shop.Complete();
await store.Completion;

Why do names matter

Tasks are the lowest-level building blocks used to implement multiple paradigms. In other languages you'd see them described as Futures or Promises (e.g. JavaScript).

Parallelism in .NET means executing CPU-bound computations over a lot of data using all available cores. Parallel.ForEach will partition the input data into roughly as many partitions as there are cores and use one worker task per partition. PLINQ goes one step further, allowing the use of LINQ operators to specify the computation and letting PLINQ use algorithms optimized for parallel execution to map, filter, sort, group and collect results. That's why Parallel.ForEach can't be used for async work at all.
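For contrast, this is the kind of job Parallel.ForEach is made for. The input data and the computation here are placeholders; note the delegate is synchronous:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// CPU-bound, in-memory data parallelism: synchronous delegate, all cores.
int[] inputs = Enumerable.Range(1, 1_000_000).ToArray();
double[] outputs = new double[inputs.Length];

Parallel.ForEach(inputs, n =>
{
    outputs[n - 1] = Math.Sqrt(n); // placeholder computation
});
```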

Concurrency means executing multiple independent and often IO-bound jobs. At the lowest level you can use Tasks, but Dataflow, Rx.NET, Channels, IAsyncEnumerable etc. allow the use of high-level patterns like CSP/pipelines, event stream processing etc.
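As a sketch of the Channels flavor of this, here is the same shop-and-collect job written CSP-style. The worker count of 3 and the bounded capacity of 1 are arbitrary choices; `Vendor`, `ShopResult` and `DoShopAndProcessResultAsync` come from the question:

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<Vendor>(capacity: 1);
var results = new ConcurrentBag<ShopResult>();

// Three consumers read vendors from the channel and process them concurrently.
Task[] workers = Enumerable.Range(0, 3).Select(async _ =>
{
    await foreach (Vendor vendor in channel.Reader.ReadAllAsync())
        results.Add(await DoShopAndProcessResultAsync(vendor));
}).ToArray();

// Producer: feed the vendors, then signal that no more are coming.
foreach (Vendor v in vendors)
    await channel.Writer.WriteAsync(v);
channel.Writer.Complete();

await Task.WhenAll(workers);
```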

Asynchronous execution means you don't have to block while waiting for I/O-bound work to complete.

Panagiotis Kanavos