I have a table with over 100000 records. I want to read the data, process them, and then delete the row if the process is successful, and update the row with an error code if it fails.

My approach is to fetch 1000 records at a time, load them into a DataTable, and pass it to a function that runs the processing on 5 parallel threads. Once the first 1000 records are processed, I open a new DataReader and work on the next 1000, and so on.

Does my approach look right? The issue I am having is that the code runs fine, but the 1000 records are not completely processed. About 300 records are processed and the rest remain. What am I doing wrong here?

using (SqlDataReader rdr = cmd.ExecuteReader()) {
    if (rdr.HasRows) {
        DataTable dt = new DataTable();
        dt.Load(rdr);
        //process data
        results = Process(dt);
    }
}

public List<string> Process(DataTable dt) {
    var options = new ParallelOptions() {
        MaxDegreeOfParallelism = 5
    };
    List<string> results = new List<string>();

    Parallel.ForEach(dt.Rows.Cast<DataRow>(), options, (trans) => {
        //process and then delete/update row
        ....
        results.Add(transResult);
    });
    return results;
}
Jay
  • It's not safe to call `Add` on `results` from multiple threads at the same time. Either use `ConcurrentBag` and call `ToList()` once the parallel jobs are finished, or wrap the `Add` with a `lock` statement – Andrew Williamson Feb 03 '21 at 20:03
  • The approach is wrong for several reasons. `Parallel.ForEach` is **only** meant for crunching large amounts of in-memory data using all available cores, *NOT* for running asynchronous IO operations in parallel. Whatever `process and then delete/update row` does, at some point it's going to talk to the database, one slow row at a time, blocking a CPU core while waiting for the database to respond – Panagiotis Kanavos Feb 03 '21 at 20:03
  • What kind of processing do you perform? It matters. You may be able to use Dataflow classes like TransformBlock, ActionBlock and BatchBlock to read the source data in a stream, process each row and *batch the results* before writing them to the database in batches, *avoiding* the network roundtrips required for individual row modifications – Panagiotis Kanavos Feb 03 '21 at 20:07
  • What processing are you doing? Can it all be done in SQL? @PanagiotisKanavos There is no IO done in the lambda, it's working on an in-memory `DataTable` – Charlieface Feb 03 '21 at 20:10
  • `//process and then delete/update row` <== Could you provide more info about the type of processing each `DataRow` undergoes? Is it [I/O-bound or CPU-bound](https://stackoverflow.com/questions/868568/what-do-the-terms-cpu-bound-and-i-o-bound-mean)? – Theodor Zoulias Feb 03 '21 at 20:10
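The first comment's suggestion can be sketched roughly as follows. This is only an illustration under assumptions: `ProcessRow` and the `Id` column are hypothetical stand-ins for the elided per-row work in the question, and the sketch only reads from the `DataTable` inside the loop. Any code that deletes or updates rows of the same `DataTable` from several threads would still need its own synchronization.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

public static class RowProcessor
{
    // Hypothetical placeholder for the real per-row processing (not from the question).
    private static string ProcessRow(DataRow row) => $"processed {row["Id"]}";

    public static List<string> Process(DataTable dt)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };

        // ConcurrentBag<T> supports Add from multiple threads, unlike List<T>.
        var results = new ConcurrentBag<string>();

        Parallel.ForEach(dt.Rows.Cast<DataRow>(), options, row =>
        {
            results.Add(ProcessRow(row));
        });

        // Copy into a List<string> only after all parallel work has finished.
        return results.ToList();
    }

    public static void Main()
    {
        // Build a small in-memory table to exercise the sketch.
        var dt = new DataTable();
        dt.Columns.Add("Id", typeof(int));
        for (int i = 0; i < 1000; i++)
        {
            dt.Rows.Add(i);
        }

        List<string> results = Process(dt);
        Console.WriteLine(results.Count); // one result per input row
    }
}
```

Note that `ConcurrentBag` does not preserve insertion order, so the returned list may be in a different order than the rows of the table.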

1 Answer

Threads share the same resources, so this can cause issues with the dbContext. It is better to take a lock inside Parallel.ForEach, like this:

    Parallel.ForEach(dt.Rows.Cast<DataRow>(), options, (trans) => {
        var obj = new object();
        lock (obj)
        {
            //...your processing
        }
    });
SIbghat
  • The creation of the locking object must be taken out of the loop. Then it should work, but it may have a big impact on performance. – Alexander Petrov Feb 04 '21 at 13:14
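Following the comment above, a minimal sketch with the lock object created once, outside the loop, might look like this. `ProcessRow`, `Gate`, and the `Id` column are hypothetical names used for illustration only. A lock on an object created inside the lambda protects nothing, because every iteration locks a different object.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

public static class LockedProcessor
{
    // A single lock object shared by every iteration (hypothetical name).
    private static readonly object Gate = new object();

    // Hypothetical placeholder for the real per-row processing.
    private static string ProcessRow(DataRow row) => $"processed {row["Id"]}";

    public static List<string> Process(DataTable dt)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
        var results = new List<string>();

        Parallel.ForEach(dt.Rows.Cast<DataRow>(), options, row =>
        {
            // The per-row work runs in parallel, outside the lock.
            string result = ProcessRow(row);

            // Only the mutation of the shared list is serialized.
            lock (Gate)
            {
                results.Add(result);
            }
        });

        return results;
    }

    public static void Main()
    {
        var dt = new DataTable();
        dt.Columns.Add("Id", typeof(int));
        for (int i = 0; i < 1000; i++)
        {
            dt.Rows.Add(i);
        }

        Console.WriteLine(Process(dt).Count); // one result per input row
    }
}
```

Keeping the locked region as small as possible limits the performance cost the comment warns about. Wrapping the whole loop body in the lock would make the iterations run one at a time.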