4

I was wondering if my approach is good to query a REST-API in parallel because there is a limit on how many results can be obtained with one request (1000). To speed up things I want to do this in parallel.

The idea is to use a partitioner to create a set of ranges (10 ranges in my case). Every range is executed in parallel to query an API-endpoint.

The result is array of Tasks. With Task.WhenAll(tasks) I wait until all tasks have been finished and then I have to flatten down the string[][]-array to get a one dimensional array.

Any ideas or a better solution?

public async Task<string[]> QueryApiAsParallel() {
    int maximum = 10000; // I don't want to query more than 10000 results,
                         // even I know that are a lot more results
    int rangeSize = 1000; // maximum number that can be received via API

    Task<string[]>[] tasks = Partitioner.Create(0, maximum, rangeSize).AsParallel()
        .Select(async (range, index) => {
        int skip = range.Item1;
        int first = range.Item2 - range.Item1;

        string[] names = await apiClient.GetNames(skip, first);

        return names;
    }).ToArray();

    string[][] tasksCompleted = await Task.WhenAll(tasks);

    string[] flattened = tasksCompleted.SelectMany(x => x).ToArray();

    return flattened;
}

The implementation is maybe a bit inefficient.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104

3 Answers3

3

Instead of creating the tasks with PLINQ and awaiting them with Task.WhenAll, you could use the .NET 6 Parallel.ForEachAsync API. This method doesn't have an overload that accepts a Partitioner<T> nor it has an overload that returns the results of the parallel operation. To work around these limitations, in the example below I am using the GetOrderableDynamicPartitions method to get the partitions, and a ConcurrentDictionary<long, string[]> to store the results:

public async Task<string[]> QueryApiParallelAsync()
{
    int maximum = 10000;
    int rangeSize = 1000;

    IEnumerable<KeyValuePair<long, Tuple<int, int>>> partitions = Partitioner
        .Create(0, maximum, rangeSize)
        .GetOrderableDynamicPartitions();

    ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };

    ConcurrentDictionary<long, string[]> results = new();

    await Parallel.ForEachAsync(partitions, options, async (entry, ct) =>
    {
        long index = entry.Key;
        int start = entry.Value.Item1;
        int count = entry.Value.Item2 - entry.Value.Item1;

        string[] names = await apiClient.GetNames(start, count);

        results.TryAdd(index, names);
    }).ConfigureAwait(false);

    return results.OrderBy(e => e.Key).SelectMany(e => e.Value).ToArray();
}

This is not the only way to collect the results of a Parallel.ForEachAsync loop in the original order. You could look at this answer for more ideas.

Regarding the MaxDegreeOfParallelism, you could experiment with various values until you hit the sweet spot that yields the best performance. You could also take a look at this question: Factors for determining the degree of parallelism for the ForEachAsync.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
1

I think you could simplify things if you start out by thinking about your actions as requesting individual "pages" rather than partitioned ranges.

I suspect you're probably not getting much value from parallelism here: most of your time is probably spent waiting for I/O to complete. Although it's easy enough to add a call to .AsParallel().WithDegreeOfParallelism(...) to test that assumption.

int maxPages = (int)Math.Ceiling(maximum/(double)rangeSize);

Task<string[]>[] tasks = Enumerable.Range(0, maxPages)
    .Select(async pageNumber => await apiClient.GetNames(pageNumber * rangeSize, rangeSize))
    .ToArray();

Note that this could grab more than your desired maximum if the max isn't a multiple of your page size. If that's an issue, it's not too hard to fix with a little more math. But I suspect, if you think about your domain, that you're probably thinking more in terms of how many requests you want to make in the first place, and reverse-engineering a maximum from that--a step you could avoid if you establish maxPages and rangeSize as your starting inputs in the first place.

You didn't mention whether the API you're accessing has a limit on the number of concurrent requests allowed. Since this approach is going to send all your requests as soon as it hits the first call to .ToArray(), you'll want to make sure the API won't start rejecting or disproportionately throttling your requests. If you need to throttle your own requests, it's probably simplest to use ForEachAsync as described in Theodor Zoulias's answer.

StriplingWarrior
  • 151,543
  • 27
  • 246
  • 315
  • BTW, OP doesn't mention any limit on number of concurrent requests, only "how many results can be obtained with one request". – StriplingWarrior Mar 23 '23 at 19:53
  • True. In which case their current code is fine. But in practice limiting the concurrency is beneficial more often than not. We will hopefully learn if that's the case, when the OP tries the various approaches and give us some feedback. :-) – Theodor Zoulias Mar 23 '23 at 19:58
  • 1
    +1 for the suggestion to switch from partitioning to paging. The `Partitioner` is one of my least favorite classes in the whole .NET runtime. Each time I have to use it, my brain hurts. – Theodor Zoulias Mar 24 '23 at 05:00
0

If you are using .NET 6 and upper you can try to use Parallel.ForEachAsync(). see documentation

I tried to change your code a bit to fit into an example implementation and to be able to compare the queries.

Here is a small dotnet fiddle

You can then use it according to this example

public static async Task<string[]> QueryApiAsParallelOptimized() 
{
    int maximum = 10000; 
    int rangeSize = 1000;

    // Concurrent to be thread safe
    var results = new ConcurrentBag<string[]>();

    // Simple parititioning by `.Range` and `.Chunk` methods
    // Eventually you can also use the Partitioner here?
    var partitions = Enumerable.Range(0, maximum);
    var chunks = partitions.Chunk(rangeSize);
        
    var parallelOptions = new ParallelOptions()
    {
        // Set this to a value fitting best to your environment
        // https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.paralleloptions.maxdegreeofparallelism?view=net-7.0
        MaxDegreeOfParallelism = 1000
    };
        
    await Parallel.ForEachAsync(chunks, parallelOptions, async (range, token) =>
    {
        // Implement you logic here. This is just the example implementation
        int skip = range.First();
        string[] names = await CallApiAsync(skip, rangeSize);
        results.Add(names);
    });
    
    // Flatten as you had it before.
    string[] flattened = results.SelectMany(x => x).ToArray();
    return flattened;
}

From the dotnet fiddle it is around ~20 % faster.

enter image description here

Martin
  • 3,096
  • 1
  • 26
  • 46
  • 2
    _"// Set this to a high value for max parallelism MaxDegreeOfParallelism = 1000"_ - quite doubtful statement, especially if it is run in ASP.NET Core environment. Unless of course you have a machine with 500-1000 cores. – Guru Stron Mar 23 '23 at 22:53