
I am using a SQL Server database table as a work queue.

I have one writer and multiple readers.

The writer produces new records for the queue table (INSERT).

Each reader looks for records in a certain status (SELECT) to consume, takes one batch of records, marks each record as owned (UPDATE), processes them, and then marks all processed records as complete or failed (UPDATE). The workflow for the records is simple: The status goes from 'A' for Added, to 'B' for Being Processed, to either 'C' for Complete or 'F' for Failed. And the owner for each record goes from unset while in status 'A' to a unique-identifier identifying the reader for the remaining steps. The unique identifier (a Decimal) plus the status (nchar(1)) plus an additional batch id (int) that identifies the writer's larger batch are the three key fields in the processing. Also, the table has a timestamp field that LINQ uses for its concurrency checks.

I need to prevent locking delays and update failures due to two readers choosing the same records to process. As part of the processing work, a web service call is made that can take an indeterminate amount of time to complete. We pay the web service vendor by the transaction, so I do not want to make the call, then find out another process handled the same record. Such errors will cost us thousands of dollars.

I read this article:

Processing Data Queues in SQL Server with READPAST and UPDLOCK

That looked promising, but I decided to use LINQ to SQL. I read this article:

Linq to SQL with UPDLOCK

The latter says that there is no way to tell LINQ to use UPDLOCK short of crudely wrapping a SQL procedure. This is because LINQ uses optimistic concurrency and those features assume pessimistic locking.

So how do I solve this problem using features that LINQ does support? My application is multi-threaded, but I could have multiple instances running against the same queue table, so I cannot just have a single dispatcher hand off the records to each consumer thread.

I am using .NET 4.0. My application is a plain old Windows console application. The solution should work with SQL Server 2005 and SQL Server 2008 R2. I do not want to have to use a messaging system.

UPDATE: Further research on SO. Found this article:

LINQ to SQL - Queue

The answer recommends having a dispatcher thread, which I ruled out.

UPDATE 2: Found another article on SO:

Atomically Mark and return a group of rows in database

This looks really promising. I would need to execute direct SQL, but once I have my records marked and get my IDs, the rest can happen in L2S.
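A rough sketch of that idea, assuming a hypothetical table `dbo.WorkQueue` with columns `QueueId`, `Status` and `Owner` (none of these names come from my real schema). It relies on `DataContext.ExecuteQuery<T>`, which turns the `{0}`-style placeholders into SQL parameters. The locking hints are the ones discussed in the READPAST/UPDLOCK article:

```csharp
using System.Collections.Generic;
using System.Data.Linq;
using System.Linq;

static class QueueReservation
{
    // Hypothetical schema (an assumption for illustration only):
    //   dbo.WorkQueue(QueueId int, Status nchar(1), Owner decimal(18,0) NULL)
    // Marks up to batchSize rows in status 'A' as 'B' for this reader and
    // returns their ids in the same statement, so no second SELECT is needed.
    public static List<int> ReserveIds(DataContext db, int batchSize, decimal readerToken)
    {
        const string sql =
            "UPDATE TOP ({0}) dbo.WorkQueue WITH (ROWLOCK, READPAST, UPDLOCK) " +
            "SET Status = 'B', Owner = {1} " +
            "OUTPUT INSERTED.QueueId " +
            "WHERE Status = 'A'";

        // ToList() forces execution; the query is not run until enumerated.
        return db.ExecuteQuery<int>(sql, batchSize, readerToken).ToList();
    }
}
```

With the ids in hand, the rows can be loaded and later marked 'C' or 'F' through ordinary LINQ to SQL queries, since no other reader should touch rows that are already in status 'B'.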

Paul Chernoch
  • Transactions are relevant here. As a side note, is there any reason you're not using Entity Framework since LINQ2SQL is pretty much dead? http://stackoverflow.com/questions/5804186/concurrency-in-ef4-how-to-conditionally-create-an-entity – Hardrada Nov 09 '12 at 13:06
  • @Hardrada My first reason for not using EF is ignorance, the second is the work deadline. What is the learning curve for EF like? – Paul Chernoch Nov 09 '12 at 13:31
  • The learning curve for EF is terrible :) In your case I would probably use a Stored Procedure and hook it up to LINQ to SQL instead of using transactions in code. – Stilgar Nov 09 '12 at 13:44
  • @Stilgar do I detect some subtle sarcasm regarding the learning curve? lol. – Hardrada Nov 09 '12 at 14:23
  • Your requirements dictate that the locking (whatever the details) is done *in the database*. I'm not sure what your objection to using a stored procedure is. – AakashM Nov 09 '12 at 15:04

2 Answers


Perhaps you could take a more pessimistic approach to your dequeueing process within your readers.

Have a reader look up the next queued item, then attempt to dequeue it. In the dequeue call, the reader assigns a unique value to the item, such as a GUID.

The reader then re-reads the item it tried to dequeue and compares the stored GUID with the one it assigned. If they match, the dequeue succeeded and the reader can process the item. If they differ, another reader got there first and this reader should move on to the next item.

Assuming it's possible for your reader to dequeue one item at a time, this should be feasible. I'm not sure how that might work when dequeuing multiple items per reader.
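As a minimal sketch of the claim step, assuming a hypothetical table `dbo.WorkQueue` with columns `QueueId`, `Status` and `ClaimToken` (illustrative names, not from the question). This is a variant of the idea above: instead of re-reading the row and comparing GUIDs, it makes the UPDATE conditional on the row still being in status 'A' and checks the affected row count returned by `DataContext.ExecuteCommand`:

```csharp
using System;
using System.Data.Linq;

static class QueueClaim
{
    // Hypothetical schema (an assumption for illustration only):
    //   dbo.WorkQueue(QueueId int, Status nchar(1), ClaimToken uniqueidentifier NULL)
    // Returns true only if this reader moved the row from 'A' to 'B'.
    public static bool TryClaim(DataContext db, int queueId, Guid claimToken)
    {
        // The WHERE clause makes the claim conditional: if another reader
        // already changed the status, no row matches and nothing is updated.
        int rowsAffected = db.ExecuteCommand(
            "UPDATE dbo.WorkQueue SET Status = 'B', ClaimToken = {0} " +
            "WHERE QueueId = {1} AND Status = 'A'",
            claimToken, queueId);

        return rowsAffected == 1;
    }
}
```

A reader would call `TryClaim(db, id, Guid.NewGuid())` for each candidate row and only process the rows for which it returns true. The stored `ClaimToken` is still available afterwards if the re-read-and-compare check is wanted as well.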

Shan Plourde
  • Worth investigating. I am willing to dequeue one at a time and then batch them up for the web service call. I get the best Web performance when I send 50 records at a time. (100 records occasionally exceeds some URL length limit. The service only uses GET, not POST.) – Paul Chernoch Nov 09 '12 at 13:54
  • I'm not sure how failover might be addressed with that approach though. Perhaps timestamps could be used to support failover. – Shan Plourde Nov 09 '12 at 14:28
  • I have a DateTime field on my master batch table. When it times out, I release all records in the queue table that have been reserved for that batch. This allows me to restart a partially complete batch and resume processing. – Paul Chernoch Nov 09 '12 at 17:05

This is my answer. I associated a stored procedure with my DataContext to reserve addresses. It chooses, marks, and returns the records in a single operation, using pessimistic concurrency.

CREATE PROCEDURE dbo.kccsp_ReserveAddresses
      @BATCH_SIZE int,
      @BATCH_ID int,
      @PROCESS_TOKEN numeric(18,0)
  AS
      -- Reserve a group of addresses in the queue so that a consumer may process them,
      -- but all other consumers will leave them alone.
      -- The query hints are essential to prevent lock contention
      -- and concurrency errors from multiple processors handling the same address.
      -- Note: @BATCH_ID is accepted but is not used in the filter below.
      update TOP (@BATCH_SIZE)
          KCC_GeoCodingAddressQueue WITH (ROWLOCK, READPAST, UPDLOCK)
      set
          [Process Status] = 'B',
          [Process Token] = @PROCESS_TOKEN
      -- Return the rows that were just marked, in the same statement.
      output INSERTED.*
      where [Process Status] = 'A' and [Process Token] = 0

I did this by dragging the procedure onto the result table in the designer for my dbml. This causes the resultset to be the same as my entity class, and returns all my assigned records at the same time that it marks them. The SQL hints deal with concurrency and lock contention.

When I need to call it in C#, I do this:

   int? batchSize = 50;
   int? batchIdToReserve = Batch.GeoBatchID;
   decimal? processorId = someId;
   var addresses = Context.kccsp_ReserveAddresses(batchSize, batchIdToReserve, processorId);
Paul Chernoch