I need to create a high-throughput Service Bus queue client that can process thousands of messages per second. The only way I've managed to achieve this is by combining `ReceiveMode.ReceiveAndDelete` with `PrefetchCount`.

var client = new QueueClient(connectionString, queueName, ReceiveMode.ReceiveAndDelete);
client.PrefetchCount = 1000;
client.RegisterMessageHandler(ProcessMessagesAsync, new MessageHandlerOptions());
await Task.Delay(10000);   // wait for 10s while some messages are processed
await client.CloseAsync();

Reliability is not critical, so I can afford to lose messages if my application crashes. However, I would like to avoid losing messages during graceful shutdowns, such as the `CloseAsync` call above. In particular, I want to avoid losing the hundreds of prefetched messages sitting in the local cache. Is there a way to make the client stop receiving new messages but wait until the prefetched ones have been processed?

Þórir Sachie
  • I'm not manually completing them. The `ReceiveAndDelete` mode means they're deleted from the queue as soon as they're prefetched. – Þórir Sachie Jan 24 '19 at 15:58
  • Use a task completion source that can be set within the handler and awaited after registration. It will replace the task delay in the current code shown – Nkosi Jan 24 '19 at 16:01
  • @Nkosi, can you please elaborate? How would I be able to tell, within the handler, that the prefetched messages have all been processed? There will be much more than 1,000 messages on the Azure queue. (The 10s delay is just an example; I would need to stop processing when the application is being shut down.) – Þórir Sachie Jan 24 '19 at 16:05
  • Take a quick look at this article http://www.michalbialecki.com/2018/02/28/receiving-messages-azure-service-bus-net-core/. Note however that they use inline delegate – Nkosi Jan 24 '19 at 16:06
  • They're using the default `PeekLock` mode and no prefetch in that example, which would make it very slow (low 10s of messages a second). – Þórir Sachie Jan 24 '19 at 16:09
  • @ÞórirSachie BTW, with `PeekLock` mode and `MaxConcurrentCalls`, but without `prefetch` and `batch` configurations, I was able to receive up to ~200 messages a second. Using several QueueClients with separate queues, plus a prefetch configuration, will improve performance further. – Serhii Kyslyi Dec 08 '19 at 08:14

1 Answer

I didn't manage to find a suitable workaround for this; it appears to be an inherent limitation in the design of QueueClient. However, I found that MessageReceiver offers the same functionality, plus batch completion of messages, which allows for reliable high-throughput consumption of messages (around 500 msg/s).

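// Requires: using System; using System.Collections.Generic; using System.Linq;
//           using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core;
var messageReceiver = new MessageReceiver(
    connectionString,
    entityPath,
    ReceiveMode.PeekLock,
    prefetchCount: 1000);

var processedLockTokens = new List<string>();

// Check the flag here as well, so an empty queue cannot keep the loop
// spinning after shutdown has been requested.
while (!isCancellationRequested)
{
    // Returns null if no messages arrive within the timeout.
    var messages = await messageReceiver.ReceiveAsync(maxMessageCount: 1000, operationTimeout: TimeSpan.FromSeconds(5));
    if (messages == null)
        continue;

    try
    {
        foreach (var message in messages)
        {
            // Process message.

            processedLockTokens.Add(message.SystemProperties.LockToken);

            // Stop mid-batch on shutdown. Messages not yet processed are never
            // completed, so they stay on the queue and become available again
            // once their locks expire.
            if (isCancellationRequested)
                break;
        }
    }
    finally
    {
        // Complete everything processed so far in a single batch call.
        if (processedLockTokens.Any())
        {
            await messageReceiver.CompleteAsync(processedLockTokens);

            processedLockTokens.Clear();
        }
    }
}

await messageReceiver.CloseAsync();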
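
The loop above reads an `isCancellationRequested` flag without showing where it comes from. Here is a minimal sketch of one way to drive it, assuming a console host where the shutdown request arrives as Ctrl+C. `RunReceiveLoopAsync` is a hypothetical stand-in for the receive loop, and the `Task.Delay` only simulates receiving and processing a batch; neither is part of the Service Bus API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Console.CancelKeyPress += (sender, e) =>
            {
                // Keep the process alive so the loop can finish and
                // complete the batch it is currently working on.
                e.Cancel = true;
                cts.Cancel();
            };

            await RunReceiveLoopAsync(cts.Token);
        }
    }

    // Hypothetical stand-in for the MessageReceiver loop; the token's
    // IsCancellationRequested plays the role of isCancellationRequested.
    static async Task RunReceiveLoopAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // Placeholder for: receive a batch, process it, complete the lock tokens.
            await Task.Delay(100);
        }
    }
}
```

The token is deliberately not passed to `Task.Delay`, so the in-flight iteration finishes normally instead of being interrupted by an `OperationCanceledException`.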
Þórir Sachie