0

I have the following architecture:

Web Site -calls-> Proxy Api -calls-> External Api

From the website, I have to make <= 25 calls from the web site to the proxy api. My goal is to call all apis and either:

  1. Get all results (currently using Task.WhenAll), or
  2. As soon as any of them fail, catch that exception and cancel all other tasks.

Website Code

async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var apiTasks = apiInfos
        .Select( apiInfo => {
            // Simplified code...but return a call to Task<T> function *without* await
            return GetWebApiResultAsync( info, token );
        } );

    var xmlResults = await Task.WhenAll( apiTasks );

    foreach( var result in xmlResults )
    {
        // Process results
    }
}

async Task<XElement> GetWebApiResultAsync( ApiInfo info, CancellationToken ct = default )
{
    // simplified but essentially call SendAsync with some processing of info before and results after to return XElement.
    using var response = await httpClient.SendAsync( request, cancellationToken: cancellationToken );

    return new XElement( "ResultsGeneratedFromRESTCall" );

}

Proxy Api

app.MapGet("/call-external", (CancellationToken token) =>
{
    // Some code to prep to call external api, but looks the same as call above
    // but could throw exception
    using var externalResponse = await httpClient.SendAsync( httpRequest, token );

    externalResponse.EnsureSuccessStatusCode();

    var externalResult = ( await coverageResponse.Content.ReadFromJsonAsync<JsonObject>() )!;

    if ( externalResult[ "CustomCheck" ] != null )
    {
        throw new ApplicationException( "Custom failure from external source" );
    }

    // Otherwise generate results (could require making additional calls to external api)
});

I was going to change my pattern to this answer that implemented a custom ForEachAsync method. Would simply changing my function to look like this accomplish my two goals above (specifically throwing first exception it receives and cancelling all other requests)?

New WebSite Code with ForEachAsync

async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var xmlResults =
        await apiInfos.ForEachAsync( 
            new ParallelOptions { 
                MaxDegreeOfParallelism = apiInfos.Length, 
                CancellationToken = token
            },
            async ( apiInfo, ct ) => {
                // Now returning via await!
                return await GetWebApiResultAsync( info, ct );
            } );

    // remaining code unchanged
}

Update: Using CancellationTokenSource to Cancel Other Tasks

I updated my call to the following, but now the only exception I get 'at the end' is System.Threading.Tasks.TaskCanceledException instead of the actual Exception that was originally thrown. Is that expected? Is there a way to find the original exception?

async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    using var cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource( token );

    var xmlResults =
        await apiInfos.ForEachAsync( 
            new ParallelOptions { 
                MaxDegreeOfParallelism = apiInfos.Length, 
                CancellationToken = cancelTokenSource.Token
            },
            async ( apiInfo, ct ) => {
                // Now returning via await!
                try
                {
                    if ( simulatedException )
                    {
                        await Task.Delay( 1500 );
                        throw new ApplicationException( "Emails endpoint is not supported" );
                    }

                    return await GetWebApiResultAsync( info, ct );
                }
                catch
                {
                    cancelTokenSource.Cancel();
                    throw;
                }
            } );

    // remaining code unchanged
}
Terry
  • 2,148
  • 2
  • 32
  • 53
  • Related: [Stop Parallel.ForEachAsync](https://stackoverflow.com/questions/70827715/stop-parallel-foreachasync), and also [The need for two cancellation tokens in .NET 6 Parallel.ForEachAsync?](https://stackoverflow.com/questions/70191295/the-need-for-two-cancellation-tokens-in-net-6-parallel-foreachasync) – Theodor Zoulias Jun 18 '23 at 18:59
  • @TheodorZoulias I was going down the path and experimenting with same thing you mentioned in Stop Parallel.ForEachAsync. I've updated my question to show unexpected behavior and/or my incorrect implementation :) – Terry Jun 18 '23 at 19:37
  • When you call the `ProcessApiCallsAsync`, are you passing a `CancellationToken` that is canceled with a timer or something? This would explain the behavior. Non-cancellation exceptions are emerged with priority, so if you have a real exception you shouldn't see cancellation exceptions. – Theodor Zoulias Jun 18 '23 at 19:43
  • 1
    Updated my last code sample. I was currently just using a simulated exception 'at the highest level' to get my head around this pattern. Then I was gradually going to move this simulated exception in my Proxy Api to confirm things still behaved, then remove it all together and only catch 'real' exceptions. – Terry Jun 18 '23 at 19:46
  • Oops, and to answer question about ProcessApiCallsAsync, that is just the token passed to my sites endpoint (so same as `context.RequestAborted` I believe). – Terry Jun 18 '23 at 19:47
  • `await Task.Delay( 1500, ct );` and don't bother with the linked token, just let the `ForEachAsync` cancel everything. – Charlieface Jun 18 '23 at 21:31

2 Answers2

1

Yes, what you are doing with the ForEachAsync of the other answer is correct, with the exception of the configuration MaxDegreeOfParallelism = apiInfos.Length, which is questionable. You generally do want to impose a limit to the concurrency.

If you wanted to do the same with the simpler Task.WhenAll, you would do something like this:

async Task ProcessApiCallsAsync(ApiInfo[] apiInfos, CancellationToken cancellationToken)
{
    using CancellationTokenSource linkedCts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);

    Task<XElement>[] apiTasks = apiInfos.Select(apiInfo =>
    {
        try { return GetWebApiResultAsync(info, linkedCts.Token); }
        catch { cts.Cancel(); throw; }
    }).ToArray();

    XElement[] xmlResults = await Task.WhenAll(apiTasks);

    //...
}

