
I have a database table that acts as a task queue. Rows are inserted into this table elsewhere (e.g. by table triggers).

I have a multi-threaded Windows Service that does the following:

  1. Using a stored procedure, reads the rows marked 'todo' from the queue table and marks those rows as 'doing'.
  2. Processes those rows.
  3. Marks the rows as 'done'.

Expected: while the service is running, each processor/thread gets its own unique batch of myqueue rows.

Actual: sometimes multiple processors/threads retrieve the same myqueue rows, so a myqueue row can be processed more than once.

Question: Is there a concurrency problem I'm missing here?

T-SQL Code:

CREATE TABLE myqueue (
    id varchar(20) NOT NULL,
    task_data varchar(255) NULL,
    status varchar(10) NULL,
PRIMARY KEY CLUSTERED ( id ASC )
);
GO

CREATE PROCEDURE dbo.getNextInQueue @itemsToGet INT = 1000
AS
BEGIN
    IF OBJECT_ID('tempdb..#tmpnext') IS NULL
    BEGIN
        CREATE TABLE #tmpnext (
            id VARCHAR(20)
            ,PRIMARY KEY CLUSTERED (id)
            )
    END

    INSERT INTO #tmpnext (id)
    SELECT TOP (@itemsToGet) id
    FROM myqueue WITH (
            UPDLOCK
            ,READPAST
            )
    WHERE status = 'todo'

    UPDATE m
    SET m.status = 'doing'
    FROM (
        SELECT id
        FROM #tmpnext
        ) AS tmp
    INNER JOIN myqueue m ON m.id = tmp.id

    SELECT m.*
    FROM myqueue m WITH (NOLOCK)
    JOIN #tmpnext t ON t.id = m.id
END

C# Code:

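using System;
using System.Collections.Generic;
using System.Configuration;   // ConfigurationManager
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;

public class QueueProcessorService : System.ServiceProcess.ServiceBase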
{
    private System.Threading.Timer IntervalTimer;
    private ServiceParams serviceParams; // A POCO that reads arguments, returns defaults if not provided.
    private volatile bool isStopped = false; // written by OnStop, read by timer threads
    
    private readonly object _mutex = new object();
    private int concurrentRunningProcessors = 0;
    
    private string ConnectionString => ConfigurationManager.AppSettings["ConnectionString"];
    
    protected override void OnStart(string[] args)
    {
        serviceParams = new ServiceParams(args); 
        
        IntervalTimer = new System.Threading.Timer(
                    callback: new System.Threading.TimerCallback(IntervalTimer_ElapsedAsync),
                    state: null,
                    dueTime: serviceParams.PeriodInMillisec,
                    period: serviceParams.PeriodInMillisec);
    }
    
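    protected override void OnStop()
    {
        isStopped = true;
        IntervalTimer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);

        // Wait until running processors finish. Volatile.Read stops the JIT from caching
        // the counter in a register, and Sleep avoids spinning a CPU core at 100%.
        while (System.Threading.Volatile.Read(ref concurrentRunningProcessors) > 0)
        {
            System.Threading.Thread.Sleep(100);
        }

        IntervalTimer.Dispose();
        IntervalTimer = null;
    }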
    
    private async void IntervalTimer_ElapsedAsync(object state)
    {
        try
        {
            // Check the limit and increment under the same lock, so two timer ticks cannot
            // both pass the check before either increments. '<' (not '<=') keeps the number
            // of running processors at or below MaxConcurrentProcessors.
            bool slotAcquired = false;
            lock (_mutex)
            {
                if (!isStopped && concurrentRunningProcessors < serviceParams.MaxConcurrentProcessors)
                {
                    concurrentRunningProcessors++;
                    slotAcquired = true;
                }
            }

            if (slotAcquired)
            {
                try
                {
                    await ProcessTasksInQueue();
                }
                finally
                {
                    lock (_mutex)
                    {
                        concurrentRunningProcessors--;
                    }
                }
            }
        }
        catch (Exception ex)
        {
            DealWithException(ex); // e.g. Logs, etc.
            this.Stop();
        }
    }
    
    private async Task ProcessTasksInQueue()
    {
        using (var conn = new SqlConnection(ConnectionString))
        using (var cmd = conn.CreateCommand())
        {
            cmd.CommandText = "dbo.getNextInQueue";
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.Parameters.Add(new SqlParameter("@itemsToGet", SqlDbType.Int) { Value = serviceParams.ItemsToGet });

            await conn.OpenAsync(); // the connection must be open before the command executes

            var rowsRetrieved = new List<RowInTaskQueue>();

            using (var reader = await cmd.ExecuteReaderAsync())
            {
                int idOrdinal = reader.GetOrdinal("id");
                int dataOrdinal = reader.GetOrdinal("task_data");
                int statusOrdinal = reader.GetOrdinal("status");

                while (await reader.ReadAsync())
                {
                    rowsRetrieved.Add(new RowInTaskQueue()
                    {
                        Id = reader.GetString(idOrdinal),
                        // task_data and status are nullable; GetString throws on NULL
                        Data = reader.IsDBNull(dataOrdinal) ? null : reader.GetString(dataOrdinal),
                        Status = reader.IsDBNull(statusOrdinal) ? null : reader.GetString(statusOrdinal),
                    });
                }
            }

            ProcessTheRows(rowsRetrieved);
        }
    }
}
Question edited by Dale K; asked by janonimus.
  • There are many ways to achieve this, though the simplest approach would be DB concurrency with [TimeStamp] (aka rowversion): even if multiple threads read the same task, marking the task as in progress should be enough to kick the other threads out (for lack of better words). – TheGeneral Aug 04 '20 at 02:11
  • @TheGeneral But how is that different from what I'm doing with the `status` field now? – janonimus Aug 04 '20 at 02:16
  • https://www.red-gate.com/simple-talk/sql/t-sql-programming/row-versioning-concurrency-in-sql-server/ – TheGeneral Aug 04 '20 at 02:31
  • You might want to change the SP to do an `update` with an [`output` clause](https://learn.microsoft.com/en-us/sql/t-sql/queries/output-clause-transact-sql?view=sql-server-ver15). That will let you mark rows as `'doing'` and copy the data in a single operation so that you don't need any additional lock hints or transaction. – HABO Aug 04 '20 at 03:05
  • Rather than `INSERT` then `UPDATE`, do an `UPDATE` with an OUTPUT clause (to see which records were marked as `doing`). This will likely remove the need for `tmpnext` as well. The problem is your `UPDLOCK` scope is for the INSERT alone, not the whole stored proc. So two callers can take the first entry off the top of the queue. Using `UPDATE` with an OUTPUT clause will get rid of that possibility. The final SQL will be so simple you may as well not use a stored proc either. – mjwills Aug 04 '20 at 03:08
  • @HABO @mjwills Thanks guys. Will give the `OUTPUT` clause a go. – janonimus Aug 04 '20 at 03:29
  • `OUTPUT` is probably the cleanest solution; however, FYI, `UPDLOCK` lasts until the end of the current transaction, so if you wrap your `SELECT` and `UPDATE` inside a transaction, the `UPDLOCK` will be held for both. – Dale K Aug 04 '20 at 03:51
  • @DaleK You could, yes, but that will likely be slower than the `OUTPUT` solution. – mjwills Aug 04 '20 at 04:23
  • @mjwills agreed - I just wanted to clarify the use of the lock hint. – Dale K Aug 04 '20 at 04:25
  • Do I still need any table hints in the `UPDATE` with the `OUTPUT` clause? – janonimus Aug 04 '20 at 05:54
  • @janonimus No need. – mjwills Aug 04 '20 at 06:45
  • The `UPDATE` with the `OUTPUT` clause worked, but it caused deadlocks with other parts of my code. So I reviewed the queries and the table: I specified deadlock priorities, added an index on the `myqueue.status` field, and added deadlock handling like https://stackoverflow.com/a/335669/6080047. Looks good so far. – janonimus Aug 06 '20 at 03:48
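TheGeneral's point, and the answer to the follow-up about the `status` field, is that a claim is only safe when it is a conditional write whose outcome is checked; reading 'todo' rows and then updating them unconditionally leaves a window for another worker. A minimal sketch, assuming the question's `myqueue` schema; `QueueClaims` and `TryClaimAsync` are hypothetical names. A `rowversion` column (not in the question's table) would be compared in the `WHERE` clause the same way.

```csharp
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;

// Hypothetical helper illustrating a compare-and-set claim on the question's dbo.myqueue.
public static class QueueClaims
{
    // The UPDATE only changes the row while it is still 'todo'. The predicate is
    // re-checked under the row's lock, so for a given id only one concurrent
    // caller can see 1 row affected; the others see 0.
    public const string ClaimSql =
        "UPDATE dbo.myqueue SET status = 'doing' WHERE id = @id AND status = 'todo';";

    // Returns true if this caller won the claim, false if another worker already took the row.
    public static async Task<bool> TryClaimAsync(SqlConnection openConnection, string id)
    {
        using (var cmd = openConnection.CreateCommand())
        {
            cmd.CommandText = ClaimSql;
            cmd.Parameters.Add(new SqlParameter("@id", SqlDbType.VarChar, 20) { Value = id });
            int rowsAffected = await cmd.ExecuteNonQueryAsync();
            return rowsAffected == 1;
        }
    }
}
```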
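The fix HABO and mjwills describe can be sketched as a single `UPDATE TOP ... OUTPUT` statement sent directly from C#, as mjwills suggests, in place of the stored procedure. This is an illustration, not the poster's final code: `QueueItem`, `QueueBatch` and `ClaimBatchAsync` are hypothetical names, and the `READPAST` hint is an assumption added so a worker skips rows another worker is currently updating instead of waiting for them (per mjwills, no hint is needed for correctness).

```csharp
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;

// Hypothetical DTO standing in for the question's RowInTaskQueue.
public class QueueItem
{
    public string Id { get; set; }
    public string Data { get; set; }
    public string Status { get; set; }
}

public static class QueueBatch
{
    // One statement both picks and marks the rows, so no other worker can claim a row
    // between the read and the 'doing' update. TOP without ORDER BY takes an arbitrary
    // set of matching rows; READPAST (optional) skips rows locked by other workers.
    public const string ClaimBatchSql = @"
UPDATE TOP (@itemsToGet) dbo.myqueue WITH (READPAST)
SET status = 'doing'
OUTPUT inserted.id, inserted.task_data, inserted.status
WHERE status = 'todo';";

    public static async Task<List<QueueItem>> ClaimBatchAsync(string connectionString, int itemsToGet)
    {
        var items = new List<QueueItem>();
        using (var conn = new SqlConnection(connectionString))
        using (var cmd = conn.CreateCommand())
        {
            cmd.CommandText = ClaimBatchSql;
            cmd.Parameters.Add(new SqlParameter("@itemsToGet", SqlDbType.Int) { Value = itemsToGet });
            await conn.OpenAsync();

            using (var reader = await cmd.ExecuteReaderAsync())
            {
                while (await reader.ReadAsync())
                {
                    items.Add(new QueueItem
                    {
                        Id = reader.GetString(0),
                        Data = reader.IsDBNull(1) ? null : reader.GetString(1), // task_data is nullable
                        Status = reader.GetString(2),                            // always 'doing' here
                    });
                }
            }
        }
        return items;
    }
}
```

If `myqueue` itself has enabled triggers, SQL Server rejects an `OUTPUT` clause without `INTO`; in that case, output into a table variable and select from it.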
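Dale K's alternative keeps the original stored procedure and makes its `UPDLOCK`s last for the whole call by running it inside one explicit transaction, so the locks taken by the `INSERT ... SELECT` are still held when the `UPDATE` runs. A minimal sketch, assuming `dbo.getNextInQueue` exactly as posted; `QueueTransactional` and `GetNextIdsInTransaction` are hypothetical names. As mjwills notes, this is likely slower than the `OUTPUT` approach.

```csharp
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;

public static class QueueTransactional
{
    // Runs dbo.getNextInQueue inside a single local transaction. The UPDLOCKs taken by
    // its INSERT ... SELECT are held until Commit(), i.e. across the UPDATE as well.
    public static List<string> GetNextIdsInTransaction(string connectionString, int itemsToGet)
    {
        var ids = new List<string>();
        using (var conn = new SqlConnection(connectionString))
        {
            conn.Open();
            using (SqlTransaction tx = conn.BeginTransaction())
            using (var cmd = conn.CreateCommand())
            {
                // Commands on a connection with a pending local transaction must enlist in it.
                cmd.Transaction = tx;
                cmd.CommandText = "dbo.getNextInQueue";
                cmd.CommandType = CommandType.StoredProcedure;
                cmd.Parameters.Add(new SqlParameter("@itemsToGet", SqlDbType.Int) { Value = itemsToGet });

                using (var reader = cmd.ExecuteReader())
                {
                    int idOrdinal = reader.GetOrdinal("id");
                    while (reader.Read())
                    {
                        ids.Add(reader.GetString(idOrdinal));
                    }
                } // the reader must be closed before committing

                tx.Commit(); // releases the locks; disposing without Commit rolls back
            }
        }
        return ids;
    }
}
```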
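The deadlock handling mentioned in the last comment usually means re-running the unit of work when SQL Server chooses it as the deadlock victim (error 1205); the victim's transaction has already been rolled back, so retrying the whole operation is safe. A minimal sketch; `DeadlockRetry`, `RunAsync`, the default of 3 attempts and the linear back-off are assumptions, not the code from the linked answer.

```csharp
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;

public static class DeadlockRetry
{
    // SQL Server error number raised for the transaction chosen as deadlock victim.
    public const int DeadlockErrorNumber = 1205;

    public static bool IsDeadlock(Exception ex)
    {
        var sqlEx = ex as SqlException;
        return sqlEx != null && sqlEx.Number == DeadlockErrorNumber;
    }

    // Re-runs the operation while isRetryable(ex) is true (by default: a deadlock) and
    // attempts remain, waiting 100 ms, 200 ms, ... between attempts. On the last attempt,
    // or for a non-retryable error, the exception propagates to the caller.
    public static async Task<T> RunAsync<T>(
        Func<Task<T>> operation,
        int maxAttempts = 3,
        Func<Exception, bool> isRetryable = null)
    {
        if (isRetryable == null) isRetryable = IsDeadlock;

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception ex) when (attempt < maxAttempts && isRetryable(ex))
            {
                await Task.Delay(100 * attempt);
            }
        }
    }
}
```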
