The common reasons why you want non-blocking Db processing would include:
- Because each database operation takes a long time
- Because there is a lot of data to process and parallelism could improve the throughput. (i.e. we are assuming that the bottleneck won't just be moved to the database)
As your post suggests, management of resources like SqlConnections
requires consideration, since sharing SqlConnection
or SqlCommand
across threads isn't a good idea. Synchronizing access to the SqlConnection
isn't desirable as it will undo any parallelisation benefit.
A simple solution to problem 1. would be to force each thread to establish its own SqlConnection
, although this is not conducive to high database throughput:
Task.Factory.StartNew(() =>
{
using (var conn = new SqlConnection(connectionString))
using (var cmd = conn.CreateCommand())
{
conn.Open();
SetupCmd(cmd);
SaveStat(cmd, statToSave);
}
});
Alternatives for background writing exist (case 1.) e.g. by having one or more long-lived writer threads, listening to a queue e.g. ConcurrentQueue
or better still, a BlockingCollection
iterated by a GetConsumingEnumerable
fed by ASP.Net page threads. The writer(s) thread would keep a single SqlConnection
open throughout.
In a high volume situation like 2., reuse of SqlConnection
and SqlCommands
is essential for performance. The data then needs to be partitioned amongst multiple threads (or Tasks, if using the TPL). Parallel.ForEach does most of the hard work for us - below, the overload with localInit
is used to establish the SqlConnection
and SqlCommand
, which then gets passed to each body, and the localFinally
is called at the end of the task (after 0..N iterations of the Body - the default partitioner is used, so TPL decides on how many Tasks are needed, and how many items are passed to each Task body). The localInit
allows a similar paradigm to using thread-local storage.
One caveat - if the processing is just for high volume insert operations, SqlBulkCopy would likely be a better approach altogether.
Here's a couple of options using the TPL:
Given the table:
create table StatsData
(
x int ,
y decimal(20,5),
name nvarchar(50)
)
And a Model:
public class StatsData
{
public int X { get; private set; }
public double Y { get; private set; }
public string Name { get; private set; }
public StatsData(int x, double y, string name)
{
X = x;
Y = y;
Name = name;
}
}
The following class gives 2 asynchronous options (for point 1 and 2):
public class Dispatcher
{
// Helpers - refactoring
private static void SetupCmd(SqlCommand cmd)
{
cmd.CommandText = "insert into dbo.statsdata(x, y, Name) values (@x, @y, @Name);";
cmd.CommandType = CommandType.Text;
cmd.Parameters.Add("@x", SqlDbType.Int);
cmd.Parameters.Add("@y", SqlDbType.Decimal);
cmd.Parameters.Add("@Name", SqlDbType.NVarChar, 30);
}
private static void SaveStat(SqlCommand cmd, StatsData statToSave)
{
cmd.Parameters["@x"].Value = statToSave.X;
cmd.Parameters["@y"].Value = statToSave.Y;
cmd.Parameters["@Name"].Value = statToSave.Name;
cmd.ExecuteNonQuery();
}
// 1. Save 1 stat at a time on a background task. Use for low / intermittent volumes
public void SaveStatAsynch(string connectionString, StatsData statToSave)
{
Task.Factory.StartNew(() =>
{
using (var conn = new SqlConnection(connectionString))
using (var cmd = conn.CreateCommand())
{
conn.Open();
SetupCmd(cmd);
SaveStat(cmd, statToSave);
}
});
}
// 2. For background writing of large volumes of stats. Uses the default partitioner in parallel foreach
public void SaveStatsParallel(string connectionString, IEnumerable<StatsData> statsToSave)
{
Parallel.ForEach(
statsToSave,
// localInit. Return value is passed to each body invocation
() =>
{
var conn = new SqlConnection(connectionString);
var cmd = conn.CreateCommand();
SetupCmd(cmd);
conn.Open();
return new
{
Conn = conn,
Cmd = cmd
};
},
// Body, 0..N per Task decided by TPL
(stat, loopState, initData) =>
{
SaveStat(initData.Cmd, stat);
return initData;
},
// Disposables
(initData) =>
{
initData.Cmd.Dispose();
initData.Conn.Dispose();
}
);
}
An example of usage:
const string connString = @"Server=.\SqlExpress;DataBase=StatsDb;Integrated Security=true";
// Create some dummy data
var statsToSave =
Enumerable
.Range(0, 10000)
.Select(i => new StatsData(i, i*Math.PI, string.Format("Stat #{0}", i)));
// Insert this in parallel on background tasks / threads as determined by the TPL
new Dispatcher().SaveStatsParallel(connString, statsToSave);