2

I'm fairly new to programming (< 3 years exp), so I don't have a great understanding of the subjects in this post. Please bear with me.

My team is developing an integration with a third party system, and one of the third party's endpoints lacks a meaningful way to get a list of entities matching a condition.

We have been fetching these entities by looping over the collection of requests, and adding the results of each awaited call to a list. This works just fine, but getting the entities takes a lot longer than getting entities from other endpoints that lets us get a list of entities by providing a list of ids.

.NET 6.0 introduced Parallel.ForEachAsync(), which lets us execute multiple awaitable tasks asynchronously in parallel.

For example:

public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(List<IRestRequest> requests) 
where TEntity : IEntity
{
    var entities = new ConcurrentBag<TEntity>();

    // Create a function that takes a RestRequest and returns the 
    // result of the request's execution, for each request
    var requestExecutionTasks = requests.Select(i => 
        new Func<Task<TEntity>>(() => GetAsync<TEntity>(i)));

    // Execute each of the functions asynchronously in parallel, 
    // and add the results to the aggregate as they come in
    await Parallel.ForEachAsync(requestExecutionTasks, new ParallelOptions
    {
        // This lets us limit the number of threads to use. -1 is unlimited
        MaxDegreeOfParallelism = -1 
    }, async (func, _) => entities.Add(await func()));

    return entities.ToList();
}

Using this code rather than the simple foreach-loop sped up the time it takes to get the ~30 entities on my test instance, by 91% on average. That's awesome. However, we are worried about the rate limiting that is likely to occur when we use it on a client's system with possibly thousands of entities. We have a system in place that detects the "you are rate limited"-message from their API, and cues the requests for a second or so before trying again, but this is not as much a good solution as it is a safety measure.

If we where just looping over the requests, we could have throttled the calls by doing something like await Task.Delay(minimumDelay) in each iteration of the loop. Correct me if I'm wrong, but from what I understand this wouldn't actually work when executing the requests in parallel foreach, as it would make all requests wait the same amount of time before the execution. Is there a way to make each individual request wait a certain amount of time before execution, only if we are close to being rate limited? If at all possible, I would like to do this without limiting the number of threads to use.


Edit

I wanted to let this question sit a little so more people could answer. Since no new answers or comments have been added, I'm marking the one answer I got as correct. That being said, the answer suggests a different approach than using Parallel.ForEachAsync.

If I understand the current answer correctly, the answer to my original question of whether or not it's possible to throttle Parallel.ForEachAsync, would be: "no, it's not".

Hyperwave
  • 123
  • 7
  • Would it be possible for you to introduce this delay within the call to GetAsync i.e. have some parameter you pass in to indicate that there should be a delay ? How do you know you need to rate limit btw ? – auburg Dec 09 '21 at 14:53
  • 2
    You can find a `RateLimiter` class in [this](https://stackoverflow.com/questions/65825673/partition-how-to-add-a-wait-after-every-partition "Partition: How to add a wait after every partition") question. Rate limiting means that you want to impose a limit to the number of operations that can be *started* during any time span. The duration of each operation is not taken into account. Instead if you want to impose a limit to the number of operations that are concurrently in flight, use the `MaxDegreeOfParallelism` option. – Theodor Zoulias Dec 09 '21 at 15:15
  • @auburg Yes, that would be possible, but I would much prefer to configure the delay(s) when creating the tasks to execute. I know that the third party's API has rate limited our calls before. I don't remember which it is, but their limit is either 60 or 100 calls per second. – Hyperwave Dec 09 '21 at 15:21
  • as @TheodorZoulias implies, Parallel.ForEachAsync is a bit too basic for this. If you add a delay, that does not stop foreach adding tasks to the threadpool, they will just all have the delay in them (yuk). You need a "real" rate limiter that stops adding new tasks when the limit is hit – AndyPook Dec 09 '21 at 15:22
  • As a side note, using the `Parallel.ForEachAsync` API with unbounded parallelism is not much different than starting all the tasks at once and awaiting them with `await Task.WhenAll(tasks)`. Arguably it might be even better to use this simpler approach, because the `Task.WhenAll` API returns naturally the results in their original order, and it doesn't require messing with concurrent collections like the `ConcurrentQueue` or the `ConcurrentBag`. – Theodor Zoulias Dec 09 '21 at 15:40
  • @TheodorZoulias If I where to use MaxDegreeOfParallelism throttle the calls, wouldn't that mean that I have to calculate the throughput of the system or average execution time or something like that, in order to optimize the rate whilst remaining below the limit? – Hyperwave Dec 09 '21 at 15:41

1 Answers1

1

My suggestion is to ditch the Parallel.ForEachAsync approach, and use instead the new Chunk LINQ operator in combination with the Task.WhenAll method. You can launch 100 asynchronous operations every second like this:

public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(
    List<IRestRequest> requests) where TEntity : IEntity
{
    var tasks = new List<Task<TEntity>>();
    foreach (var chunk in requests.Chunk(100))
    {
        tasks.AddRange(chunk.Select(request => GetAsync<TEntity>(request)));
        await Task.Delay(TimeSpan.FromSeconds(1.0));
    }
    return (await Task.WhenAll(tasks)).ToList();
}

It is assumed that the time required to launch an asynchronous operation (to invoke the GetAsync method) is negligible.

This approach has the inherent disadvantage that in case of an exception, the failure will not be propagated before all operations are completed. For comparison the Parallel.ForEachAsync method stops invoking the async delegate and completes ASAP, after the first failure is detected.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • @Hyperwave the `Parallel.ForEachAsync` allows to limit the degree of parallelism (`MaxDegreeOfParallelism`), but it has no built-in capabilities for limiting the rate. In the next .NET version (.NET 7) there might be new APIs offering this functionality ([Rate limit APIs](https://github.com/dotnet/runtime/issues/52079)). For the time being only custom/third-party solutions exist, like [this one](https://stackoverflow.com/questions/65825673/partition-how-to-add-a-wait-after-every-partition/65829971#65829971). – Theodor Zoulias Mar 28 '22 at 14:04