I have the following table which contains units of work:
create table [dbo].[Queue]
(
[QueueId] bigint not null identity(1,1),
[UserId] bigint default null, -- irrelevant foreign key here
primary key clustered ([QueueId] asc)
);
go
A pack of workers keeps circling this table and grabs one or more units of work by changing the UserId
field from null
to a positive value. No two workers should be able to update the same QueueId
at the same time and they should not wait (readpast should help with that).
The following method makes it easy to add work to the table:
/**
* Push some work units.
* (rewritten from basic while insert to @Larnu's Tally suggestion)
*/
create procedure [dbo].[spPushWork]
@Count int
as
begin
if @Count < 1 or @Count > 1000000 throw 50001, N'@Count must be 1-1M.', 1;
with [num] as
(
select [num] from (values (null),(null),(null),(null),(null),(null),(null),(null),(null),(null)) [num]([num])
), [tally] as
(
select top (@Count)
row_number() over (order by (select null)) as [ind]
from [num] [num1]
cross join [num] [num2]
cross join [num] [num3]
cross join [num] [num4]
cross join [num] [num5]
cross join [num] [num6]
)
merge into [dbo].[queue]
using (select [ind] from [tally]) [t]
on 1 = 0
when not matched then insert default values;
end
go
And now we have 2 methods to grab work.
Method #1 is thread safe (I hope) as it's a select-update combo:
/**
* This grabs work units in a single operation (Select + Update).
*/
create procedure [dbo].[spGrabSafe]
@UserId bigint
,@Count int = 1
as
begin
if @UserId < 1 throw 50001, N'@UserId must be 1+.', 1;
if @Count < 1 throw 50001, N'@Count must be 1+.', 2;
declare @Ids table ([Id] bigint not null);
-- fetch and claim via single query
with [cte] as
(
select top(@Count) [QueueId]
from [dbo].[Queue] with (readpast) -- skip locked
where [UserId] is null
order by [QueueId] asc
)
update [q]
set [UserId] = @UserId
output [inserted].[QueueId] into @Ids
from [dbo].[Queue] [q]
join [cte] on [cte].[QueueId] = [q].[QueueId];
select [Id] from @Ids;
end;
go
Method #2 does the same thing as method one in 2 operations by first locking the rows and then claiming them by changing the UserId
. It also has a delay argument that allows us to make it run longer for testing:
/**
* This grabs work units in multiple operations (Select&lock + Update).
*/
create procedure [dbo].[spGrabUnsafe]
@UserId bigint
,@Count int = 1
,@Delay time = null
as
begin
if @UserId < 1 throw 50001, N'@UserId must be 1+.', 1;
if @Count < 1 throw 50001, N'@Count must be 1+.', 1;
declare @Ids table ([Id] bigint not null);
begin transaction
-- fetch the QueueId's
insert into @Ids
select top(@Count) [QueueId]
from [dbo].[Queue]
with (xlock, rowlock, readpast) -- lock rows + skip locked
where [UserId] is null
order by [QueueId] asc;
-- claim via UserId
update [q]
set [UserId] = @UserId
from [dbo].[Queue] [q]
join @Ids [ids] on [ids].[Id] = [q].[QueueId];
-- this allows to wait a bit to test concurrency
if @Delay is not null
begin
declare @Time varchar(12) = convert(varchar, @Delay, 114);
waitfor delay @Time;
end;
commit transaction;
select [Id] from @Ids;
end
go
Is method #2 safe in a concurrent environment? There is a gap between selecting and updating UserId
. But the rows should be locked...