
I wrote a method that processes several dozen API calls concurrently (see code below).

The thing is, the API server can handle up to about 20 simultaneous calls; above that, it starts returning 500 errors for some of them.
When this kind of error occurs, all the tasks seem to be "canceled" or "forgotten", not only the calls that returned 500s: the resulting metadatas collection is empty.
I tried wrapping Task.WaitAll in a try/catch, but no exception is thrown there. That's no surprise, since the whole body of TryGetMetadataAsync is wrapped in a try/catch.

I just want to keep the results of the successful calls and ignore the failed ones.
Any idea what I am missing? Maybe there is an issue with the cancellation tokens?

FWIW: api.GetMetadataWithHttpInfoAsync comes from an OpenAPI Generator client (generated from a Swagger file).

public async Task<IEnumerable<Metadata>> GetMetadataAsync(IEnumerable<string> dataRefs,
    string? userJwt, CancellationToken cancellationToken)
{
    var userId = GetUserIdFromJwt(userJwt);

    var tasks = dataRefs
        .Where(dataRef => !string.IsNullOrEmpty(dataRef))
        .Select(dataRef => TryGetMetadataAsync(dataRef, userId, cancellationToken))
        .ToArray();

    // Load data concurrently and wait for all results:
    Task.WaitAll(tasks, cancellationToken);

    var metadatas = new List<Metadata>();
    foreach (var task in tasks)
    {
        var metadata = await task;
        if (metadata != null) { metadatas.Add(metadata); }
    }

    return metadatas;
}

private async Task<Metadata?> TryGetMetadataAsync(string dataRef, string userId,
    CancellationToken cancellationToken)
{
    try
    {
        if (cancellationToken.IsCancellationRequested) { return null; }

        // Add a custom 30-second timeout
        // (don't wait for a potential TaskCanceledException that may be due
        // to 500 errors):
        using var timeoutCancellationTokenSource = new CancellationTokenSource(
            TimeSpan.FromSeconds(30));

        var id = GetIdFromDataRef(dataRef);
        if (id == null) { return null; }

        var metadataResponse = await api.GetMetadataWithHttpInfoAsync(id, userId,
            timeoutCancellationTokenSource.Token);

        if (metadataResponse.StatusCode == HttpStatusCode.OK
            && metadataResponse.Data != null)
        {
            // Just a simple object conversion here...
            return await ConvertToMetadataAsync(dataRef, metadataResponse.Data);
        }
        else
        {
            // Error management removed for brevity...
        }
    }
    catch (ApiException apiEx)
    {
        // Exception specific management removed for brevity...
    }
    catch (Exception ex)
    {
        // Exception management removed for brevity...
    }

    return null;
}
Theodor Zoulias
Dude Pascalou
  • Use a semaphore (perhaps `SemaphoreSlim`, that plays well with async/await) to limit the number of simultaneous connections. – Ben Voigt Aug 18 '22 at 20:30
  • Is there a chance that `GetIdFromDataRef` returns null or `ConvertToMetadataAsync` returns null? You either need to step debug or add some logging to figure out what is happening to these tasks line by line. – John Wu Aug 18 '22 at 20:43
  • @BenVoigt: Yes, that would be a workaround: I did some tests with chunks of 20 to limit simultaneous calls, and it worked better. But I would like to understand why I lose the results of the successful calls above that limit. – Dude Pascalou Aug 18 '22 at 20:45
  • @JohnWu: yes, these 2 methods can return null. That's why the return type is `Metadata?`. How can that interfere with the task? – Dude Pascalou Aug 18 '22 at 20:48
  • @DudePascalou I'm not sure where the disconnect is. If either of those methods returns null then `TryGetMetadataAsync` will return null and if it is null then it will be skipped over by the loop that populates `metadatas`. Isn't your problem that the list is empty? – John Wu Aug 18 '22 at 21:04
  • @JohnWu Oh, I see what you mean. I forgot to explain that I have the problem with the same starting dataRefs. For instance: the method successfully processes a sequence from "1" to "20" and a sequence from "21" to "40", but for a sequence from "1" to "40", the server returns 500s. – Dude Pascalou Aug 18 '22 at 21:25
  • I just noticed you are using `Task.WaitAll`. You should probably be using `await Task.WhenAll` instead. You might be blocking the thread until the tasks time out. – John Wu Aug 18 '22 at 22:24
  • @DudePascalou - This is a classic situation for Microsoft's Reactive Framework (aka Rx) to handle. The only issue I have is that the key part of the code is missing for me to suggest how. Can you please provide details of the actual `api` code? All types, etc, to make a [mcve]? – Enigmativity Aug 18 '22 at 23:14
  • Not relevant with the main issue, but I would like to note that instead of `Task>` I would prefer a return type of `Task>` or `Task`. Also you might need to take a look at the [`CancellationTokenSource.CreateLinkedTokenSource`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.cancellationtokensource.createlinkedtokensource) method. – Theodor Zoulias Aug 18 '22 at 23:20
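Two suggestions from the comments, a SemaphoreSlim to cap the number of calls in flight and CancellationTokenSource.CreateLinkedTokenSource so that a per-call timeout also honors the caller's token, can be combined. The following is a minimal sketch, not the poster's code: the class ThrottledFetcher, the method FetchAllAsync and the fetch delegate (standing in for api.GetMetadataWithHttpInfoAsync) are hypothetical names, and it assumes failed or timed-out calls should simply be dropped while caller cancellation still propagates.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledFetcher
{
    // Calls fetch(item, token) for every item, with at most maxConcurrency
    // calls in flight at once. Each call gets a token that is canceled when
    // either the caller cancels or perCallTimeout elapses. Calls that throw
    // or return null are dropped; caller cancellation still propagates.
    public static async Task<List<TResult>> FetchAllAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, CancellationToken, Task<TResult?>> fetch,
        int maxConcurrency,
        TimeSpan perCallTimeout,
        CancellationToken cancellationToken)
        where TResult : class
    {
        using var throttler = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        async Task<TResult?> RunOneAsync(TItem item)
        {
            // Wait for a free slot; throws OperationCanceledException if the caller cancels.
            await throttler.WaitAsync(cancellationToken);
            try
            {
                // Linked source: canceled by the caller's token OR by the timeout.
                using var callCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                callCts.CancelAfter(perCallTimeout);
                return await fetch(item, callCts.Token);
            }
            catch (Exception) when (!cancellationToken.IsCancellationRequested)
            {
                // Failed (e.g. a 500) or timed out: drop this item only.
                return null;
            }
            finally
            {
                // Only reached after WaitAsync succeeded, so the slot is always ours to release.
                throttler.Release();
            }
        }

        // All tasks start here, but the semaphore lets only
        // maxConcurrency of them call fetch at the same time.
        Task<TResult?>[] tasks = items.Select(item => RunOneAsync(item)).ToArray();
        TResult?[] results = await Task.WhenAll(tasks);
        return results.Where(r => r != null).Select(r => r!).ToList();
    }
}
```

In the question's code, something like this would take the place of the Task.WaitAll block, with maxConcurrency set below the roughly 20 simultaneous calls the server tolerates. Unlike fixed chunks, a semaphore starts the next call as soon as any slot frees up.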


Thanks to @TheodorZoulias' and @JohnWu's comments, the solution was to combine Task.WhenAll with ContinueWith (indeed, this is possibly a duplicate of "Is it possible to get successful results from a Task.WhenAll when one of the tasks fails?"):

Here is working code (with the input split into chunks to preserve API responsiveness):

public new async Task<IEnumerable<Metadata>> GetMetadataAsync(IEnumerable<string> dataRefs, string? userJwt, CancellationToken cancellationToken)
{
    var userId = GetUserIdFromJwt(userJwt);
    var metadatas = new ConcurrentBag<Metadata>();
    // Chunk size to split input sequence:
    const int chunkSize = 13;

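    // Split dataRefs into chunks (Partition is an extension method not shown here;
    // on .NET 6+, the built-in Enumerable.Chunk presumably does the same job):
    var drChunks = dataRefs.Partition(chunkSize).ToList();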
    foreach (var chunk in drChunks)
    {
        var tasks = chunk
            .Where(partDataRef => !string.IsNullOrEmpty(partDataRef))
            .Select(partDataRef => TryGetMetadataAsync(partDataRef, userId, cancellationToken))
            .ToArray();

        try
        {
            // Use a ContinueWith to keep successful results and ignore errors
            await Task.WhenAll(tasks).ContinueWith(t =>
            {
                // Retrieve successful calls results:
                var chunkMetadatas = tasks
                    .Where(task => task.Status == TaskStatus.RanToCompletion)
                    .Where(task => task.Result != null)
                    .Select(task => task.Result)
                    .Cast<Metadata>()
                    .ToList();
                foreach (var metadata in chunkMetadatas) { metadatas.Add(metadata); }

                // Manage faulted or canceled tasks:
                var aggregateExceptions = tasks
                    .Where(task => task.IsFaulted)
                    .Where(task => task.Exception != null)
                    .Select(task => task.Exception) // The Exception is of type AggregateException
                    .Cast<AggregateException>()
                    .ToArray();
                var exceptions = new AggregateException(aggregateExceptions)
                    .Flatten()
                    .InnerExceptions
                    .ToList(); // Flatten the hierarchy of AggregateExceptions
                if (exceptions.Count > 0)
                {
                    // Logging...
                }
                else if (t.IsCanceled)
                {
                    // No exceptions and at least one task was canceled
                    // Logging...
                }
            }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }
        catch (Exception ex)
        {
            // Logging...
        }
    }

    return metadatas;
}
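Because TryGetMetadataAsync catches every exception and returns null on failure, its tasks never fault or get canceled, so the error branches inside the ContinueWith above should never be hit. The same "keep only the successful results" logic can also be written without ContinueWith, and it still works when the per-item call is allowed to throw: await Task.WhenAll inside a try/catch, then read only the tasks that ran to completion. A minimal sketch, assuming .NET 6+ for Enumerable.Chunk (used in place of the answer's Partition helper); ChunkedLoader, LoadInChunksAsync, fetch and onError are hypothetical names:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkedLoader
{
    // Processes items chunkSize at a time and keeps only successful, non-null
    // results. Failures are reported through onError (if given) and skipped.
    public static async Task<List<TResult>> LoadInChunksAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        int chunkSize,
        Func<TItem, Task<TResult?>> fetch,
        Action<Exception>? onError = null)
        where TResult : class
    {
        var results = new List<TResult>(); // plain List: chunks run one after another

        foreach (TItem[] chunk in items.Chunk(chunkSize)) // Enumerable.Chunk needs .NET 6+
        {
            Task<TResult?>[] tasks = chunk.Select(fetch).ToArray();
            try
            {
                // Asynchronous wait: the thread is not blocked while the chunk runs.
                await Task.WhenAll(tasks);
            }
            catch (Exception)
            {
                // await rethrows only the first failure; every task is inspected below.
            }

            // Task.WhenAll has completed, so every task in the chunk is finished here.
            foreach (Task<TResult?> task in tasks)
            {
                if (task.Status == TaskStatus.RanToCompletion)
                {
                    if (task.Result is TResult value) results.Add(value);
                }
                else if (task.Exception is AggregateException ex)
                {
                    foreach (Exception inner in ex.Flatten().InnerExceptions) onError?.Invoke(inner);
                }
                // Canceled tasks carry no exception and are simply skipped.
            }
        }
        return results;
    }
}
```

Since results are only added after each chunk's await has completed, a plain List is enough here and the ConcurrentBag is not needed.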
Dude Pascalou

Try using await Task.WhenAll instead of Task.WaitAll, like this:

var tasks = dataRefs
    .Where(dataRef => !string.IsNullOrEmpty(dataRef))
    .Select(dataRef => TryGetMetadataAsync(dataRef, userId, cancellationToken))
    .ToArray();

await Task.WhenAll(tasks);

var metadatas = new List<Metadata>();
foreach (var task in tasks)
{
    var metadata = await task;
    if (metadata != null) { metadatas.Add(metadata); }
}

Probably Task.WaitAll blocks the calling thread, which can keep the compiler-generated state machine of TryGetMetadataAsync from continuing after its awaits.