
My article site has a few statistics operations that require DB updates, and I would like to run them asynchronously on background threads.

The problem arises when I try to implement what is suggested here: Simplest way to do a fire and forget method in C#?


ThreadPool.QueueUserWorkItem(o => FireAway());

Because FireAway() contains database updates, I get a "the db connection already open" error.

My question is: what is the simplest way to write a thread that works in the background as an atomic operation, without colliding with other threads?

My site: www.mentallica.co.il

Added information: FireAway() contains a call to a DLL that holds all my DB updates.

I'm using a shared class instance called bll: MyBuisnessLogic bll = new MyBuisnessLogic()

And inside FireAway()

bll.RunStatistics();

The bll.RunStatistics() function opens and closes the sql connection.

I guess the problem occurs when one thread opens the SQL connection and another thread tries to open the same, already open connection.

Maybe I should create a separate instance of MyBuisnessLogic for the new thread? Maybe I need to do this inside a using block? Something like using (MyBuisnessLogic bll = new MyBuisnessLogic()) ?

---- After checking, I see that MyBuisnessLogic would have to implement IDisposable for that to work... Should I go that way?

Mike Tishler
  • 1
    Can you please post the contents of your `FireAway()` method? The first thing that occurs to me is that you are trying to reopen a connection that is already open. – Adrian Toman Sep 21 '13 at 08:08
  • 1
    It seems that the `BLL` class instance holds a `SqlConnection` for its lifetime. As you say - you would need a separate instance of the BLL on each thread (similar to point 1 in my answer). The BLL would need to be redesigned to be thread-safe. – StuartLC Sep 21 '13 at 12:44

1 Answer


The common reasons why you want non-blocking Db processing would include:

  1. Because each database operation takes a long time
  2. Because there is a lot of data to process and parallelism could improve the throughput. (i.e. we are assuming that the bottleneck won't just be moved to the database)

As your post suggests, management of resources like SqlConnections requires consideration, since sharing SqlConnection or SqlCommand across threads isn't a good idea. Synchronizing access to the SqlConnection isn't desirable as it will undo any parallelisation benefit.

A simple solution for case 1 is to have each background task establish its own SqlConnection, although this is not conducive to high database throughput:

        Task.Factory.StartNew(() =>
        {
            using (var conn = new SqlConnection(connectionString))
            using (var cmd = conn.CreateCommand())
            {
                conn.Open();
                SetupCmd(cmd);
                SaveStat(cmd, statToSave);
            }
        });

Alternatives exist for background writing (case 1), e.g. one or more long-lived writer threads listening to a queue such as a ConcurrentQueue or, better still, a BlockingCollection iterated via GetConsumingEnumerable and fed by the ASP.NET page threads. Each writer thread would keep a single SqlConnection open throughout.
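The queue-based writer described above could be sketched roughly as follows. This is only an illustration under assumptions: the `BackgroundWriter<T>` class and its `write` delegate are hypothetical names, not part of the code in this answer. In practice the delegate would wrap something like `SaveStat` on a SqlConnection / SqlCommand pair that only the consumer thread ever uses.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not from the original answer): one long-lived consumer
// drains a queue that request threads add to.
public sealed class BackgroundWriter<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Task _consumer;

    // 'write' persists one item. It is only ever invoked on the consumer
    // task, so any connection it uses is never shared between threads.
    public BackgroundWriter(Action<T> write)
    {
        _consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while the queue is empty; the loop ends once
            // CompleteAdding() has been called and the queue is drained.
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                write(item);
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Called from page threads; returns as soon as the item is queued.
    public void Enqueue(T item)
    {
        _queue.Add(item);
    }

    // Stop accepting items, wait for the consumer to flush, then clean up.
    // If 'write' threw, Wait() rethrows it wrapped in an AggregateException.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _consumer.Wait();
        _queue.Dispose();
    }
}
```

For a quick check without a database, `new BackgroundWriter<string>(Console.WriteLine)` followed by a few `Enqueue` calls and a `Dispose` should print the items in the order they were queued. Note that this sketch does no error handling or retry: an exception in `write` ends the consumer.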

In a high-volume situation like case 2, reuse of the SqlConnection and SqlCommand is essential for performance. The data then needs to be partitioned amongst multiple threads (or Tasks, if using the TPL). Parallel.ForEach does most of the hard work for us. Below, the overload with localInit is used to establish the SqlConnection and SqlCommand, which are then passed to each body invocation, and localFinally is called at the end of the task, after 0..N iterations of the body. The default partitioner is used, so the TPL decides how many Tasks are needed and how many items are passed to each Task. The localInit approach allows a paradigm similar to thread-local storage.

One caveat - if the processing is just for high volume insert operations, SqlBulkCopy would likely be a better approach altogether.
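As a rough idea of what the SqlBulkCopy route might look like for the StatsData table used in this answer, here is a minimal sketch. The `BulkStatsWriter` class and its method names are made up for illustration, and the code assumes the rows have already been collected into a DataTable whose column names match the table's.

```csharp
using System.Data;
using System.Data.SqlClient;

// Hypothetical helper (not from the original answer).
public static class BulkStatsWriter
{
    // Builds an empty in-memory table whose columns mirror dbo.StatsData.
    public static DataTable CreateStatsTable()
    {
        var table = new DataTable("StatsData");
        table.Columns.Add("x", typeof(int));
        table.Columns.Add("y", typeof(decimal));
        table.Columns.Add("name", typeof(string));
        return table;
    }

    // Sends all rows to the server in one bulk operation.
    public static void BulkInsert(string connectionString, DataTable rows)
    {
        using (var bulk = new SqlBulkCopy(connectionString))
        {
            bulk.DestinationTableName = "dbo.StatsData";
            // Explicit source-to-destination column mappings.
            bulk.ColumnMappings.Add("x", "x");
            bulk.ColumnMappings.Add("y", "y");
            bulk.ColumnMappings.Add("name", "name");
            bulk.WriteToServer(rows);
        }
    }
}
```

Rows can be added with `table.Rows.Add(1, 3.14159m, "Stat #1")` before calling `BulkInsert`. Whether this beats the parallel inserts depends on batch size and on how quickly the stats need to be visible, so it is worth measuring.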

Here are a couple of options using the TPL. Given the table:

create table StatsData
(
    x int ,
    y decimal(20,5),
    name nvarchar(50)
)

And a Model:

public class StatsData
{
    public int X { get; private set; }
    public double Y { get; private set; }
    public string Name { get; private set; }
    public StatsData(int x, double y, string name)
    {
        X = x;
        Y = y;
        Name = name;
    }
}

The following class gives 2 asynchronous options (for point 1 and 2):

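using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;

public class Dispatcher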
{
    // Helpers - refactoring
    private static void SetupCmd(SqlCommand cmd)
    {
        cmd.CommandText = "insert into dbo.statsdata(x, y, Name) values (@x, @y, @Name);";
        cmd.CommandType = CommandType.Text;
        cmd.Parameters.Add("@x", SqlDbType.Int);
        cmd.Parameters.Add("@y", SqlDbType.Decimal);
        // Size matches the nvarchar(50) column in the table definition
        cmd.Parameters.Add("@Name", SqlDbType.NVarChar, 50);
    }

    private static void SaveStat(SqlCommand cmd, StatsData statToSave)
    {
        cmd.Parameters["@x"].Value = statToSave.X;
        cmd.Parameters["@y"].Value = statToSave.Y;
        cmd.Parameters["@Name"].Value = statToSave.Name;
        cmd.ExecuteNonQuery();
    }

    // 1. Save 1 stat at a time on a background task. Use for low / intermittent volumes 
    public void SaveStatAsynch(string connectionString, StatsData statToSave)
    {
        Task.Factory.StartNew(() =>
        {
            using (var conn = new SqlConnection(connectionString))
            using (var cmd = conn.CreateCommand())
            {
                conn.Open();
                SetupCmd(cmd);
                SaveStat(cmd, statToSave);
            }
        });
    }

    // 2. For background writing of large volumes of stats. Uses the default partitioner in parallel foreach
    public void SaveStatsParallel(string connectionString, IEnumerable<StatsData> statsToSave)
    {
        Parallel.ForEach(
            statsToSave,
            // localInit. Return value is passed to each body invocation
            () =>
                {
                    var conn = new SqlConnection(connectionString);
                    var cmd = conn.CreateCommand();
                    SetupCmd(cmd);
                    conn.Open();
                    return new
                    {
                        Conn = conn,
                        Cmd = cmd
                    };
                },
            // Body, 0..N per Task decided by TPL
            (stat, loopState, initData) =>
                {
                    SaveStat(initData.Cmd, stat);
                    return initData;
                },
            // Disposables
            (initData) =>
                {
                    initData.Cmd.Dispose();
                    initData.Conn.Dispose();
                }
            );
    }
}

An example of usage:

        const string connString = @"Server=.\SqlExpress;DataBase=StatsDb;Integrated Security=true";
        // Create some dummy data
        var statsToSave = 
            Enumerable
                .Range(0, 10000)
                .Select(i => new StatsData(i, i*Math.PI, string.Format("Stat #{0}", i)));
        // Insert this in parallel on background tasks / threads as determined by the TPL
        new Dispatcher().SaveStatsParallel(connString, statsToSave);
StuartLC