But using the Parallel.ForEachAsync is better because it allows to limit the concurrency, so that you don't bombard the remoter server with dozens of requests simultaneously.

Btw in your case there is a simpler way to collect the results of the Parallel.ForEachAsync, because the source sequence is an array with known size:

async Task ProcessApiCallsAsync(ApiInfo[] apiInfos, CancellationToken cancellationToken)
{
    ParallelOptions options = new()
    { 
        MaxDegreeOfParallelism = 5, // A reasonable number
        CancellationToken = cancellationToken
    };

    XElement[] xmlResults = new XElement[apiInfos.Length];

    await Parallel.ForEachAsync(Enumerable.Range(0, apiInfos.Length), options, async (i, ct) =>
    {
        xmlResults[i] = await GetWebApiResultAsync(apiInfos[i], ct);
    });

    // Here all the slots of the xmlResults array are filled with data.
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Interesting. I have almost exact pattern as your first example implemented and I only receive TaskCancelledExcpetion instead of the real exception. I'll try your suggestion with simply using TaskWhenAll and see if the result is different. Additionally, the apiInfos.Length limitation I used was based on one of your previous comments (that I thought I understood) and I was basically just trying to say 'I know I don't have too many tasks here...just send them all at once'. – Terry Jun 18 '23 at 19:43
  • @Terry in that case I would prefer `MaxDegreeOfParallelism = Int32.MaxValue`, because it communicates better the intention to impose no concurrency limit whatsoever. – Theodor Zoulias Jun 18 '23 at 19:47
  • Can the property assignment just be omitted with same result? And I simply assign only the `CancellationToken`? – Terry Jun 18 '23 at 19:48
  • 1
    @Terry no. The default [`MaxDegreeOfParallelism`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.paralleloptions.maxdegreeofparallelism#remarks) is `-1`, which means "unlimited" in general, but specifically for the `Parallel.ForEachAsync` it means `Environment.ProcessorCount`. So in this case if you want to say "unlimited", `Int32.MaxValue` is the way. – Theodor Zoulias Jun 18 '23 at 19:52
  • Last question about ParallelOptions.CancellationToken - I should always set this to the CancellationToken passed to my method (from the Request.Aborted CancellationToken) right? – Terry Jun 20 '23 at 18:38
  • @Terry in general yes. In some advanced scenarios you may want to create a linked token source (as shown in this answer), and set the `ParallelOptions.CancellationToken` to `linkedCts.Token`. – Theodor Zoulias Jun 20 '23 at 19:06
1

You don't need a linked token, Parallel.ForEachAsync will automatically cancel any tasks on the first exception, and will also cancel them if the token is canceled as well.

You need to make sure to pass the token in to any async functions you are calling within the ForEachAsync lambda, such as Task.Delay.

async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var xmlResults = new ConcurrentBag<ApiInfo>();
    await Parallel.ForEachAsync(apiInfos, cancelToken,
        async (apiInfo, ct) => {
            if ( simulatedException )
            {
                await Task.Delay(1500, ct);  // pass token in
                throw new ApplicationException("Emails endpoint is not supported");
            }

            var result = await GetWebApiResultAsync(info, ct);  // pass token in
            xmlResults.Add(result);
        });
    // remaining code unchanged
}

You can see this in action in this fiddle where the remaining task gets canceled immediately upon an exception.

Charlieface
  • 52,284
  • 6
  • 19
  • 43
  • Not thrilled with the `ConcurrentBag`, for [a few reasons](https://stackoverflow.com/a/64823123/11178549). – Theodor Zoulias Jun 18 '23 at 22:19
  • True, that wasn't the point of the answer though. I guess depending on use case a `lock` with a standard `List` could work also. Kinda weird that `Parallel.ForEachAsync` doesn't support a returning `ValueTask` – Charlieface Jun 18 '23 at 22:21
  • The `Parallel.ForEach` doesn't have an overload that returns a `TResult[]` either. This functionality is covered by the PLINQ. There is no asynchronous version of the PLINQ though, and it's unlikely that there will be one any time soon. – Theodor Zoulias Jun 18 '23 at 22:28
  • @TheodorZoulias and Charlie - I want to post a revised version of my code and some questions/issues based on comments from both of you. Should I edit (re-write) my question, add an 'update' at the bottom (feel like it has changed too much) or just make a new question? – Terry Jun 19 '23 at 00:57
  • 2
    Probably best to make a new question at this point – Charlieface Jun 19 '23 at 01:16
  • @Charlieface Looking at your 'rep', probably don't matter, I wish I could just mark both these as answers. In either case, this was similar enough to Theodor's 'simpler way' example that I was able to come up with something clean. Will post a question for one issue that I think I understand but want to confirm. – Terry Jun 19 '23 at 04:44
  • I've made a new question here: https://stackoverflow.com/q/76508795/166231 - warning, it's beefy. Sorry for delay, but while trying to get logging to generate something I hope is intelligible, I was also learning and answering some of my own questions. As the TLDR says, I think it is all 'asynchronous' timing inexperience that is leading to my confusion/final questions. @TheodorZoulias you might want to peek at it to, but np if don't get to it. – Terry Jun 19 '23 at 17:19