I have a database table that acts as a task queue. Inserts into this table happen in other places (e.g. Table Triggers, etc.).
I have a multi-threaded Windows Service that does the following:
- Using a Stored Procedure, reads the queue table for rows marked as
'todo'
, and marks the rows it read as'doing'
. - Processes those rows.
- Marks the rows as
'done'
.
Ideal Situation:
When the service is running, multiple processors/threads will process a unique batch of myqueue
rows.
Actual:
There are times when multiple processors/threads retrieve the same myqueue
rows.
This means a myqueue
row can be processed multiple times.
Question: Is there a concurrency problem I'm missing here?
T-SQL Code:
CREATE TABLE myqueue (
id varchar(20) NOT NULL,
task_data varchar(255) NULL,
status varchar(10) NULL,
PRIMARY KEY CLUSTERED ( id ASC )
CREATE PROCEDURE dbo.getNextInQueue @itemsToGet INT = 1000
AS
BEGIN
IF OBJECT_ID('tempdb..#tmpnext') IS NULL
BEGIN
CREATE TABLE #tmpnext (
id VARCHAR(20)
,PRIMARY KEY CLUSTERED (id)
)
END
INSERT INTO #tmpnext (id)
SELECT TOP (@itemsToGet) id
FROM myqueue WITH (
UPDLOCK
,READPAST
)
WHERE status = 'todo'
UPDATE m
SET m.status = 'doing'
FROM (
SELECT id
FROM #tmpnext
) AS tmp
INNER JOIN myqueue m ON m.id = tmp.id
SELECT m.*
FROM myqueue m WITH (NOLOCK)
JOIN #tmpnext t ON t.id = m.id
END
C# Code:
public class QueueProcessorService : System.ServiceProcess.ServiceBase
{
private System.Threading.Timer IntervalTimer;
private ServiceParams serviceParams; // A POCO that reads arguments, returns defaults if not provided.
private bool isStopped = false;
private readonly object _mutex = new object();
private int concurrentRunningProcessors = 0;
private string ConnectionString => ConfigurationManager.AppSettings["ConnectionString"];
protected override void OnStart(string[] args)
{
serviceParams = new ServiceParams(args);
IntervalTimer = new System.Threading.Timer(
callback: new System.Threading.TimerCallback(IntervalTimer_ElapsedAsync),
state: null,
dueTime: serviceParams.PeriodInMillisec,
period: serviceParams.PeriodInMillisec);
}
protected override void OnStop()
{
isStopped = true;
IntervalTimer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
while (concurrentRunningProcessors > 0)
{
// Wait until running processors finish
};
IntervalTimer.Dispose();
IntervalTimer = null;
}
private async void IntervalTimer_ElapsedAsync(object state)
{
try
{
if (!isStopped && concurrentRunningProcessors <= serviceParams.MaxConcurrentProcessors)
{
lock (_mutex)
{
concurrentRunningProcessors++;
}
try
{
await ProcessTasksInQueue();
}
catch (Exception)
{
throw;
}
finally
{
if (concurrentRunningProcessors > 0)
{
lock (_mutex)
{
concurrentRunningProcessors--;
}
}
}
}
}
catch (Exception ex)
{
DealWithException(ex); // e.g. Logs, etc.
this.Stop();
}
}
private async Task ProcessTasksInQueue()
{
using (var conn = new SqlConnection(ConnectionString))
{
var cmd = conn.CreateCommand();
cmd.CommandText = "dbo.getNextInQueue";
cmd.CommandType = CommandType.StoredProcedure;
cmd.Parameters.Add(new SqlParameter("@itemsToGet", SqlDbType.Int) { Value = serviceParams.ItemsToGet });
var rowsRetrieved = new List<RowInTaskQueue>();
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
var row = new RowInTaskQueue()
{
Id = reader.GetString(reader.GetOrdinal("id")),
Data = reader.GetString(reader.GetOrdinal("task_data")),
Status = reader.GetString(reader.GetOrdinal("status")),
};
rowsRetrieved.Add(row);
}
}
ProcessTheRows(rowsRetrieved);
}
}
}