
I built a service to support multiple queue subscriptions in Azure Service Bus, but I'm getting some odd behavior.

My subscription singleton class has a method that looks like this:

    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }

            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };


            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                }
                , messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }

The idea is, you subscribe to Azure Service Bus for a specific type of message, and that directly corresponds to a queue. In your subscription, you pass in a delegate for how to process the message.

This seems to work... with one caveat.

Regardless of what I set `ttl` to (and therefore `MaxAutoRenewDuration` and `OperationTimeout`), when processing a message takes a long time, the message is unlocked after one minute and another subscriber picks it up and starts processing it.

My understanding is that this is exactly what `MaxAutoRenewDuration` is supposed to prevent, but it doesn't seem to prevent anything.

Can anyone tell me what I need to do differently to make sure the consumer owns the message through to completion?

Jeremy Holovacs
  • What does your subscription description look like (`LockDuration` specifically)? Did you try it with another entity? Is it a premium or standard namespace? And which version of the client are you using? – Sean Feldman Jan 29 '19 at 23:23
  • With `LockDuration` set to something like 60 seconds and `MaxAutoRenewDuration` set to 2 minutes, I was able to process messages for 60+ seconds and couldn't reproduce the issue you're experiencing. If you'd like to share your project on GitHub, I could have a look. I suspect it's the entity that is causing the issue. – Sean Feldman Jan 29 '19 at 23:35
  • @SeanFeldman, I can't find any property for `LockDuration`... are you referring to the Azure queue itself? That's set by default to 60 seconds. – Jeremy Holovacs Jan 30 '19 at 12:09
  • I'm not clear on what you mean by the "entity" causing the issue. There are no entities in this, just a simple POCO message class. – Jeremy Holovacs Jan 30 '19 at 12:12
  • An Azure Service Bus entity is a queue/topic/subscription. I was not able to reproduce this with a `LockDuration` identical to what you have, so I suspect the issue is with the subscription. Have you tried to delete and recreate it? – Sean Feldman Jan 30 '19 at 12:16
  • @SeanFeldman I have not, but I will. I will let you know if that's effective at all. – Jeremy Holovacs Jan 30 '19 at 12:18
  • Hey @JeremyHolovacs, are you sure that this is the role of `MaxAutoRenewDuration`, that it is supposed to renew the lock so other processes won't start processing the message? From the docs: "The maximum duration during which locks are automatically renewed". What does that mean? – Shahar Shokrani Feb 24 '20 at 13:24
  • Not clear... all I know is that it's flaky, to the point of being unreliable. We're actually moving our long-running processes to Azure Storage Queues now to avoid this issue. – Jeremy Holovacs Feb 24 '20 at 23:07
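
For readers who, like the asker, cannot find where `LockDuration` lives: it is a property of the queue or subscription entity rather than of the client. A minimal sketch of reading it, assuming a version of the `Microsoft.Azure.ServiceBus` package that includes `ManagementClient` and a connection string that is allowed to perform management operations; the class name, method name, and parameters are hypothetical:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

public static class LockDurationCheck
{
    // Hypothetical helper: prints the LockDuration configured on a topic subscription.
    public static async Task PrintLockDurationAsync(
        string connectionString, string topicPath, string subscriptionName)
    {
        var management = new ManagementClient(connectionString);
        try
        {
            // Fetches the entity description, which carries the lock settings.
            SubscriptionDescription description =
                await management.GetSubscriptionAsync(topicPath, subscriptionName);

            Console.WriteLine($"LockDuration: {description.LockDuration}");
        }
        finally
        {
            await management.CloseAsync();
        }
    }
}
```

For a queue, `GetQueueAsync(queuePath)` returns a `QueueDescription` with the same `LockDuration` property.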

2 Answers

0

There are a few options I can think of that you might want to look at.

  1. Instead of using the default `ReceiveMode = PeekLock` in the `SubscriptionClient`, set it to `ReceiveAndDelete`, so that once a message is consumed it is removed from the queue and won't be consumed by any other client. This does mean that you have to handle exceptions gracefully and perform retries yourself.

  2. Have a look at `OperationTimeout`, which according to the documentation is the duration after which individual operations will time out.

Kyle Huang
  • I would NOT recommend using `ReceiveAndDelete` in this scenario. That's not a good recommendation, as the auto-renewal feature works, and for reliable processing `PeekLock` should be used. – Sean Feldman Jan 29 '19 at 23:36
  • @SeanFeldman I agree. Those were just some potential solutions I quickly thought about so just put them out there. There are scenarios where they fit. – Kyle Huang Jan 30 '19 at 00:40
  • Yeah we would not be able to use `ReceiveAndDelete`. I agree this would circumvent the problem, but only by giving me another problem. – Jeremy Holovacs Jan 30 '19 at 12:10
0

It turns out the remote process that the consumer was running was failing silently and not returning a failure status code (or anything else); the auto-refresh mechanism hung waiting for the result, so the message ended up timing out.

I'm not clear on how to prevent that, but once I fixed the issue on the remote process, the problem was no longer reproducible.

Moral of the story: if everything looks right and it's still timing out, it seems the auto-refresh mechanism shares some resources with the asynchronous operations you are waiting on. That may be another place to look for failures.
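
One way to keep a silently hanging remote call from holding a message indefinitely is to bound the delegate with a timeout, so the hang surfaces as an exception that the handler's existing `catch` block logs before abandoning the message. A minimal sketch that uses only the .NET task APIs; `HandlerTimeout` and `RunWithTimeout` are hypothetical names, and this does not address whatever caused the lock renewal to stall:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class HandlerTimeout
{
    // Hypothetical helper: runs the delegate but stops waiting after `limit`.
    public static async Task RunWithTimeout<TMessage>(
        Func<TMessage, Task> execution, TMessage message, TimeSpan limit)
    {
        using (var cts = new CancellationTokenSource())
        {
            Task work = execution(message);
            Task timer = Task.Delay(limit, cts.Token);

            // Whichever task finishes first decides the outcome.
            Task finished = await Task.WhenAny(work, timer);
            if (finished != work)
            {
                // The delegate is NOT cancelled here; it keeps running in the background.
                throw new TimeoutException($"Handler did not finish within {limit}.");
            }

            cts.Cancel(); // Stop the timer so it does not linger.
            await work;   // Rethrow any exception the delegate produced.
        }
    }
}
```

In the handler from the question, this would replace `await execution.Invoke(message);` with something like `await HandlerTimeout.RunWithTimeout(execution, message, limit);`. Choosing a `limit` somewhat shorter than `messageLifespan` is probably sensible, so the lock is still held when `AbandonAsync` runs.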

Jeremy Holovacs