I am working on a worker service that calls my own web API, which in turn calls an external API. My web API is an ASP.NET Core web API that saves the requests and responses to a database. The external API returns online game codes. (I send how many game codes I want and for which game.) A single request returns a maximum of 10 codes. The user enters the total number of online codes and the game number in a Blazor Server web application. Behind the scenes, this total is divided into batches of 20 codes each, and each batch is saved to the database as a record. The batch size is 20 because my worker service sends 2 parallel tasks to the web API with Parallel.ForEachAsync. The worker service checks the database for a record with status = 0; if it finds one, it takes that record and starts processing it.
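
The splitting step described above can be sketched roughly as follows. This is only an illustration: `SplitIntoBatches` is a hypothetical helper name, not the actual code of the Blazor application, and the batch size of 20 is taken from the description.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: splits a requested total into batch sizes of at most batchSize.
static List<int> SplitIntoBatches(int total, int batchSize)
{
    if (total < 0) throw new ArgumentOutOfRangeException(nameof(total));
    if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

    var batches = new List<int>();
    for (var remaining = total; remaining > 0; remaining -= batchSize)
    {
        // The last batch may be smaller than batchSize.
        batches.Add(Math.Min(batchSize, remaining));
    }
    return batches;
}

// 45 requested codes would become three database records: 20, 20 and 5.
var records = SplitIntoBatches(45, 20);
Console.WriteLine(string.Join(", ", records));
```

Each element of the returned list would correspond to one record saved with status = 0.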

For example:

The screenshot below shows a request for 20 coupons waiting to be processed. When the worker service reads this record (status = 0) from the database and sends it to my ASP.NET Core web API, my API calls the external API in batches of 10. (Because I want to receive the maximum number of game codes per request.) Therefore, in this scenario, 2 requests are sent in parallel and 2 responses are returned. The reason I run 2 tasks with Parallel.ForEachAsync in the worker is to speed up the process a little, because sometimes a total of 20,000 game codes is requested.

[Screenshot: the database table with the pending record]

Here is the portion of the worker service where I call my API. If the responses are successful, I set status = 1 in the database table above to mark the job as completed. If both tasks succeed, there is no problem: the row is updated to status = 1. If both tasks fail, there is again no problem: the status stays 0 and the worker service tries again after a random time interval. But if one task succeeds and the other fails, the update method still sets status = 1, which is wrong. Only one successful response was received from the external API and saved in the database, so only 10 game codes were returned, yet the record is marked as complete for all 20 codes. This happens because there is no logic that checks the status of each task separately.

var num = 20;
var firstNum = 10;
var secondNum = 10;

if (num < 20)
{
    firstNum = (num + 1) / 2;
    secondNum = num - firstNum;
}

var quantities = new List<int> { firstNum, secondNum };
var cts = new CancellationTokenSource();
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 2,
    CancellationToken = cts.Token
};
try
{
    await Parallel.ForEachAsync(quantities, parallelOptions,
        async (quantity, ct) =>
    {
        var content = new FormUrlEncodedContent(new[]
        {
            new KeyValuePair<string, string>("productCode", productCode),
            new KeyValuePair<string, string>("quantity", quantity.ToString()),
            new KeyValuePair<string, string>("clientTrxRef", bulkId.ToString())
        });
        try
        {
            using var response = await httpClient.PostAsync(
                _configuration["Razer:Production"], content, ct);

            if ((int)response.StatusCode == 200)
            {
                var coupon = await response.Content.ReadFromJsonAsync<Root>(cancellationToken: ct);

                await UpdateData(id);
            }
            else
            {
                //logging
            }
        }
        catch (HttpRequestException ex)
        {
            //logging
        }
                           
    });
}
catch (OperationCanceledException ex)
{
    //logging
}

Is there a way to get the statuses of each task so that I can build logic for my problem?
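
For illustration, one way the outcome of each item could be made visible is to record a success flag per item in a thread-safe collection inside `Parallel.ForEachAsync` and decide about the update only after the loop has finished. This is a minimal sketch under assumptions: `FakeRequestAsync` is a hypothetical stand-in for the real HTTP call, and `RunBatchAsync` is an illustrative name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the HTTP call: succeeds only for up to 10 codes per request.
static async Task<bool> FakeRequestAsync(int quantity, CancellationToken ct)
{
    await Task.Delay(10, ct);
    return quantity <= 10;
}

// Returns true only when every item in the batch reported success.
static async Task<bool> RunBatchAsync(int[] quantities)
{
    var outcomes = new ConcurrentBag<bool>();
    var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

    await Parallel.ForEachAsync(quantities, options, async (quantity, ct) =>
    {
        bool ok;
        try
        {
            ok = await FakeRequestAsync(quantity, ct);
        }
        catch (HttpRequestException)
        {
            ok = false; // a failed request counts as a failed item
        }
        outcomes.Add(ok);
    });

    // One flag per item must be present, and all of them must be true.
    return outcomes.Count == quantities.Length && outcomes.All(ok => ok);
}

var shouldUpdate = await RunBatchAsync(new[] { 10, 10 });
Console.WriteLine(shouldUpdate);
```

With this shape, the call to `UpdateData(id)` would move out of the loop body and run only when the returned flag is true.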

Cenk
  • So you are using the `Parallel.ForEachAsync` method to launch just two concurrent asynchronous operations? This is a quite atypical use of this API, I would say. – Theodor Zoulias Feb 14 '23 at 09:22
  • I haven't figured out how to solve the update problem. If the update job were out of the equation, I would try to increase the maximum number of concurrent asynchronous operations. Does the maximum parallelism depend on the server memory? – Cenk Feb 14 '23 at 09:35
  • Regarding the degree of parallelism, check out these two questions: [Factors for determining the degree of parallelism for the ForEachAsync](https://stackoverflow.com/questions/34359509/factors-for-determining-the-degree-of-parallelism-for-the-foreachasync/), and [Actual maximum concurrent tasks of Parallel.ForEachAsync](https://stackoverflow.com/questions/72447990/actual-maximum-concurrent-tasks-of-parallel-foreachasync). – Theodor Zoulias Feb 14 '23 at 10:14
  • IMHO there is too much code in the question, that makes it not very "tasty" for people who are here to share their knowledge, or use it to solve puzzles. If you want to make your question more appetizing, you should minimize it as much as possible, remove all this tedious instrumentation of nested `try`/`catch` blocks and logging, and present the problem in its purest form. Stylistically it is appreciated if the lines of code are short enough so that the horizontal scroll bar does not appear. Having to scroll both horizontally and vertically makes code highly unreadable on this site. – Theodor Zoulias Apr 07 '23 at 11:34
  • @TheodorZoulias thank you for your comment and sorry for the complexity. I will try to make it simpler and appetizing. Just trying to clarify the design and close the gaps. Is there a way to get the statuses of tasks so that I can build logic for my problem? – Cenk Apr 07 '23 at 16:12
  • I am trying to figure out how to check each task's status so that I can write a logic for updating data. In this case, if both tasks do not run successfully, an incorrect update is made. – Cenk Apr 08 '23 at 05:15
  • I don't see where you are updating the database. I suspect that the solution would be to move that code into the `ForEachAsync`. – Stephen Cleary Apr 08 '23 at 23:43
  • It is `await UpdateData(id);`, what do you mean by moving that code into the `ForEachAsync`? – Cenk Apr 09 '23 at 04:38

1 Answer

I think you might resolve the problem by doing all the work in a transaction of sorts. Basically, retrieve all the data first, and only if all the requests succeed, save all the data to the database. Otherwise, exit and save nothing.

Does this help?

private async Task RetrieveAndSaveCouponsAsync(CancellationToken ct)
{
    // code was omitted here because you haven't shown all of your solution

    var quantities = new List<int> { 10, 10 };

    // create a task that retrieves coupons for each quantity
    var tasks = quantities
        .Select(quantity => GetCodesAsync("someProductCode", quantity, 123, ct))
        .ToList();

    try
    {
        // await all the tasks
        await Task.WhenAll(tasks);
    }
    catch
    {
        // extract the exceptions from the tasks
        var exceptions = tasks
            .Where(x => x.Exception is not null)
            .Select(x => x.Exception);

        // log exceptions
        foreach (var ex in exceptions)
        {
            // LogError needs a message template when an exception is passed
            _logger.LogError(ex, "Retrieving coupons failed");
        }

        return;
    }

    // check if any of them failed
    if (tasks.Any(x => x.Result is null))
    {
        // one or more tasks failed, log error and don't save anything into db
        _logger.LogError("one or more tasks failed");
        return;
    }

    // extract the coupons from the tasks
    IEnumerable<Root> coupons = tasks.Select(x => x.Result).Cast<Root>();

    // save all coupons to the database
    foreach (var coupon in coupons)
    {
        await SaveCouponToDatabaseAsync(coupon);
    }
}

private async Task<Root?> GetCodesAsync(string productCode, int quantity, int bulkId, CancellationToken ct)
{
    var httpClient = new HttpClient(); // realistically, you'd get the HttpClient from IHttpClientFactory here

    var content = new FormUrlEncodedContent(new[]
    {
        new KeyValuePair<string, string>("productCode", productCode),
        new KeyValuePair<string, string>("quantity", quantity.ToString()),
        new KeyValuePair<string, string>("clientTrxRef", bulkId.ToString())
    });

    using var response = await httpClient.PostAsync(_configuration["Razer:ProductionMock"], content, ct);

    if ((int)response.StatusCode == 200)
    {
        // ReadFromJsonAsync can return null, hence the null-conditional access below
        var coupon = await response.Content.ReadFromJsonAsync<Root>(cancellationToken: ct);
        _logger.LogInformation("REFERENCE ID: {referenceId}", coupon?.ReferenceId);
        return coupon;
    }

    _logger.LogError("Purchase ServiceError: {statusCode}", (int)response.StatusCode);
    return null; // I'm returning null in case the request fails; you should decide how to handle this in your app
}
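
To look at the status of each task individually, the tasks can be inspected after `Task.WhenAll` has finished, because every task keeps its own `Status` and `Exception`. The following is a minimal, self-contained sketch; `FakeRequestAsync` is a hypothetical stand-in for `GetCodesAsync` that fails for any quantity other than 10, and `AllSucceededAsync` is an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the HTTP call: throws for any quantity other than 10.
static async Task<int> FakeRequestAsync(int quantity)
{
    await Task.Delay(10);
    if (quantity != 10)
    {
        throw new InvalidOperationException($"Request for {quantity} codes failed");
    }
    return quantity;
}

// Starts one request per quantity and reports whether every task succeeded.
static async Task<bool> AllSucceededAsync(IEnumerable<int> quantities)
{
    var tasks = quantities.Select(FakeRequestAsync).ToList();

    try
    {
        await Task.WhenAll(tasks);
    }
    catch
    {
        // Awaiting WhenAll rethrows only the first exception;
        // the individual tasks still hold their own details.
    }

    foreach (var task in tasks)
    {
        // Status is RanToCompletion, Faulted or Canceled once WhenAll has finished.
        var detail = task.Exception?.InnerException?.Message ?? "ok";
        Console.WriteLine($"{task.Status}: {detail}");
    }

    return tasks.All(t => t.IsCompletedSuccessfully);
}

var allSucceeded = await AllSucceededAsync(new[] { 10, 7 });
Console.WriteLine(allSucceeded);
```

The status update would then be issued only when `allSucceeded` is true.
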
Michal Diviš
  • Thank you, Michal. My problem is that the worker sample code I posted just triggers my API; all of the operations (saving requests, responses, and coupons to the database) are done in the API. I expect the worker to keep running in the background at random intervals and trigger the API to get the total number of game codes, and when these processes are completed, it updates the record as in the previous post. The critical point for me is to update correctly and completely according to the responses returned from the API (status = 1). Let me add the whole worker service code. – Cenk Feb 15 '23 at 06:26
  • Thank you for the clarification. Are you able to call the external API again to receive the same game codes you received in the past? If yes, my method might be applicable. In case any of the requests fail, you start over and send all of them again. However, If that is not the case and you can only request the same game codes once then you might be stuck with only processing 10 codes at a time to achieve consistency. – Michal Diviš Feb 15 '23 at 12:18
  • I am sorry, but I don't understand how your code achieves consistency if a request fails. Let me give you an example: I want to purchase 100 codes for game X and send this request to the API. The external API starts processing and completes 7 of the 10 requests, which means 70 codes; 3 of them fail. The 70 codes of game X are written to the database in the API. Since 3 requests failed, there is no `status=1` update to the table, so the worker service will most probably try to process this request again sooner or later. – Cenk Feb 16 '23 at 12:27
  • (`GetData` method) When it processes that request again and there is no failure, 100 codes will be purchased and written to the database. This means I wanted 100 codes for game X but got 170 instead. Is it clearer now? – Cenk Feb 16 '23 at 12:27
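
The over-purchase described in the last two comments (100 requested, 70 already stored, a full retry ending at 170) could in principle be avoided if a retry only asked for the codes that are still missing. A minimal sketch, assuming the API is able to report how many codes have already been stored for a request; `PlanRemainingRequests` and its parameters are hypothetical names, not part of the posted code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: plans the request sizes for the codes that are still missing,
// capped at maxPerRequest codes per external call.
static List<int> PlanRemainingRequests(int requested, int alreadyReceived, int maxPerRequest)
{
    if (maxPerRequest <= 0) throw new ArgumentOutOfRangeException(nameof(maxPerRequest));

    var remaining = Math.Max(0, requested - alreadyReceived);
    var plan = new List<int>();
    while (remaining > 0)
    {
        var size = Math.Min(maxPerRequest, remaining);
        plan.Add(size);
        remaining -= size;
    }
    return plan;
}

// 100 requested and 70 already stored: only 3 requests of 10 codes are planned.
var retryPlan = PlanRemainingRequests(requested: 100, alreadyReceived: 70, maxPerRequest: 10);
Console.WriteLine(string.Join(", ", retryPlan));
```

With such a plan, the record would be set to status = 1 only once the stored count reaches the requested total.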