
In a .NET 6 project, I have to call a web API that is offset-paginated (page/per-page), and I would like to make the n calls in parallel as far as possible.

This is the method which calls the API one time with the given page number:

private Task<ApiResponse> CallApiAsync(int page,
    CancellationToken cancellationToken = default)
{
    return GetFromJsonAsync<ApiResponse>($"...&page={page}", cancellationToken);
}

What I actually need is a forward-only streamable iterator over all the API calls from page 1 to page n. Given this requirement, I thought IAsyncEnumerable was the right API to use, so I could fire the API calls in parallel and access each API response as soon as it was ready, without needing all of them to be finished.

So I came up with the following code:

public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);

    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);

    var pages = Enumerable.Range(1, numCalls);

    Parallel.ForEach(pages, async page => {
        yield return await CallApiAsync(page, cancellationToken).ConfigureAwait(false);
    });

    yield break;
}

But I get the following error at yield: CS1621 - The yield statement cannot be used inside an anonymous method or lambda expression.
Is there a way to achieve the result I would like to get?
Feel free to ask questions if I wasn't clear enough!

    This looks like a job for [`Parallel.ForEachAsync`](https://www.hanselman.com/blog/parallelforeachasync-in-net-6), not `Parallel.ForEach`. – Robert Harvey Aug 31 '22 at 16:35
  • Are you OK with yielding the responses in any order, or do you want to yield them in the same order as the `pages`? – Theodor Zoulias Aug 31 '22 at 16:48
  • Ideally the results of the tasks would be returned in order, but no particular order would also be okay – Oliver Aug 31 '22 at 19:09
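
For reference, the `Parallel.ForEachAsync` approach suggested in the comments can be bridged back to `IAsyncEnumerable<T>` with a `Channel<T>`. This is a hedged sketch, not tested against the question's project: it assumes .NET 6, the question's `CallApiAsync`/`GetNumberOfProducts`/`MathExtensions.CeilDiv` members, and the `System.Threading.Channels` namespace; responses are yielded in completion order, not page order.

```csharp
public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);
    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);

    var channel = Channel.CreateUnbounded<ApiResponse>();

    // Producer: run the API calls with limited parallelism and push each
    // response into the channel as soon as it arrives.
    var producer = Task.Run(async () =>
    {
        try
        {
            await Parallel.ForEachAsync(
                Enumerable.Range(1, numCalls),
                new ParallelOptions
                {
                    MaxDegreeOfParallelism = 10,
                    CancellationToken = cancellationToken,
                },
                async (page, ct) =>
                {
                    ApiResponse response = await CallApiAsync(page, ct).ConfigureAwait(false);
                    await channel.Writer.WriteAsync(response, ct).ConfigureAwait(false);
                }).ConfigureAwait(false);
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            channel.Writer.Complete(ex); // surface failures/cancellation to the consumer
        }
    });

    // Consumer: yield responses in completion order (not page order).
    await foreach (ApiResponse response in channel.Reader
        .ReadAllAsync(cancellationToken).ConfigureAwait(false))
    {
        yield return response;
    }

    await producer.ConfigureAwait(false);
}
```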

4 Answers


The most readily available tool that you can use for this purpose is a TransformBlock<TInput,TOutput> from the TPL Dataflow library. This component is natively available in .NET Core and later, and it is essentially a processor/projector/transformer with two queues (input and output). You specify the processing function, then you configure the options according to your needs, then you feed it with data, and finally you retrieve the processed output:

public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);
    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
    var pages = Enumerable.Range(1, numCalls);

    TransformBlock<int, ApiResponse> block = new(async page =>
    {
        return await CallApiAsync(page, cancellationToken);
    }, new ExecutionDataflowBlockOptions()
    {
        CancellationToken = cancellationToken,
        MaxDegreeOfParallelism = 10, // Configurable, the default is 1
        EnsureOrdered = true, // This is the default
    });

    // Feed the block with input data
    foreach (var page in pages) block.Post(page);
    block.Complete();

    // Emit the output data as they become available
    while (await block.OutputAvailableAsync())
        while (block.TryReceive(out var item))
            yield return item;

    // Propagate possible exception (including cancellation)
    await block.Completion;
}

This simple implementation initiates the TransformBlock when the resulting IAsyncEnumerable<ApiResponse> is enumerated, and it won't stop until either all the processing is completed, or the cancellationToken is canceled. The processing is not driven by the enumeration of the resulting sequence. It won't even stop if the client code simply abandons the enumeration, by breaking the await foreach loop. If you want to include this functionality (graceful termination), you will have to add a try-finally block, and an internal linked CancellationTokenSource as shown here. The yielding loop should be placed inside the try, and the cancellation of the linked CancellationTokenSource should be placed inside the finally.
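
A minimal sketch of that graceful-termination variant could look like the following. This is an assumption of how the pieces fit together (it is not the linked answer verbatim), reusing the names from the question and the `System.Threading.Tasks.Dataflow` namespace:

```csharp
public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);
    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);

    // Linked CTS so that abandoning the enumeration can also cancel the block.
    using var linkedCts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);

    TransformBlock<int, ApiResponse> block = new(
        page => CallApiAsync(page, linkedCts.Token),
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = linkedCts.Token,
            MaxDegreeOfParallelism = 10,
        });

    foreach (var page in Enumerable.Range(1, numCalls)) block.Post(page);
    block.Complete();

    try
    {
        // The yielding loop lives inside the try, as described above.
        while (await block.OutputAvailableAsync().ConfigureAwait(false))
            while (block.TryReceive(out var item))
                yield return item;

        await block.Completion.ConfigureAwait(false);
    }
    finally
    {
        // Runs when the consumer breaks out of the await foreach early;
        // cancels any in-flight CallApiAsync operations.
        linkedCts.Cancel();
    }
}
```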

Theodor Zoulias

You can't return from Parallel.ForEach/Parallel.ForEachAsync to the "outside" of the enclosing function; you provide the delegate that will be executed on every iteration, and that's it.

One approach is to just loop through the data, using a SemaphoreSlim to limit the number of concurrently executing tasks.

async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);
    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
    
    var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var degreeOfParallelism = 2;
    var semaphoreSlim = new SemaphoreSlim(degreeOfParallelism);

    var tasks = Enumerable.Range(1, numCalls)
        .Select(async page =>
        {
            var linkedToken = linkedTokenSource.Token;
            await semaphoreSlim.WaitAsync(linkedToken);
            linkedToken.ThrowIfCancellationRequested();
            try
            {
                return await CallApiAsync(page, linkedToken);
            }
            catch
            {
                linkedTokenSource.Cancel();
                throw;
            }
            finally
            {
                semaphoreSlim.Release();
            }
        })
        .ToList();
    
    foreach (var task in tasks)
    {
        yield return await task;
    }
}
Guru Stron
  • The problem with this approach is that in case a `CallApiAsync` call fails, the error will be propagated ASAP but the processing will not stop. This could result in unpleasant situations. For example the user gets an error message, clicks the "Retry" button, and a new set of async operations is launched while some of the previous operations are still running in the background unobserved (in a fire-and-forget fashion). In case the remote server has a max concurrency policy, this policy could be violated. – Theodor Zoulias Aug 31 '22 at 20:46
  • @TheodorZoulias yep, was already experimenting with that. Updated the code to address the issue. – Guru Stron Aug 31 '22 at 20:59

Parallel.ForEach is designed for in-memory CPU-bound operations, not for asynchronous IO.

It seems you can just use a simple `for` loop:

public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);

    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
    for (var page = 1; page <= numCalls; page++)
    {
        yield return await CallApiAsync(page, cancellationToken).ConfigureAwait(false);
    }
}

If you want to parallelize it (and I'm not sure that's actually a good idea with an API), you can perhaps use Tasks.

public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);

    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
    
    var tasks = Enumerable.Range(1, numCalls)
        .Select(page => CallApiAsync(page, cancellationToken))
        .ToList();

    while (tasks.Count > 0)
    {
        // Task.WhenAny has no CancellationToken overload; cancellation
        // flows through the tokens passed to the individual calls.
        var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
        tasks.Remove(completed);
        yield return await completed.ConfigureAwait(false);
    }
}

This is a simplified version and I may have left out some error handling; I'll leave you to improve that. You may want to use a TransformBlock or similar in order to limit the concurrency; alternatively, put a certain number of Tasks into a list and wait for each one to finish on a one-out-one-in basis, using await Task.WhenAny.
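
The one-out-one-in idea mentioned above could be sketched like this (a hypothetical, untested variant; `CallApiThrottledAsync` and `maxConcurrency` are names invented for illustration, and it assumes the question's `CallApiAsync`):

```csharp
public async IAsyncEnumerable<ApiResponse> CallApiThrottledAsync(int numCalls,
    int maxConcurrency,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var pending = new List<Task<ApiResponse>>();
    int nextPage = 1;

    while (nextPage <= numCalls || pending.Count > 0)
    {
        // Keep at most maxConcurrency calls in flight.
        while (nextPage <= numCalls && pending.Count < maxConcurrency)
            pending.Add(CallApiAsync(nextPage++, cancellationToken));

        // One out: wait for any pending call to finish...
        var completed = await Task.WhenAny(pending).ConfigureAwait(false);
        pending.Remove(completed);

        // ...then yield it (awaiting the completed task propagates failures).
        yield return await completed.ConfigureAwait(false);
    }
}
```

Like the Task.WhenAny loop above, this yields results in completion order rather than page order.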

Charlieface

If you want to spin up tasks in parallel, calling await CallApiAsync(...) in a loop won't cut it. You have to use Task.Run(...). In its simplest form:

var tasks = new List<Task>(pages.Length);
foreach (int page in pages)
{
    tasks.Add(Task.Run(() => CallApiAsync(page, cancellationToken)));
}
await Task.WhenAll(tasks).ConfigureAwait(false);

If you need to handle results per API call:

// within foreach loop:
Task.Run(async () => {
    var results = await CallApiAsync(page, cancellationToken).ConfigureAwait(false);
    // do something with `results`
});
Nenad
  • What is the point of wrapping async method in `Task.Run` here?? – Guru Stron Aug 31 '22 at 21:01
  • @GuruStron simply calling an `async` method does not mean it will be "executed in parallel". It means "it's scheduled to execute". Unless you wrap it in `Task.Run(...)`; then they are actually started immediately on a separate thread. – Nenad Aug 31 '22 at 21:09
  • `Task.Run` just "schedules to execute" the provided function on the same thread pool where the task returned by the async method should already be scheduled. The only difference `Task.Run` makes (at least that I know/remember of) is in the case where the `async` method is not actually async, or blocks for a long time before the first `await` (i.e. is written incorrectly/not fully correctly). – Guru Stron Aug 31 '22 at 21:17
  • @GuruStron Did you actually test the part "...async method should be already scheduled on"? Because I did and in my case, tasks initiated with `Task.Run` do execute in parallel on the separate thread(s), while `async` method uses the same Thread, so they are only queued. Regarding - why `async` method, you are completely missing the point. How would you further process `results` variable? – Nenad Sep 01 '22 at 13:52
  • It depends on the method implementation (as I wrote in my previous comment). There is a possibility that [there is no thread](https://blog.stephencleary.com/2013/11/there-is-no-thread.html) at all. – Guru Stron Sep 01 '22 at 14:00
  • @GuruStron `foreach` with only `await` calls, no parallel execution. `foreach` with `Task.Run` - parallel execution. We are talking about API calls (so I/O). So what depends on "method implementation" here? We are talking about same API call in both cases (with or without `Task.Run`). – Nenad Sep 01 '22 at 14:49
  • foreach without `await` calls, and without `Task.Run` - parallel execution. [See it yourself](https://sharplab.io/#v2:EYLgxg9gTgpgtADwGwBYA0AXEUCuA7NAExAGoAfAAQCYBGAWACgKAGAAgpoDoAVAC1gCGhAJZ4A5gG5GLdlwAyogI5SmbDihUyOAVk1quAEWECxeCAGcMwsOZXSaATgAUAIkA8G4BB9lwEo7DRgBuAlCsGALmANbmrAC8rHgwAO6sCpYAPBRIAHxONMy+gcGs5slxAMoYEAAOiQIYYLycFcEYAHJJTgUMAGbQMAINrE6iGKxVJjCsoqwAong4ALYwUALAADYwnABKAuIwTsxoed7ejADejKxXoeFRnACChIROBosLAJ6dXQC+0g7sSE4AHVeDA8Pc1msnGFIuYuhxnCVODM1gIquYYIQALLCSHCDGQPCEOGaAAcANYrwWH06sSy7H+mU4BhgqM+2mY+QkQA=) – Guru Stron Sep 01 '22 at 18:41
  • Yes, in this trivial case it works. I've had issues with this in .NET Framework 4.8 Web project in IIS, with database calls. Maybe has to do with default TaskScheduler for that scenario. It's hard to make reproduction in fiddles, but I think that's more realistic scenario - web page calling multiple "backend" services. – Nenad Sep 01 '22 at 20:09
  • What database? Was it [Oracle by any chance](https://stackoverflow.com/a/29034291/2501279) ? =) – Guru Stron Sep 01 '22 at 20:15
  • SQL Server. For sure has nothing to do with the database, because with `Task.Run` it works as expected. It can be different TaskScheduler implementations in Console, WinForm and in ASP.NET. – Nenad Sep 01 '22 at 21:18
  • Would be interesting to see the full repro. As for "for sure" - I would not be so sure, because in the case of the linked question about Oracle it for sure had something to do with the database (the native client, to be more precise), and it would work "as expected" if used with `Task.Run`. – Guru Stron Sep 01 '22 at 21:34