Update: 're-submitted' with hopefully easier code for responders to examine and run locally to reproduce the problem.

TL;DR: I think my questions below stem from a lack of experience with multi-threaded async/await programming. I think the confusion comes from the multiple await statements throughout my code/workflow, each of which takes an indeterminate amount of time, leading to different timing and logging behavior from run to run.

I have the following architecture.

Web Site Api Endpoints -calls-> Proxy Api -calls-> External Api

This question is a follow-up to my question: Best way to handle Async/Await and cancellation to cancel all running tasks if any fail, where I received great advice on a ForEachAsync helper method.

My outstanding issue is trying to understand CancellationToken and why I'm not receiving consistent OperationCanceledException exceptions thrown/logged where I thought they might otherwise have been.

I've written a .NET 7 console application that, to the best of my ability, simulates my architecture and the code can be found below.

My Questions

  1. If you uncomment throw new ApplicationException($"Site - Fake Exception for {item}"); and run the application over and over, there are a few notable issues.
    1. I never receive logs from Proxy Api, as if the requests never got there before the httpClient.SendAsync calls were cancelled. Why? Does it take 'that long' to get an HTTP request ready, so that the CancellationToken has already been cancelled by then?
    2. I don't always receive 10 logs for 'Site -> Api Proxy' (the logs.Enqueue($"Site: Calling ProxyApiAsync for {item}"); line within the async delegate). Why?
  2. If you comment out throw new ApplicationException($"Site - Fake Exception for {item}"); and run the application over and over, there are a few notable issues.
    1. I successfully get 10 logs for both 'Site -> Api Proxy' and 'Api Proxy -> External', but I don't always get 9 'OperationCanceledException caught' logs in Site and Api Proxy. If I submitted 10 requests to the Api Proxy and 1 fails, shouldn't I always get 9 cancels? (Writing this, I think I just discovered why: do the 'missing cancels' represent Proxy -> External calls that successfully finished before task 3 threw an exception?)
using System;
using System.Linq;
using System.Threading;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Concurrent;
                    
public static class Program
{
    // Use this instead of Console.WriteLine() to enable some 'summary' queries after processing
    static ConcurrentQueue<string> logs = new ConcurrentQueue<string>();
    
    public static async Task Main()
    {
        logs = new ConcurrentQueue<string>();
        
        var builder = WebApplication.CreateBuilder();
        builder
            .Logging.SetMinimumLevel(LogLevel.None)
            .Services.AddHttpClient();
        var app = builder.Build();

        app.MapGet("/proxyapi/{taskId}", ProxyApiAsync); // simulated Proxy Api web api
        
        var minApiTask = app.RunAsync();
        await Task.Delay(100); // Wait for the API to start
        
        var httpClientFactory = app.Services.GetRequiredService<IHttpClientFactory>(); // simulated support for DI
        var apiInfosToRun = Enumerable.Range(0, 10).ToArray(); // simulated list of APIs to call

        try
        {
            await WebSiteEndpointAsync( apiInfosToRun, httpClientFactory );
        }
        catch (Exception ex)
        {
            logs.Enqueue($"Site: {ex.GetType().Name}: {ex.Message}");
        }

        await app.StopAsync();
        await minApiTask;

        Console.WriteLine($"{logs.Count(l => l.StartsWith("Site: Calling Proxy"))} calls from Site -> Api Proxy");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("ProxyApiAsync: Calling external"))} calls from Api Proxy -> External");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: ProxyApiAsync"))} OperationCanceledException caught in Proxy Api");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: Site Delegate"))} OperationCanceledException caught in Site Delegate");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: Site ForEachAsync"))} OperationCanceledException caught in Site ForEachAsync Extension");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("ProxyApiAsync: ApplicationException"))} ApplicationException caught in Proxy Api");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("Site: ApplicationException"))} ApplicationException caught in Site");
        Console.WriteLine("");

        foreach (var log in logs)
        {
            Console.WriteLine(log);
        }
    }

    static async Task WebSiteEndpointAsync( int[] apiInfosToRun, IHttpClientFactory httpClientFactory )
    {
        logs.Enqueue($"Site: {apiInfosToRun.Length} APIs to run");

        var apiResponses = await apiInfosToRun.ForEachAsync(
            new ParallelOptions { MaxDegreeOfParallelism = Int32.MaxValue },
            async (item, ct) =>
            {
                try
                {
                    logs.Enqueue($"Site: Calling ProxyApiAsync for {item}");

                    if (item == 3)
                    {
                        // throw new ApplicationException($"Site - Fake Exception for {item}");
                    }

                    using var client = httpClientFactory.CreateClient();
                    using var request = new HttpRequestMessage
                    {
                        Method = HttpMethod.Get,
                        RequestUri = new Uri($"http://localhost:5000/proxyapi/{item}"),
                    };
                    var apiResponse = await client.SendAsync(request, ct);

                    apiResponse.EnsureSuccessStatusCode();

                    var apiResult = await apiResponse.Content.ReadAsStringAsync(ct);

                    if (apiResult.StartsWith("FAILED"))
                    {
                        throw new ApplicationException(apiResult);
                    }

                    return apiResponse;
                }
                catch (OperationCanceledException)
                {
                    logs.Enqueue($"OperationCanceledException: Site Delegate for {item}");
                    throw;
                }
            }
        );

        logs.Enqueue("Site: Finished all APIs");

    }
    
    static async Task<string> ProxyApiAsync(int taskId, IHttpClientFactory httpClientFactory, CancellationToken cancellationToken )
    {
        try
        {
            logs.Enqueue($"ProxyApiAsync: Calling external API for {taskId}");
            
            var httpClient = httpClientFactory.CreateClient();
            var response = await httpClient.GetAsync( "https://www.msn.com/", cancellationToken ); // Simulated call to external api

            if (taskId == 3)
            {
                throw new ApplicationException($"Proxy Api - Fake Exception for {taskId}");
            }

            response.EnsureSuccessStatusCode();
            
            var content = await response.Content.ReadAsStringAsync( cancellationToken );
            
            return $"SUCCESS: Length={content.Length}";
        }
        catch (OperationCanceledException)
        {
            logs.Enqueue($"OperationCanceledException: ProxyApiAsync for {taskId}");
            return "CANCELLED";
        }
        catch ( Exception ex )
        {
            logs.Enqueue($"ProxyApiAsync: {ex.GetType().Name}: {ex.Message}");
            return $"FAILED: {ex.Message}";
        }
    }

    static async Task<TResult[]> ForEachAsync<TSource, TResult>(
        this TSource[] source,
        ParallelOptions parallelOptions,
         Func<TSource, CancellationToken, ValueTask<TResult>> body
    )
    {
        TResult[] results = new TResult[source.Length];

        await Parallel.ForEachAsync(
            Enumerable.Range(0, source.Length),
            parallelOptions,
            async (i, ct) =>
            {
                try
                {
                    results[i] = await body(source[i], ct); // .ConfigureAwait( false );
                }
                catch (OperationCanceledException)
                {
                    logs.Enqueue($"OperationCanceledException: Site ForEachAsync Extension for {source[i]}");
                    throw;
                }
            });

        return results;
    }
}
Terry
  • You know, your fancy `ForEachAsync` function is nothing but a `Task.WhenAll`, but worse. Why don't you just use the standard tools already written and tested for you? – Blindy Jun 19 '23 at 17:24
  • @Blindy would you like to explain why the `Task.WhenAll` is better than the OP's `ForEachAsync`? – Theodor Zoulias Jun 19 '23 at 17:28
  • @Blindy From Theodor, see this answer explaining WhenAll (https://stackoverflow.com/questions/60659896/whenany-vs-whenall-vs-waitall-vs-none-given-that-results-are-being-used-immedia/60664631#60664631) and this answer showing original implementation (https://stackoverflow.com/a/71129678/166231). – Terry Jun 19 '23 at 17:31
  • @TheodorZoulias Suggestions? :) I guess I just wanted the 5 locations in code that generated logs, so it would be obvious where they came from in my trace scenarios. Is it the 'code', 'questions' or 'trace logs' that you find to be the noisiest? – Terry Jun 19 '23 at 17:34
  • @Terry, your point is that `Task.WhenAll` is bad because it "bombards" the framework with a lot of operations, so your solution is to create an unbound number of threads instead using an uncapped `Parallel.ForEach`? Also that nonsense about "early cancellation", which you can clearly see it doesn't work as you expect, can easily be moved to the callee side if properly written -- every major `*Async` api provides support for cancellation tokens. – Blindy Jun 19 '23 at 17:35
  • @Blindy Not sure where the 'nonsense' about early cancellation is. TBH, Theodor can explain much better than me, but with `WhenAll`, I'd have to use a `CancellationTokenSource` to generate a token and, within each task, call `.Cancel` on it upon exception; otherwise, one task might throw an exception and all the others will simply run to completion before `WhenAll` raises the exception. `Parallel.ForEach` automatically triggers cancellation upon the first exception. Additionally, with `Parallel.ForEach`, it is not an unbound number of threads. It is bound via the `ParallelOptions` parameter. – Terry Jun 19 '23 at 17:44
  • @TheodorZoulias So just create a Console app where my different parts of the architecture are just different classes and/or methods? Assuming that is right, I'll get on it. – Terry Jun 19 '23 at 17:47
  • @Blindy the early cancellation is not nonsense. It is a helpful mechanism that increases the responsiveness of the parallel operation in case of an error. If you insist that it is nonsense, then you have to explain why the modern .NET 6 `Parallel.ForEachAsync` API includes this mechanism, and also why the older `Parallel.ForEach` API includes a similar but less powerful mechanism (`ParallelLoopState.ShouldExitCurrentIteration`). – Theodor Zoulias Jun 19 '23 at 17:48
  • You need to pass the token in the proxy handler `var externalResult = ( await coverageResponse.Content.ReadFromJsonAsync(token) )!;` I agree there is far too much noise here to be able to see what is going on. *"Is it surprising that I received OperationCanceled exceptions in both the ForEachAsync extension method and inside the delegate?"* No, they are thrown in both places. The `ForEachAsync` cancels all the other tasks using the token, swallows the exception and throws the original exception from the task that excepted. – Charlieface Jun 19 '23 at 18:43
  • @Charlieface is it 'incorrect' to not pass token to every async method or could I decide which I care to interrupt? IE if I thought one or more *Async methods wouldn't take long I could just let them run until the next 'long running' Async method. Not saying I'll do this, but just curious. OK, I'm now going to create that smaller program to replicate my issues/questions. – Terry Jun 20 '23 at 11:47
  • Do whatever you want, just be aware that cancellation is cooperative: you cannot cancel a task that is not using the token correctly. Either way, do you still have a question? `ForEachAsync` seems to be working as designed. – Charlieface Jun 20 '23 at 11:49
  • @Charlieface I've updated/re-written my question and *think* I understand everything finally. You want to submit an answer/confirmations to my questions above and I'll mark it complete? – Terry Jun 20 '23 at 14:17
  • @TheodorZoulias Not sure 'how much cleaner' it is, but I've created a console app that simulates the architecture and reproduces the issue. First, I created a .NET Fiddle using just 'method calls', but I didn't get the same experience, I assume due to the delays caused by using HttpClient and calling items. So I had to make a console app with a minimal API endpoint (to represent Api Proxy). I couldn't do this in the fiddle, unfortunately, because localhost:5000 wouldn't work, thus the console app. But at least it is easily dropped into an IDE and runnable. Hope that is sufficient. – Terry Jun 20 '23 at 14:19

1 Answer

Parallel.ForEachAsync will cancel all remaining tasks in the event of an exception.

  1. The throw happens before the first await, which probably means that none of the other tasks had even started running.
    1. It doesn't take that long. The requests never began, because the other tasks had not reached their first await yet.
    2. Same reason.
  2. Yes, correct. The task that throws cancels the whole operation, and the 'missing' cancellations belong to tasks that either had not begun yet or had already completed. In this version, the exception happens after the first await.
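
As a rough illustration of the three possible fates of an item (completed before the failure, cancelled while running, never started), here is a minimal sketch. It is not the question's code: the delays, the `MaxDegreeOfParallelism = 4` cap and the counter names are assumptions chosen only to make the effect visible, and the exact counts will vary between runs and machines.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical counters, used only for this illustration.
int started = 0, completed = 0, cancelled = 0;

try
{
    await Parallel.ForEachAsync(
        Enumerable.Range(0, 10),
        new ParallelOptions { MaxDegreeOfParallelism = 4 }, // assumed cap, not from the question
        async (item, ct) =>
        {
            Interlocked.Increment(ref started);
            try
            {
                if (item == 3)
                {
                    // Fail after the first await, like the Proxy Api version in the question.
                    await Task.Delay(50, ct);
                    throw new ApplicationException($"Fake exception for {item}");
                }

                // Simulated work; short items may finish before item 3 throws.
                await Task.Delay(item * 20, ct);
                Interlocked.Increment(ref completed);
            }
            catch (OperationCanceledException)
            {
                // Reached only by items that were still running when the loop's token was cancelled.
                Interlocked.Increment(ref cancelled);
                throw;
            }
        });
}
catch (Exception ex)
{
    // Awaiting the loop rethrows the first recorded exception, normally the original failure.
    Console.WriteLine($"Loop failed with {ex.GetType().Name}: {ex.Message}");
}

Console.WriteLine($"started={started}, completed={completed}, cancelled={cancelled}, never started={10 - started}");
```

Items that finished early show up in `completed`, items interrupted in flight show up in `cancelled`, and the rest were never handed to the delegate at all, so they produce no log line of any kind. That is why the number of `OperationCanceledException` logs is usually less than 9.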
Charlieface
  • Regarding #2 - so you are saying that the 'deficit' in (my) expected cancel logs is due to 'not starting' versus 'tasks completing before task 3 throws'? – Terry Jun 20 '23 at 15:33