
I like the simplicity of the Parallel.For and Parallel.ForEach methods in the TPL. I was wondering if there is a way to take advantage of something similar here, or even of the slightly more advanced Tasks.

Below is a typical usage of the SqlDataReader. I was wondering whether it is possible, and if so how, to replace the while loop below with something from the TPL. Because the reader can't provide a fixed number of iterations, the For method is not an option, which leaves dealing with Tasks, I would gather. I was hoping someone may have tackled this already and worked out some do's and don'ts with ADO.NET.

using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
    conn.Open();

    SqlDataReader reader = comm.ExecuteReader();

    if (reader.HasRows)
    {
        while (reader.Read())
        {
            // Do something with Reader
        }
    }
}
BlackWasp
Rodney S. Foley

2 Answers


You're going to have difficulty replacing that while loop directly. SqlDataReader is not a thread-safe class, so you cannot use it directly from multiple threads.

That being said, you could potentially process the data you read using the TPL. There are a few options here. The easiest might be to make your own IEnumerable<T> implementation that works on the reader and returns a class or struct containing your data. You could then use PLINQ or a Parallel.ForEach statement to process your data in parallel:

public IEnumerable<MyDataClass> ReadData()
{
    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    {
        conn.Open();

        using (SqlDataReader reader = comm.ExecuteReader())
        {
            while (reader.Read())
            {
                // Copy this row into a new object; never hand the reader itself to another thread.
                yield return new MyDataClass(/* ... data from reader ... */);
            }
        }
    }
}

Once you have that method, you can process this directly, via PLINQ or TPL:

Parallel.ForEach(this.ReadData(), data =>
{
    // Use the data here...
});

Or:

this.ReadData().AsParallel().ForAll(data => 
{
    // Use the data here...
});
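A runnable sketch of the same idea, under a few assumptions: OrderRow is a hypothetical stand-in for MyDataClass, and ReadData here takes an already-open IDataReader so that an in-memory DataTableReader can replace the SqlDataReader (no SQL Server needed). Each row is copied into a new object inside the iterator, and both the Parallel.ForEach and the PLINQ ForAll variants update shared state only through Interlocked.

```csharp
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for MyDataClass: an immutable copy of one row.
public sealed class OrderRow
{
    public int Id { get; }
    public int Quantity { get; }

    public OrderRow(int id, int quantity)
    {
        Id = id;
        Quantity = quantity;
    }
}

public static class ReadDataSketch
{
    // Like ReadData(), but takes an already-open reader so the sketch can
    // run against a DataTableReader instead of a SqlDataReader.
    public static IEnumerable<OrderRow> ReadData(IDataReader reader)
    {
        using (reader)
        {
            int id = reader.GetOrdinal("Id");
            int qty = reader.GetOrdinal("Quantity");
            while (reader.Read())
            {
                // A new object per row: nothing that leaves this method
                // refers back to the (non-thread-safe) reader.
                yield return new OrderRow(reader.GetInt32(id), reader.GetInt32(qty));
            }
        }
    }

    // Parallel.ForEach: the body runs concurrently, so the shared total
    // is updated with Interlocked.Add rather than a plain +=.
    public static long TotalWithParallelForEach(IDataReader reader)
    {
        long total = 0;
        Parallel.ForEach(ReadData(reader), row => Interlocked.Add(ref total, row.Quantity));
        return total;
    }

    // PLINQ: the same work expressed with AsParallel().ForAll(...).
    public static long TotalWithForAll(IDataReader reader)
    {
        long total = 0;
        ReadData(reader).AsParallel().ForAll(row => Interlocked.Add(ref total, row.Quantity));
        return total;
    }

    // Sample data: rows (i, i) for i = 1..rowCount.
    public static DataTableReader SampleReader(int rowCount)
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Quantity", typeof(int));
        for (int i = 1; i <= rowCount; i++)
        {
            table.Rows.Add(i, i);
        }
        return table.CreateDataReader();
    }
}
```

Only the enumeration touches the reader; the parallel bodies see nothing but the copied OrderRow objects.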
Reed Copsey

You're almost there. Wrap the code you posted in a function with this signature:

IEnumerable<IDataRecord> MyQuery()

and then replace your // Do something with Reader code with this:

yield return reader;

Now you have something that works in a single thread. Unfortunately, as you read through the query results, it returns a reference to the same object each time, and that object just mutates itself on each iteration. This means that if you try to run it in parallel you'll get some really odd results, because each Read() call changes the object that other threads are still using. You need code that takes a copy of the record to send to your parallel loop.
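A small sketch that makes the shared-object problem visible. It assumes a DataTableReader as a stand-in for SqlDataReader; since "yield return reader" hands back the reader object itself, every yielded IDataRecord is the same instance regardless of reader type. The Snapshot helper (a hypothetical name) shows one way to take the per-row copy, using IDataRecord.GetValues.

```csharp
using System.Collections.Generic;
using System.Data;

public static class SharedRecordDemo
{
    // The single-threaded version: yield the reader itself as an IDataRecord.
    public static IEnumerable<IDataRecord> Rows(IDataReader reader)
    {
        while (reader.Read())
        {
            yield return reader; // same object every time; only its current row changes
        }
    }

    // Copies the current row's values into a fresh object[]; this is the
    // per-row copy that has to happen before the data goes to another thread.
    public static object[] Snapshot(IDataRecord record)
    {
        var values = new object[record.FieldCount];
        record.GetValues(values);
        return values;
    }

    public static DataTableReader SampleReader()
    {
        var table = new DataTable();
        table.Columns.Add("Name", typeof(string));
        table.Rows.Add("Ada");
        table.Rows.Add("Grace");
        return table.CreateDataReader();
    }
}
```

Collecting the yielded records into a list would give two references to one reader, while the snapshots keep "Ada" and "Grace" separately.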

At this point, though, what I like to do is skip the extra copy of the record and go straight to a strongly-typed class. More than that, I like to use a generic method to do it:

IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    {
        addParameters(cmd.Parameters);

        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                yield return factory(rdr);
            }
        }
    }
}
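To try the shape of GetData<T> without a SQL Server instance, here is a hedged variant: the SqlConnection/SqlCommand setup is replaced by a hypothetical openReader delegate, so any IDataReader (for example a DataTableReader) can be plugged in. The iterator and the factory call are otherwise the same as above.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

public static class GetDataSketch
{
    // Same shape as GetData<T>, except that openReader (a hypothetical
    // parameter) supplies the reader instead of a connection and command.
    public static IEnumerable<T> GetData<T>(Func<IDataReader> openReader,
                                            Func<IDataRecord, T> factory)
    {
        using (IDataReader rdr = openReader())
        {
            while (rdr.Read())
            {
                // factory must copy what it needs: rdr is the same object on
                // every iteration and changes on the next Read().
                yield return factory(rdr);
            }
        }
    }
}
```

A factory such as r => r would still compile, but it hands the shared reader straight back and reproduces the problem described above.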

Assuming your factory methods create a copy as expected, this code should be safe to use in a Parallel.ForEach loop. Calling the method would look something like this (assuming an Employee class with a static factory method named "Create"):

var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => {
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       });
Parallel.ForEach(UnderPaid, e => e.GiveRaise());
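A self-contained sketch of what Employee.Create and GiveRaise might look like. Everything here is an assumption: the Employee members, the 5% raise, and the in-memory salary filter that stands in for the SQL WHERE clause; a DataTableReader again replaces the SqlDataReader. The point it illustrates is that Create copies the columns it needs, so the Parallel.ForEach body only ever sees independent Employee objects.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

// Hypothetical Employee type with the Create factory the answer assumes.
public sealed class Employee
{
    public int Id { get; }
    public decimal AnnualSalary { get; private set; }

    private Employee(int id, decimal annualSalary)
    {
        Id = id;
        AnnualSalary = annualSalary;
    }

    // Factory: copies the columns it needs out of the current record,
    // so the returned object no longer depends on the reader.
    public static Employee Create(IDataRecord record)
    {
        return new Employee(
            record.GetInt32(record.GetOrdinal("Id")),
            record.GetDecimal(record.GetOrdinal("AnnualSalary")));
    }

    // Each Employee is handled by exactly one worker, so no lock is needed here.
    public void GiveRaise()
    {
        AnnualSalary *= 1.05m; // assumed 5% raise
    }
}

public static class EmployeeDemo
{
    public static IEnumerable<Employee> ReadUnderPaid(IDataReader reader, decimal threshold)
    {
        using (reader)
        {
            while (reader.Read())
            {
                Employee e = Employee.Create(reader); // copy first
                if (e.AnnualSalary <= threshold)      // stands in for the SQL WHERE clause
                {
                    yield return e;
                }
            }
        }
    }

    public static Employee[] RaiseUnderPaid(IDataReader reader, decimal threshold)
    {
        var raised = new ConcurrentBag<Employee>();
        Parallel.ForEach(ReadUnderPaid(reader, threshold), e =>
        {
            e.GiveRaise();
            raised.Add(e); // thread-safe collection for results
        });
        return raised.ToArray();
    }
}
```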

Important Update:
I'm not as confident in this code as I once was. A separate thread could still mutate the reader while another thread is in the process of making its copy. I could put a lock around that, but I'm also concerned that another thread could update the reader after the original thread has itself called Read() but before it begins to make the copy. Therefore, the critical section here consists of the entire while loop... and at this point, you're back to single-threaded again. I expect there is a way to modify this code to work as expected for multi-threaded scenarios, but it will need more study.
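One way to sidestep the question of who is holding the reader is a producer/consumer split, sketched below. This is a swapped-in technique using BlockingCollection<T>, not code from the answer: a single thread owns the reader, performs every Read() and copy, and queues the copies; worker tasks consume only the copies. The Process method, its parameters, and the object[] row format are assumptions for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Only the calling thread touches the reader. Workers receive object[]
    // copies and never see the reader itself.
    public static long Process(IDataReader reader, int workerCount, Func<object[], long> work)
    {
        long total = 0;
        using (var queue = new BlockingCollection<object[]>())
        {
            Task[] workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Run(() =>
                {
                    // Blocks until items arrive; ends after CompleteAdding and drain.
                    foreach (object[] row in queue.GetConsumingEnumerable())
                    {
                        Interlocked.Add(ref total, work(row));
                    }
                }))
                .ToArray();

            try
            {
                using (reader)
                {
                    while (reader.Read())
                    {
                        var values = new object[reader.FieldCount];
                        reader.GetValues(values); // copy made on the reader's own thread
                        queue.Add(values);
                    }
                }
            }
            finally
            {
                queue.CompleteAdding(); // lets each worker's loop finish
                Task.WaitAll(workers);  // rethrows worker failures as AggregateException
            }
        }
        return total;
    }
}
```

The queue here is unbounded for simplicity. Passing a boundedCapacity to the BlockingCollection constructor would cap memory use, but then a failed worker could leave the producer blocked in Add, so that version would also need cancellation handling.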

Joel Coehoorn
  • I am with you for most of what you said, but you lost me a little bit on the factory. Func factory doesn't match the call when used with yield return factory(rdr); I think you meant Func. So I'm not sure what you mean by copy as expected. Do you mean basically read from the reader and return a MyDataClass, similar to what Reed was saying in his response? – Rodney S. Foley Jun 22 '10 at 23:10
  • Also, it looks like your GetData call is out of order: you have the factory func before the sql string. Regardless, I think I get it; your Employee.Create is your factory that does the work needed with the reader. I will play with this for a little while and see how it goes. – Rodney S. Foley Jun 22 '10 at 23:25
  • Yes, I meant Func. Will fix that and the parameter mis-match. – Joel Coehoorn Jun 22 '10 at 23:43
  • It worked great. Thanks for the help, and I love the GetData method which works great with or without threading. – Rodney S. Foley Jun 23 '10 at 02:09
  • Some nice functional code here, but as to the benefits of running this in parallel, I'm not sure there are any. The bottleneck is likely to be the actual db call, which is not run in parallel. – Robert Jeppesen Sep 29 '10 at 20:32
  • @RobertJeppesen: That depends upon the details of the implementation of `GiveRaise`. For instance, what if `GiveRaise` performed some sort of expensive I/O? – TMcManemy Jul 01 '13 at 17:57
  • @JoelCoehoorn - In the concerns given in your update, will the lock taken by the Parallel.ForEach when enumerating not guard against these scenarios? – dugas Apr 01 '14 at 22:55