
I am writing a console application in .NET Core 2.1. My intent is to listen for messages on a topic within Service Bus and push new messages that arrive into Elasticsearch using the NEST API (NEST is probably irrelevant to my question, but I wanted to be transparent).

My topic entity within Service Bus is called "test" and I have a subscription also called "test" (the full path would be "test/subscriptions/test").

In my .NET Core console app, I have the following NuGet references:

<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.1" />
<PackageReference Include="NEST" Version="6.4.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />

I am having a very strange issue when using the .NET Standard Service Bus API, where I regularly get a renew-lock error:

Message handler encountered an exception Microsoft.Azure.ServiceBus.MessageLockLostException

I've stripped my code back to a very reproducible sample here:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Microsoft.Azure.ServiceBus;
using Nest;
using Newtonsoft.Json;

namespace SampleApp
{
    public class Program
    {

    private static SubscriptionClient _subscriptionClient;
    private static IElasticClient _elasticClient;

    private static string ServiceBusConnectionString = "[connectionString]";
    private static string TopicName = "test";
    private static string SubscriptionName = "test";

    public static void Main(string[] args)
    {
        var elasticsearchSettings = new ConnectionSettings(new SingleNodeConnectionPool(new Uri("http://does.not.exist:9200"))).DefaultIndex("DoesNotExistIndex");
        _elasticClient = new ElasticClient(elasticsearchSettings);

        _subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);

        // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,
            MaxAutoRenewDuration = TimeSpan.FromSeconds(400),
            // Indicates whether the message pump should automatically complete the messages after returning from user callback.
            // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
            AutoComplete = false
        };

        // Register the function that processes messages.
        _subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

        Console.WriteLine("INFO: Process message handler registered, listening for messages");
        Console.Read();
    }

    private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        // Message received.
        var content = Encoding.UTF8.GetString(message.Body);

        var messageBody = JsonConvert.DeserializeObject<string[]>(content);

        Console.WriteLine($"INFO: Message arrived: {message}");
        Console.WriteLine($"INFO: Message body: \"{string.Join(",", messageBody)}\"");
        try
        {
            var response = _elasticClient.Ping();

            if (!response.IsValid && response.OriginalException != null)
                Console.WriteLine($"ERROR: ElasticSearch could not be reached, error was \"{response.OriginalException.Message}\"");
            else
                Console.WriteLine("INFO: ElasticSearch was contacted successfully");
        }
        catch (Exception e)
        {
            Console.WriteLine("!ERROR!: " + e);
        }

        await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        Console.WriteLine("INFO: Message completed");
    }

    // Use this handler to examine the exceptions received on the message pump.
    private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}: " +
                          $"{exceptionReceivedEventArgs.ExceptionReceivedContext.Action}: " +
                          $"{exceptionReceivedEventArgs.ExceptionReceivedContext.EntityPath}");
        return Task.CompletedTask;
    }

    }
}

This code is almost identical to the example taken from here: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions

I'm deliberately "pinging" an Elasticsearch instance that does not exist to produce the socket exception that helps me reproduce the issue.

One thing I've noticed is that when I create a new topic with EnablePartitioning = false, the problem does not occur.

Has anyone seen this before? It seems to be a problem deep within the Service Bus code itself.

Note: I also tried using a receiver to read messages with "ReceiveAsync", and I get the same error in that scenario (roughly as sketched below). Also, my driver for testing is to move off the .NET Framework Service Bus client (which does work with partitioning) and onto the .NET Core version.
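A sketch of that receiver-based variant (a sketch only, inside an async method, reusing the connection string and entity names from above):

// Sketch of the receiver-based approach using Microsoft.Azure.ServiceBus.Core.
var receiver = new MessageReceiver(
    ServiceBusConnectionString,
    EntityNameHelper.FormatSubscriptionPath(TopicName, SubscriptionName),
    ReceiveMode.PeekLock);

var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(10));
if (message != null)
{
    // ... same processing as in ProcessMessagesAsync above ...
    await receiver.CompleteAsync(message.SystemProperties.LockToken);
}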

Thanks in advance for any pointers!!

Rob
  • Possible duplicate of https://stackoverflow.com/questions/53748086/azure-service-bus-keeps-throwing-messagelocklostexceptions – Arunprabhu Dec 19 '18 at 14:32
  • Only slight difference on my question is that I don't thread.sleep and wait for the lock to expire. However, this does seem similar. – Rob Dec 19 '18 at 14:35
  • It's exactly the same issue. And it's the broker "issue", not the client. – Sean Feldman Dec 19 '18 at 14:58
  • Fair point - I was accidentally implying the client API was the issue - reworded accordingly. Hopefully Microsoft picks up the issue soon. – Rob Dec 19 '18 at 15:19
  • I also should add - auto lock renewal is **not** a guaranteed operation. – Sean Feldman Dec 20 '18 at 06:46
  • Ok cool - that's worth noting, thanks Sean! – Rob Dec 20 '18 at 11:35

2 Answers


In my case above, the problem was down to a slight misunderstanding of my configuration. Within Azure, if you navigate to:

Resource Group > ServiceBusInstance > Topics > testTopic > testSubscription

you can find the subscription properties. Here you will see the duration of the lock applied when a message is received. This defaults to 60 seconds, but for my long-running process I extended it to the maximum of 5 minutes, as shown below:

(Screenshot: the subscription's lock duration setting in the Azure portal.)

Then in the code, when wiring up the properties for my Subscription Client, I needed to ensure the MaxAutoRenewDuration property was set correctly.

I had presumed this property meant that if you set it to 30 seconds, the subscription client would internally renew the lock every 30 seconds; thus, if your maximum lock expiry was 5 minutes for example, the lock would keep being renewed for as long as you were processing the message...

In fact, what the property actually means is the maximum amount of time for which the subscription client will keep renewing the lock internally.

So if you set this to 24 hours, e.g. TimeSpan.FromHours(24), and your processing took 12 hours, the lock would keep being renewed. However, if you set it to 12 hours using TimeSpan.FromHours(12) and your code ran for 24, then when you went to complete the message you would get a MessageLockLostException (as I was getting above, just over shorter intervals!).

One thing I've done, which was easy to implement, was to dynamically pull the LockDuration from the subscription properties at runtime (all of my topics could have different configurations) and set MaxAutoRenewDuration appropriately from it.

Code sample:

sbNamespace.Topics.GetByName("test").Subscriptions.GetByName("test").LockDurationInSeconds

Note - I'm using the Microsoft.Azure.Management.Fluent package to build the sbNamespace.
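Putting that together, a rough sketch of how the lookup could feed the message handler options (this assumes sbNamespace has already been resolved via the Fluent management API, and the factor of 10 is purely an arbitrary illustration of "appropriately"):

// Sketch: size MaxAutoRenewDuration from the broker-side lock duration.
// Assumes sbNamespace is an IServiceBusNamespace obtained via the Fluent management API.
var lockSeconds = sbNamespace.Topics.GetByName("test")
                             .Subscriptions.GetByName("test")
                             .LockDurationInSeconds;

var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
    MaxConcurrentCalls = 1,
    AutoComplete = false,
    // Allow renewals to continue well beyond any single lock period;
    // the multiplier here is illustrative only.
    MaxAutoRenewDuration = TimeSpan.FromSeconds(lockSeconds * 10)
};

_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);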

Hopefully that helps others!

Rob

I would suggest you set a higher auto-renew duration on the subscription client, i.e. MaxAutoRenewDuration = TimeSpan.FromSeconds(xxxx), or you can just use message.RenewLock().
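Note: in the .NET Core client (Microsoft.Azure.ServiceBus) there is no message.RenewLock(); the renewal call sits on the receiver instead. A minimal sketch of manual renewal, assuming you hold a MessageReceiver (from Microsoft.Azure.ServiceBus.Core) created in PeekLock mode and a Message it received:

// Sketch: manual lock renewal with the Microsoft.Azure.ServiceBus package.
// "receiver" is a Microsoft.Azure.ServiceBus.Core.MessageReceiver created with
// ReceiveMode.PeekLock; "message" is a Message it has received.
private static async Task ProcessWithManualRenewalAsync(MessageReceiver receiver, Message message)
{
    // Each renewal extends the lock by the entity's LockDuration.
    await receiver.RenewLockAsync(message);

    // ... long-running processing here, renewing again periodically if needed ...

    await receiver.CompleteAsync(message.SystemProperties.LockToken);
}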

Hope it helps!

Itay Podhajcer
  • I've set it to 400 seconds (over 6 minutes) in the code above; I'm getting the error after the ping times out (roughly 4 seconds) on the 3rd message failure. I'm deliberately not prefetching any messages, so they aren't starting their lock before I'm ready to process them. – Rob Dec 19 '18 at 14:56
  • `message.RenewLock()` doesn't exist in the .NET Core version, does anyone have a workaround for this? – Ivan G. May 31 '19 at 14:22