I am writing a console application in .net Core 2.1, my intent is to listen to messages on a Topic within ServiceBus and process new messages that arrive into Elasticsearch using the NEST api (NEST is probably irrelevant for my question but wanted to be transparent).
My Topic entity within ServiceBus is called "test" and I have a subscription also called "test" (full path would be "test/subscriptions/test").
In my .net Core console app, I've the following NuGet references:
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.1" />
<PackageReference Include="NEST" Version="6.4.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
I am having a very strange issue when using the .net Standard ServiceBus Api where I'm regularly getting a renew lock error:
Message handler encountered an exception Microsoft.Azure.ServiceBus.MessageLockLostException
I've stripped my code back into a very reproduceable sample here:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Microsoft.Azure.ServiceBus;
using Nest;
using Newtonsoft.Json;
namespace SampleApp
{
public class Program
{
private static SubscriptionClient _subscriptionClient;
private static IElasticClient _elasticClient;
private static string ServiceBusConnectionString = "[connectionString]";
private static string TopicName = "test";
private static string SubscriptionName = "test";
public static void Main(string[] args)
{
var elasticsearchSettings = new ConnectionSettings(new SingleNodeConnectionPool(new Uri("http://does.not.exist:9200"))).DefaultIndex("DoesNotExistIndex");
_elasticClient = new ElasticClient(elasticsearchSettings);
_subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
// Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,
MaxAutoRenewDuration = TimeSpan.FromSeconds(400),
// Indicates whether the message pump should automatically complete the messages after returning from user callback.
// False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
AutoComplete = false
};
// Register the function that processes messages.
_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
Console.WriteLine("INFO: Process message handler registered, listening for messages");
Console.Read();
}
private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Message received.
var content = Encoding.UTF8.GetString(message.Body);
var messageBody = JsonConvert.DeserializeObject<string[]>(content);
Console.WriteLine($"INFO: Message arrived: {message}");
Console.WriteLine($"INFO: Message body: \"{string.Join(",", messageBody)}\"");
try
{
var response = _elasticClient.Ping();
if (!response.IsValid && response.OriginalException != null)
Console.WriteLine($"ERROR: ElasticSearch could not be reached, error was \"{response.OriginalException.Message}\"");
else
Console.WriteLine("INFO: ElasticSearch was contacted successfully");
}
catch (Exception e)
{
Console.WriteLine("!ERROR!: " + e);
}
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
Console.WriteLine("INFO: Message completed");
}
// Use this handler to examine the exceptions received on the message pump.
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}: " +
$"{exceptionReceivedEventArgs.ExceptionReceivedContext.Action}: " +
$"{exceptionReceivedEventArgs.ExceptionReceivedContext.EntityPath}");
return Task.CompletedTask;
}
}
This code is almost identical to the example taken from here: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions
I'm deliberately "pinging" an Elasticsearch instance that does not exist to produce the socket exception that helps me reproduce the issue.
One thing I've noticed is that when I create a new Topic and have EnabledPartioning = false, the problem does not occur.
Has anyone seen this before? Seems to be a problem deep within the ServiceBus code itself.
Note: I tried using the Receiver to read messages using "ReceiveAsync" and I also get this error in this scenario. Also, my driver for testing is to move off the .net Framework ServiceBus client (which does work with partitioning) and onto .net Core version.
Thanks in advance for any pointers!!