Ok. I completely rewrote my question.
Database table
CREATE TABLE [dbo].[IntegrationCommandLog](
[Id] [uniqueidentifier] NOT NULL,
[Content] [nvarchar](max) NULL,
[OrderingKey] [bigint] IDENTITY(1,1) NOT NULL,
[RowVersion] [timestamp] NULL,
[Topic] [nvarchar](13) NULL,
CONSTRAINT [PK_IntegrationCommandLog] PRIMARY KEY NONCLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
CREATE CLUSTERED INDEX [IX_IntegrationCommandLog_OrderingKey] ON [dbo].[IntegrationCommandLog]
(
[OrderingKey] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
GO
Reproducer. Run release without attached debugger is required
Required packages:
Install-Package Dapper
Install-Package System.Data.SqlClient
Code:
using Dapper;
using System;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
namespace TestApp
{
class Program
{
private const string Sql = "SELECT * FROM dbo.IntegrationCommandLog WHERE OrderingKey > @OrderingKey ORDER BY OrderingKey";
private const string cs = "Data Source=.;Initial Catalog=Test;Integrated Security=True";
static void Main(string[] args)
{
Task.Run(() => Query());
var tasks = new Task[200];
for (int i = 0; i < tasks.Length; ++i)
tasks[i] = Task.Run(() => Insert());
while (true)
{
int j = Task.WaitAny(tasks);
tasks[j] = Task.Run(() => Insert());
}
}
private async static Task Query()
{
long last = -1;
var connection = new SqlConnection(cs);
await connection.OpenAsync();
while (true)
{
var entries = await connection.QueryAsync<IntegrationLogEntry>(Sql, new { OrderingKey = last });
Console.WriteLine(entries.Count());
if (entries.Any())
{
last = entries.Aggregate((e1, e2) =>
{
if (e1.OrderingKey + 1 != e2.OrderingKey)
Console.WriteLine($"Sequence violation {e1.OrderingKey} {e2.OrderingKey}");
return e2;
}).OrderingKey;
}
await Task.Delay(1000);
}
}
private static async Task Insert()
{
string sql = @"SET NOCOUNT ON;
INSERT INTO [dbo].[IntegrationCommandLog] ([Id], [Content], [Topic])
VALUES ( @Id, @Content, @Topic);
SELECT [OrderingKey], [RowVersion]
FROM [dbo].[IntegrationCommandLog]
WHERE @@ROWCOUNT = 1 AND [Id] = @Id";
var content = new string('a', 1000);
using (var connection = new SqlConnection(cs))
{
await connection.OpenAsync();
await connection.ExecuteAsync(sql, new { Id = Guid.NewGuid(), Content = content, Topic = "SomeTopic" });
}
}
}
public class IntegrationLogEntry
{
public Guid Id { get; private set; }
public string Content { get; private set; }
public string Topic { get; private set; }
public long OrderingKey { get; private set; }
public byte[] RowVersion { get; set; }
}
}
Make sure there are no gaps
SELECT top 100 * FROM (SELECT *, rowid = ROW_NUMBER() OVER (ORDER BY OrderingKey) FROM [dbo].[IntegrationCommandLog]) l1
JOIN (SELECT *, rowid = ROW_NUMBER() OVER (ORDER BY OrderingKey) FROM [dbo].[IntegrationCommandLog]) l2 on l1.rowid + 1 = l2.rowid
WHERE l1.OrderingKey + 1 != l2.OrderingKey
Output
Microsoft SQL Server 2016 (SP1) (KB3182545) - 13.0.4001.0 (X64) Oct 28 2016 18:17:30 Copyright (c) Microsoft Corporation Developer Edition (64-bit) on Windows Server 2016 Datacenter 6.3 (Build 14393: ) (Hypervisor)
Questions:
- Why the query does not return all entries?
- How to get correct list of entries?