I have a file with around 8000 employee records that I need to process by calling a REST API for each record. The sequential API calls are taking a lot of time, so I want to call them asynchronously in tasks and wait for all tasks to finish. I plan to have three tasks running at a time.

I have written the following code, but I'm concerned about race conditions or multi-threading problems since I am updating the employee entity inside the task. My understanding is I can update the entities but cannot call dbcontext methods. I know that the DBContext is not thread safe, so I am calling SaveChanges outside of the task loop. Can anyone review my code and let me know if I'm doing it right? Here's my pseudo code:

private async Task TempMethod()
{
    var dbcontext = new DBContext();
    var employees = dbcontext.Employees.ToList();

    var allTasks = new List<Task<APIResult>>();

    var throttler = new SemaphoreSlim(initialCount: 3);

    foreach (var employee in employees)
    {
        await throttler.WaitAsync();
        allTasks.Add(Task.Run(async () =>
        {
            try
            {
                var apiResult = await apiClient.Update(employee);
                if (apiResult == "Success") // pseudocode: check the result's status
                {
                    employee.lastupdatedby = "Importer";
                }

                apiResult.recordNumber = employee.recordNumber;
                return apiResult;
            }
            finally
            {
                // Free the slot so the loop can start the next call.
                throttler.Release();
            }
        }));
    }

    var results = await Task.WhenAll(allTasks);

    foreach (var result in results)
    {
        dbcontext.APIResults.Add(result);
    }

    // Save both the updated Employee entities and the Result entities.
    await dbcontext.SaveChangesAsync();
}
Theodor Zoulias
ICICI81
  • I don't see a wait call on the semaphore. Only release? – Fildor Mar 19 '23 at 20:57
  • And I think I would just split the input collection into 3 parts and have 3 concurrently running tasks, each sequentially making the calls for one of those partitions... – Fildor Mar 19 '23 at 21:00
  • Limiting to three like that is not good; you are going to get a large number of threads stalled waiting on that semaphore. I would use Parallel.ForEach with a limit of 3. – pm100 Mar 19 '23 at 21:00
  • @Fildor I added the wait on the semaphore now. It was just pseudocode. Any concerns about updating the employee entity inside the task? There won't be any multi-threading issue here, right? – ICICI81 Mar 19 '23 at 21:03
  • Not 100% sure. But that should be easily testable. – Fildor Mar 19 '23 at 21:06
  • @pm100 "If the work you have is I/O-bound, use async and await without Task.Run. You should not use the Task Parallel Library." I saw this here: https://learn.microsoft.com/en-us/dotnet/csharp/asynchronous-programming/async-scenarios – ICICI81 Mar 19 '23 at 21:06
  • The last part about the TPL may be partly outdated in newer versions of .NET. – Fildor Mar 19 '23 at 21:09
  • I could also imagine a solution with an ActionBlock from DataFlow which is technically also TPL. – Fildor Mar 19 '23 at 21:14
  • It may also be worthwhile to investigate whether the API offers bulk or batch endpoints, so you can reduce the number of calls. – Fildor Mar 19 '23 at 21:29
  • What .NET platform are you targeting? .NET 7? – Theodor Zoulias Mar 19 '23 at 21:32
  • If you are targeting .net 6.0 or higher you can use `Parallel.ForEachAsync` see [here](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-7.0) it also supports limiting max concurrency – Selmir Aljic Mar 19 '23 at 21:47
  • Unfortunately this code targets .NET 4.6.1, and the API I’m using doesn’t offer batch calls. – ICICI81 Mar 19 '23 at 21:56

1 Answer

Your code seems right to me, under these conditions:

  1. The employee.lastupdatedby and apiResult.recordNumber are either public fields, or trivial properties backed by private fields (without side-effects).
  2. The class of which apiClient is an instance is thread-safe.
  3. You want to process all the employees anyway, even in case of an early exception. In other words you don't want to complete ASAP in case of errors.
  4. In case of an exception you are OK with only a part of the employees having their lastupdatedby updated.
  5. In case of multiple exceptions you are OK with propagating only one of the exceptions (the exception of the first employee in the employees list that failed).
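To illustrate condition 5: awaiting Task.WhenAll rethrows only the first exception, but the task it returns carries all of them in its Exception property. A minimal sketch of reading them all, assuming you keep a reference to that task (the class and method names here are hypothetical, not part of your code):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllExceptions // hypothetical helper name
{
    // Waits for all tasks and returns every exception they produced,
    // instead of only the first one that `await Task.WhenAll(...)` rethrows.
    public static async Task<IReadOnlyList<Exception>> CollectFailuresAsync(
        IEnumerable<Task> tasks)
    {
        Task whenAll = Task.WhenAll(tasks);
        try
        {
            await whenAll;
            return Array.Empty<Exception>();
        }
        catch (Exception ex)
        {
            // When faulted, whenAll.Exception is an AggregateException that
            // holds the exceptions of all failed tasks, in task order.
            if (whenAll.Exception != null)
                return whenAll.Exception.InnerExceptions.ToList();

            // Canceled rather than faulted: only the caught exception exists.
            return new[] { ex };
        }
    }
}
```

Whether you need this depends on how much detail you want to log about the failed employees; if the first error is enough, the plain await is fine.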

As a side note, personally I would prefer to abstract the parallelization/throttling functionality in a separate helper method, instead of mixing threading and TPL mechanisms with my application code. I would like to link to a good quality implementation of a ForEachAsync that returns results, and is compatible with .NET 4.6.1, but I can't find any. Jon Skeet's implementation here is decent, but it doesn't have ideal behavior in case of exceptions, and it doesn't preserve the order of the results.
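For what it's worth, here is a rough sketch of what such a helper could look like on .NET 4.6.1. It is not a polished implementation: the names ThrottledExtensions and ForEachThrottledAsync are made up for this example, it assumes the body is genuinely asynchronous (so it skips Task.Run), and in case of errors it behaves like your original code (it processes everything, and awaiting it propagates only the first exception). It does preserve the order of the results, because Task.WhenAll returns them in the order of the tasks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExtensions // hypothetical helper name
{
    // Invokes `body` for each item with at most `maxConcurrency` invocations
    // in flight, and returns the results in the order of the source items.
    public static async Task<TResult[]> ForEachThrottledAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        int maxConcurrency,
        Func<TSource, Task<TResult>> body)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (body == null) throw new ArgumentNullException(nameof(body));

        using (var throttler = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task<TResult>>();
            foreach (var item in source)
            {
                // Asynchronously wait for a free slot; no thread is blocked.
                await throttler.WaitAsync().ConfigureAwait(false);
                tasks.Add(RunOneAsync(item, body, throttler));
            }
            // All slots are released by the time WhenAll completes,
            // so disposing the semaphore afterwards is safe.
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }

    private static async Task<TResult> RunOneAsync<TSource, TResult>(
        TSource item, Func<TSource, Task<TResult>> body, SemaphoreSlim throttler)
    {
        try
        {
            return await body(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release(); // free the slot even if the body throws
        }
    }
}
```

With something like this, the loop in TempMethod would reduce to a single call such as ThrottledExtensions.ForEachThrottledAsync(employees, 3, async employee => { ... }), with the body of your current lambda (minus the try/finally) inside the braces.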

Theodor Zoulias