0

I'm trying to implement parallelization in a project. The objective is to transform a classic ForEach that reads from a List<List<Foo>> and uploads to a database in a Parallale.ForEachAsync. My current implementation is:

var rows = new long();
foreach (var list in listOfLists)
{
  var sb = new StringBuilder("replace into table values ");
  foreach(item in list)
  {
    sp.Append($"({item.bar} ");
  }

  var updateRow = await _client.ExecuteSql(sb.ToString());
  rows += updatedRows;
}

EF Core is not an option here. I need to pass the query as a string to the database, that's why I need to iterate a list of entities and build the query. Also, there is a limit on the query string size, that's why I have to break the original List<Foo> in smaller lists of List<List<Foo>>.

My current implementation is as follows:

var rows = new long();
await Parallel.ForEachAsync(listOfLists, async (list, token) =>
{
  var sb = new StringBuilder("replace into table values ");
  foreach(var item in list)
  {
    sp.Append($"({item.bar} ");
  }

  var updatedRows = await _client.ExecuteSql(sb.ToString());
  rows += updatedRows;
}

But the problem is: when I run this code, It does not work as I expect, which is to do in parallalel, for each List<Foo> in List<List<Foo> the body function. When I ran the code, it flooded the database with duplicated entries.

How do I make a correct implementation of a async Parallel.ForEachAsync having as a <TSource> a List<List<Foo>>?

Edit: I'm using .net 6

lbueno
  • 31
  • 1
  • 8
  • 3
    "_flooded the database with duplicated entries_." Is that because you have duplicate updates in your list-of-lists, or are the updates interdependent on each other (i.e., must actually be executed in sequential order)? Only you can figure this out. It's not the Parallel.ForEachAsync that's the cause of your problem (it might be a contributing trigger, though); it's either your update data, or the DB schema, or the DB itself. Also as a side note, it's a quite unusual and rare sight to see something like `var rows = new long();`. Because typically one would write `long rows = 0;`... –  Oct 04 '22 at 18:45
  • 1
    Please also note that you don't need to demonstrate SQL injection problems for your question - we are not going to stop your own code to have whatever problems you like, but please, for code shown to public avoid unnecessary confusion by skipping SQL injection part - re-read [mre] to see what *should* be in the question. – Alexei Levenkov Oct 04 '22 at 19:07
  • 2
    If the `foreach` version works then I think I'd suspect that the `ParallelForEachAsync` version introduced a race condition. What is `_client` and what does `ExecuteSql` do? That seems like the most likely place you'd run into a race based on what you've shown here. – Joshua Robinson Oct 04 '22 at 19:08
  • 1
    As a side note, the `rows += updatedRows;` line is not thread-safe, because the `+=` operator is not atomic. You can look at [this question](https://stackoverflow.com/questions/19326968/is-c-sharp-thread-safe) for more details. – Theodor Zoulias Oct 04 '22 at 19:58
  • 2
    Thanks for all the comments. The issue really wasn't with the Parallel.ForEachAsync. Actually I created a testing table and forgot to copy the PK, so the replace wasn't working properly. And thanks for the notes regarding the updatedRows, I've improved the code. – lbueno Oct 04 '22 at 20:31
  • And @AlexeiLevenkov also thanks for the input regarding the proper way for asking the question, I'll bear that in mind next time. This project is a basic ETL with no outside input and this the only way I can interact with the database – lbueno Oct 04 '22 at 20:32

0 Answers0