I'm trying to implement a self-hosted web service using ASP.NET Core 2.1 and got stuck on implementing long-running background tasks.

Because each call to ProcessSingle (in the code snippet below) is CPU-intensive and slow, I would like to limit the number of tasks executing simultaneously. But as far as I can see, all the tasks in Parallel.ForEach start almost immediately, even though I set MaxDegreeOfParallelism = 3.

My code is (it's a simplified version):

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait when it finishes
    Task.Factory.StartNew(async () => {
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            async x => await ProcessSingle(x));
    });

    // return created id immediately
    return id;
}

public static async Task ProcessSingle(MyInputData inputData)
{
    var dbData = await GetDataFromDb(); // get data from DB async using Dapper
    // some lasting processing (sync)
    await SaveDataToDb(); // async save processed data to DB using Dapper
}

If I understand correctly, the problem is in async x => await ProcessSingle(x) inside Parallel.ForEach, isn't it?

Could someone please describe how this should be implemented correctly?

Update

Because my question was somewhat ambiguous, here are the main points:

  1. There are three parts to the ProcessSingle method:

    • getting data from the DB (async)

    • running long, CPU-intensive math calculations

    • saving the results to the DB (async)

  2. The problem consists of two separate questions:

    • How to decrease CPU usage (for example, by running no more than three math calculations simultaneously)?

    • How to keep the structure of the ProcessSingle method, that is, keep it async because of the async DB calls?

I hope this is clearer now.

P.S. A suitable answer has already been given, and it works (special thanks to @MatrixTai). This update was written for general clarification.

Theodor Zoulias
user1820686
  • You don't need async/await inside your foreach loop. That causes further threads to start. I would just make ProcessSingle an ordinary, synchronous function. – PMF Sep 14 '18 at 06:33
  • **Don't** mix `Parallel.ForEach` with `async/await` _"[The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call](https://stackoverflow.com/a/11565317/585968)"_ –  Sep 14 '18 at 06:37
  • @PMF, @MickyD but the problem is that I can't get rid of the async calls to the DB inside `ProcessSingle`. So the only way is to stop using `Parallel.ForEach` and start using something like `Task.WhenAll(/* call all ProcessSingle */)`? But in that case, there is no built-in way to limit concurrent threads (as far as I remember, I've seen some custom implementations)? – user1820686 Sep 14 '18 at 06:45
  • You would be better using _TPL DataFlow_. There not only can you use `async/await` but you can also _throttle_. ;) –  Sep 14 '18 at 06:47
  • *Don't* use `Parallel.ForEach` with async/await. It's meant for *data* parallelism - its job is to *partition* the incoming data and assign each partition to a *single* task. What you ask though is already built into the `ActionBlock` class. – Panagiotis Kanavos Sep 14 '18 at 06:54
  • @user1820686 what is your *actual* problem? Do you have a slow query perhaps? What does the query look like? Are there any missing indexes? Executing queries in parallel will *cause* delays due to blocking. Trying to load N individual records will result in N times slower performance. Dapper can work with multiple values, `IN` clauses etc. – Panagiotis Kanavos Sep 14 '18 at 07:01
  • @user1820686 if you want to *insert* records, SqlBulkCopy can insert data using bulk mechanisms and minimal logging, just like the `BULK INSERT` command. Minimal logging means it can be *more* than N times faster than N individual, fully logged INSERT statements – Panagiotis Kanavos Sep 14 '18 at 07:03
  • @PanagiotisKanavos, no no, the problem isn't in slow queries to a database. All of them are pretty simple and fast. As I mentioned above my main problem was in long-time processing (it's sort of math calculations that could last several minutes for each `ProcessSingle`) not related to a database. – user1820686 Sep 14 '18 at 07:06
  • @user1820686 then you asked the wrong question and got a wrong answer. `ProcessSingle` shows IO calls, not long-time data processing. So it *is* an IO problem. Math is a *data parallelism* problem, for which Parallel.ForEach is the correct mechanism. *Separate* those two types of computation and use the correct mechanism for each – Panagiotis Kanavos Sep 14 '18 at 07:12
  • @user1820686 and no, tasks in Parallel.ForEach don't all start at the same time. Only *three* tasks will be created. Parallel.ForEach will partition the data into 3 sets and feed each partition to a single task, to avoid wasting time for thread/task synchronization. Using `async/await` with ProcessSingle is actually a bug – Panagiotis Kanavos Sep 14 '18 at 07:18
  • @PanagiotisKanavos, yes, I understood that it's incorrect to use `async/await` inside of `Parallel.ForEach`. I've updated the question a bit to focus on the problem – user1820686 Sep 14 '18 at 07:43
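
The `ActionBlock` suggestion from the comments above could look roughly like the sketch below. It is only an illustration under stated assumptions: `DataflowThrottle`, its `ProcessSingle` stub and the `maxParallel` parameter are hypothetical names, not code from the question, and on older frameworks the `System.Threading.Tasks.Dataflow` namespace may have to be added as a NuGet package.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // may require the System.Threading.Tasks.Dataflow package

public static class DataflowThrottle
{
    // Hypothetical stand-in for the question's async ProcessSingle.
    private static async Task<int> ProcessSingle(int input)
    {
        await Task.Delay(10);   // placeholder for the async DB calls
        return input * 2;       // placeholder for the calculation
    }

    public static async Task<List<int>> ProcessAll(IEnumerable<int> inputs, int maxParallel)
    {
        var results = new ConcurrentBag<int>();

        // The block runs the async delegate for at most maxParallel items at a time.
        var block = new ActionBlock<int>(
            async x =>
            {
                int result = await ProcessSingle(x);
                results.Add(result);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        foreach (var input in inputs)
        {
            block.Post(input);   // queue the item; it waits until a slot is free
        }

        block.Complete();        // signal that no more items will be posted
        await block.Completion;  // finishes once every queued item has been processed

        // Completion order is not deterministic, so sort before returning.
        var sorted = new List<int>(results);
        sorted.Sort();
        return sorted;
    }
}
```

Calling `await DataflowThrottle.ProcessAll(listOfData, 3)` from inside the background task would keep the async calls while capping the number of items in flight at three.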

2 Answers

Update

As I just noticed from your comment, the problem is caused by the math calculation.

It is better to separate the calculation from the DB update.

For the calculation part, use Parallel.ForEach(), which spreads the work across threads and lets you control how many run at once.

Only after all the calculations have finished, use async-await to write the data to the DB, without the SemaphoreSlim from my original answer below.

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () => {

        // Calculation part: CPU-bound, at most 3 items at a time
        ConcurrentBag<int> data = new ConcurrentBag<int>();
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            x => { data.Add(calculationPart(x)); });

        // Update DB part: I/O-bound, so plain async-await is enough
        int[] data_arr = data.ToArray();
        List<Task> worker = new List<Task>();
        foreach (var i in data_arr)
        {
            worker.Add(DBPart(i));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

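Of course they all start together, because you use async-await inside Parallel.ForEach. The async lambda returns to Parallel.ForEach at its first await, so the loop treats that item as finished and immediately moves on to the next one.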
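
To see this effect in isolation, here is a small sketch. It assumes nothing from the question except the MaxDegreeOfParallelism = 3 setting; `AsyncLambdaDemo` and its parameters are hypothetical names, and `Task.Delay` stands in for the awaited DB call.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLambdaDemo
{
    // Returns how many items had actually finished at the moment Parallel.ForEach returned.
    public static int CountFinishedWhenLoopReturns(int itemCount, int delayMs)
    {
        int finished = 0;

        Parallel.ForEach(
            Enumerable.Range(0, itemCount),
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            async x =>
            {
                // The lambda is converted to an async void Action<int>:
                // control goes back to Parallel.ForEach at this await,
                // so the loop cannot wait for the rest of the body.
                await Task.Delay(delayMs);
                Interlocked.Increment(ref finished);
            });

        return Volatile.Read(ref finished);
    }
}
```

With a delay of a second or two, `CountFinishedWhenLoopReturns(6, 2000)` typically comes back almost at once with a count well below 6, which shows that the loop did not wait for the awaited work and that MaxDegreeOfParallelism never limited it.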

First, read the first and second answers to this question. Combining the two is meaningless.

In fact, async-await on its own already makes the best use of the available threads, so simply use it.

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () => {
        List<Task> worker = new List<Task>();
        foreach (var i in listOfData)
        {
            worker.Add(ProcessSingle(i));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

But then there is another problem: in this case the tasks still all start together and eat up your CPU.

To avoid this, use SemaphoreSlim:

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () => {
        List<Task> worker = new List<Task>();
        // To limit the number of tasks started.
        var throttler = new SemaphoreSlim(initialCount: 20);
        foreach (var i in listOfData)
        {
            // wait here until one of the 20 slots is free
            await throttler.WaitAsync();
            worker.Add(Task.Run(async () =>
            {
                try
                {
                    await ProcessSingle(i);
                }
                finally
                {
                    // free the slot even if ProcessSingle throws
                    throttler.Release();
                }
            }));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

Read more: How to limit the amount of concurrent async I/O operations?

Also, do not use Task.Factory.StartNew() when a simple Task.Run() is enough to do the work you want; read this excellent article by Stephen Cleary.
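
One way to meet both constraints from the question's update (keep ProcessSingle async, but run no more than three calculations at once) is to guard only the CPU-bound step with the semaphore. This is a minimal sketch under assumptions, not a definitive implementation: `CpuThrottledPipeline`, `Calculate`, the stubbed DB methods and the `Task<long>` return type are all hypothetical and exist only to make the example self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CpuThrottledPipeline
{
    // At most three calculations run at once; the DB calls are not throttled.
    private static readonly SemaphoreSlim CpuGate = new SemaphoreSlim(3);

    // Hypothetical stand-ins for the question's async Dapper calls.
    private static async Task<int> GetDataFromDb(int input)
    {
        await Task.Delay(10);
        return input;
    }

    private static Task SaveDataToDb(long result)
    {
        return Task.Delay(10);
    }

    // Hypothetical stand-in for the long synchronous math: sum of squares 0..data.
    private static long Calculate(int data)
    {
        long sum = 0;
        for (int i = 0; i <= data; i++) sum += (long)i * i;
        return sum;
    }

    public static async Task<long> ProcessSingle(int input)
    {
        int dbData = await GetDataFromDb(input);

        await CpuGate.WaitAsync();   // wait for one of the three CPU slots
        long result;
        try
        {
            // Task.Run moves the synchronous calculation onto a thread-pool thread.
            result = await Task.Run(() => Calculate(dbData));
        }
        finally
        {
            CpuGate.Release();       // free the slot even if the calculation throws
        }

        await SaveDataToDb(result);
        return result;
    }

    public static Task<long[]> ProcessAll(IEnumerable<int> inputs)
    {
        // Start every item; only the calculation step queues on the semaphore.
        return Task.WhenAll(inputs.Select(i => ProcessSingle(i)));
    }
}
```

The trade-off compared with the Parallel.ForEach version in the update is that all DB reads start immediately, so this fits best when the queries are cheap and only the calculation needs to be capped.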

MT-FreeHK
  • There's a built-in class for executing jobs in parallel with a DOP, it's ActionBlock. In any case, executing multiple slow DB queries in parallel is more likely to *reduce* performance – Panagiotis Kanavos Sep 14 '18 at 07:00
  • @PanagiotisKanavos, I agree that if the problem is related to DB queries, it is more likely to reduce performance. If I were doing this task myself, I would most likely do it synchronously. – MT-FreeHK Sep 14 '18 at 07:04
  • @MatrixTai, no no, there are no problems with db communications at all :) You've answered exactly what I was looking for. Thanks) – user1820686 Sep 14 '18 at 07:08
  • @user1820686, wait, I just noticed you wrote in a comment that the problem is caused by math calculations. Then it is mostly CPU-bound. In that case, it may be better to use the method PMF mentioned: make it synchronous and run it with `Parallel.ForEach`, and finally collect all the data for `async-await` writing to the DB. I will update my answer. – MT-FreeHK Sep 14 '18 at 07:11
  • @user1820686 what is your actual question? Parallel.ForEach with a DOP of 3 will run *ONLY 3 tasks at the same time*. It will split the data 3 ways and pass each partition to a *single* task. Your question's *bug* is that by making `ProcessSingle` async, you start *another* task for each individual data item – Panagiotis Kanavos Sep 14 '18 at 07:17
  • @PanagiotisKanavos, I think it is still meaningful to make `ProcessSingle` async. Let's say `ProcessSingle` is purely DB tasks, so as to skip connection time. The problem thus becomes separating the work into a CPU task and a connection task; handling them separately should be better. – MT-FreeHK Sep 14 '18 at 07:29

If you're more familiar with the "traditional" parallel processing concept, rewrite your ProcessSingle() method like this:

public static void ProcessSingle(MyInputData inputData)
{
    var dbData = GetDataFromDb(); // get data from DB synchronously using Dapper
    // some lasting processing (sync)
    SaveDataToDb(); // save processed data to DB synchronously using Dapper
}

Of course, you would then preferably also change the Work() method in a similar fashion.
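
As a rough check that a fully synchronous ProcessSingle really is capped by MaxDegreeOfParallelism, the sketch below records the highest number of calls that were running at the same moment. Everything in it is an assumption made for illustration: `SyncParallelDemo` is a hypothetical name and `Thread.Sleep` stands in for the blocking DB calls and the calculation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SyncParallelDemo
{
    private static readonly object Gate = new object();
    private static int _running;
    private static int _peak;

    // Hypothetical synchronous ProcessSingle; the sleep simulates blocking work.
    private static void ProcessSingle(int input)
    {
        lock (Gate)
        {
            _running++;
            if (_running > _peak) _peak = _running;   // remember the highest concurrency seen
        }

        Thread.Sleep(20);

        lock (Gate)
        {
            _running--;
        }
    }

    // Runs all items and returns the peak number of simultaneous ProcessSingle calls.
    public static int RunAndReportPeak(IEnumerable<int> items, int maxParallel)
    {
        lock (Gate)
        {
            _running = 0;
            _peak = 0;
        }

        // The body is synchronous, so the loop only returns after every item is done.
        Parallel.ForEach(
            items,
            new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
            x => ProcessSingle(x));

        lock (Gate)
        {
            return _peak;
        }
    }
}
```

`SyncParallelDemo.RunAndReportPeak(Enumerable.Range(0, 12), 3)` should never report more than 3. In Work(), the whole loop could be wrapped in `Task.Run(() => Parallel.ForEach(...))` so that the id is still returned immediately.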

PMF
  • Looks suspiciously like you will be mixing `Task` with `Parallel.ForEach` iteration –  Sep 14 '18 at 08:21
  • @MickyD: Yes, I will. That is not a problem, though. Mixing `Parallel.ForEach` and `async / await` is. `Parallel.ForEach` itself uses tasks. – PMF Sep 16 '18 at 11:31
  • It is a problem. Refer to the link above. –  Sep 16 '18 at 14:30
  • @MickyD: Sorry, which link? https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach/11565317#11565317 doesn't mention anything about mixing Tasks and Parallel.ForEach. It's all about mixing Tasks and async/await. I would not be using any async/await at all. – PMF Sep 16 '18 at 15:01
  • Incorrect. You've already indicated you will be using `Task`. The actual `await` is **irrelevant**. [Read this](https://blogs.msdn.microsoft.com/pfxteam/2009/05/26/does-parallel-for-use-one-task-per-iteration/). Additionally, even if you **purely** used `Parallel.ForEach`, to use a thread-pool thread with a blocking I/O task such as `GetDataFromDb()` and `SaveDataToDb()` is a waste of a perfectly good thread. You `need` to use IOCP which is nowhere in your example. `async/await` is perfect for I/O but you can't use that with `Parallel.ForEach`. –  Sep 16 '18 at 23:50
  • Now had your method been invoked from TPL DataFlow (as mentioned) it would be perfectly ok. –  Sep 16 '18 at 23:50
  • Depends on what you want to parallelize (or what not). If it's only the processing between the database calls, I agree. If you want to ensure there are at most n parallel executions of the _entire_ method taking place, it's perfectly ok to block a thread even if it does not run your own code at this time. – PMF Sep 17 '18 at 06:18