I have a function that collects values in a dictionary like this:

async Task Func()
{
    ....
    var dtDict = await HandleComputeBooster();
    ...
}

private static async Task
DoBooster(..., ConcurrentDictionary<string, DataTable> dtDict,....)
{
    DataTable dt = ...
    ...
    dtDict[symbol] = dt;
    ...
}

This is the function that is returning too soon:

async private Task<ConcurrentDictionary<string, DataTable>> 
HandleComputeBooster()
{
    var dtDict = new ConcurrentDictionary<string, DataTable>();

    ....
    var chunks = listOfBoosterSymbols.ChunkBy(8);
    var pcCount = Environment.ProcessorCount;

    Parallel.ForEach(chunks, new ParallelOptions
    { MaxDegreeOfParallelism = pcCount - 2 }, async listStr =>
        {
        var symbol = listStr[0];
        await DoBooster(..., dtDict, ...);
        }
    );
    ...

    return dtDict;
}

The problem is that HandleComputeBooster returns before all the values in dtDict are computed. All the values eventually make it into dtDict, but I need a way to say: don't return from HandleComputeBooster until all the chunks have been processed.

Ivan
  • What version of dotnet are you using? 6+ has [Parallel.ForEachAsync](https://learn.microsoft.com/en-gb/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-6.0) – Fermin Jan 27 '23 at 15:07
  • Another option is to fire them all in a normal loop, collect all the resulting tasks into a list, and then await all of them via `await Task.WhenAll(tasks)`. The parallel trigger doesn't do anything for you here, at least with your minimal example. – Brannon Jan 27 '23 at 15:16
  • If `DoBooster` loads data from a database, executing N queries can easily be slower than executing one query that loads N symbols. What does `DoBooster` do? – Panagiotis Kanavos Jan 27 '23 at 15:23

1 Answer

You can't use async methods with Parallel.ForEach. Use Parallel.ForEachAsync instead (available in .NET 6 and later):

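await Parallel.ForEachAsync(chunks, new ParallelOptions { MaxDegreeOfParallelism = pcCount - 2 },
    async (listStr, cancellationToken) =>
    {
        // The body receives the item and a CancellationToken and returns a ValueTask
        var symbol = listStr[0];
        await DoBooster(..., dtDict, ...);
    });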
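For a version that compiles on its own, here is a minimal sketch assuming .NET 6 or later. `FakeBoosterAsync` is a hypothetical stand-in for `DoBooster`, the `int` results stand in for the `DataTable` values, and the class and method names are made up for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Hypothetical stand-in for DoBooster: simulates async work, then stores a result.
    private static async Task FakeBoosterAsync(
        string symbol, ConcurrentDictionary<string, int> results, CancellationToken ct)
    {
        await Task.Delay(50, ct);
        results[symbol] = symbol.Length;
    }

    public static async Task<ConcurrentDictionary<string, int>> ComputeAsync(
        IEnumerable<string[]> chunks)
    {
        var results = new ConcurrentDictionary<string, int>();
        var options = new ParallelOptions
        {
            // Same idea as pcCount - 2 in the question, but never below 1.
            MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount - 2)
        };

        // The returned Task completes only after every chunk's body has completed.
        await Parallel.ForEachAsync(chunks, options, async (chunk, ct) =>
        {
            await FakeBoosterAsync(chunk[0], results, ct);
        });

        return results;
    }
}
```

The `Math.Max(1, ...)` guard is an addition of this sketch: `MaxDegreeOfParallelism` must be -1 or a positive number, so `pcCount - 2` would throw on a machine with two or fewer logical processors.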

Parallel.ForEach doesn't have an overload that accepts a Func<..., Task>. The lambda in the question's code is actually compiled to an async void delegate, which can't be awaited:

// The chunk type is assumed here; Body is just an illustrative name
async void Body(List<string> listStr)
{
    var symbol = listStr[0];
    await DoBooster(..., dtDict, ...);
}

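The question's code starts an async void call for every item, up to pcCount - 2 at a time, and never waits for any of them to finish. Parallel.ForEach treats an iteration as complete as soon as the delegate returns, which happens at its first await on an incomplete task, so HandleComputeBooster reaches return dtDict while DoBooster calls are still running.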
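The early return can be reproduced in isolation. This is a rough sketch in which `Task.Delay` stands in for the real work and the class name is hypothetical. It counts the results right after `Parallel.ForEach` returns, then again after a crude wait:

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncVoidPitfall
{
    // Returns how many results exist right after Parallel.ForEach returns,
    // and how many exist after waiting well past the simulated delay.
    public static async Task<(int Immediately, int Later)> RunAsync()
    {
        var results = new ConcurrentDictionary<int, int>();
        var items = Enumerable.Range(0, 8);

        // The async lambda becomes an async void Action<int>; ForEach cannot await it.
        Parallel.ForEach(items, async i =>
        {
            await Task.Delay(500);   // simulated asynchronous work
            results[i] = i * i;
        });

        int immediately = results.Count;  // typically 0: the work is still in flight
        await Task.Delay(2000);           // crude wait, for demonstration only
        return (immediately, results.Count);
    }
}
```

The first count is normally zero and the second is eight, which matches the symptom in the question: the values do arrive, just after the method has already moved on.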

If DoBooster performs IO there may not be a reason to restrict MaxDegreeOfParallelism, as asynchronous IO doesn't block the processor.
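If no limit is needed, one possible shape is to start every call and await them together with `Task.WhenAll`, as suggested in the comments on the question. A minimal sketch, where `FakeBoosterAsync` is again a hypothetical stand-in for an IO-bound `DoBooster`:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for an IO-bound DoBooster.
    private static async Task FakeBoosterAsync(
        string symbol, ConcurrentDictionary<string, int> results)
    {
        await Task.Delay(50);
        results[symbol] = symbol.Length;
    }

    public static async Task<ConcurrentDictionary<string, int>> ComputeAsync(
        IEnumerable<string[]> chunks)
    {
        var results = new ConcurrentDictionary<string, int>();

        // Start one task per chunk; ToList forces every call to start now.
        List<Task> tasks = chunks
            .Select(chunk => FakeBoosterAsync(chunk[0], results))
            .ToList();

        // Completes only when all of the started tasks have completed.
        await Task.WhenAll(tasks);
        return results;
    }
}
```

This starts everything at once, so it only makes sense when the resource behind the calls can take that many concurrent requests.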

On the other hand, if DoBooster is trying to "improve" data loading from a database, it will increase delays as the concurrent connections will compete for the same network and disk bandwidth and even lock each other. The best way to improve database performance is to improve the query and indexing.

For example, a single query for N symbols will be faster than N individual queries for a single symbol at a time.

This query will scan the tick table just once. Dapper expands a list parameter into the IN clause, which is a lot easier than dealing with ADO.NET explicitly, but it doesn't change the query's behavior:

var ticks = await con.QueryAsync<Tick>("select * from StockTicks where Symbol in @symbols", new { symbols = listOfSymbols });

Once the ticks are loaded, they can be grouped per symbol. In this case I use ToLookup, which returns an ILookup<TKey, TElement>, similar to a multi-valued dictionary:

var symbolTicks = ticks.ToLookup(tick => tick.Symbol);
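To show how the lookup behaves without a database, here is a small in-memory sketch. The `Tick` record and its `Price` property are assumptions; the original only implies a `Symbol` property:

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape of a tick row; only Symbol is implied by the original code.
public record Tick(string Symbol, decimal Price);

public static class LookupSketch
{
    // Groups ticks by symbol; each key maps to all ticks with that symbol.
    public static ILookup<string, Tick> GroupBySymbol(IEnumerable<Tick> ticks) =>
        ticks.ToLookup(tick => tick.Symbol);
}
```

Indexing the result with a symbol, for example `lookup["AAPL"]`, yields every tick for that symbol. Unlike a dictionary, indexing with a key that isn't present returns an empty sequence instead of throwing.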
Panagiotis Kanavos