
I've got an issue where I publish messages to an Azure Service Bus topic, and I have a couple of batch consumers subscribed to that topic (no forwarding to a queue).

The problem is that I get random warnings about MessageLockLostException, and it feels like something is wrong, but I don't know what.

I've set a lock duration of 5 minutes, and the errors are thrown almost immediately, so I doubt that lock expiry is the cause.
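A rough worst-case estimate, using the values from the endpoint configuration in the reproduction below, supports this. The sketch rests on my own assumptions, not on documented MassTransit behavior: that a message's lock is held while its batch fills, during every consume attempt, and during the retry waits, and that a retry re-runs the whole batch consumer.

```csharp
using System;

// Values copied from the reproduction's endpoint configuration.
TimeSpan lockDuration = TimeSpan.FromMinutes(5);     // LockDuration
TimeSpan batchTimeLimit = TimeSpan.FromSeconds(5);   // BatchOptions.TimeLimit
TimeSpan maxConsumeTime = TimeSpan.FromSeconds(7);   // _random.Next(4, 8) yields at most 7
int[] retryIntervalsMs = { 500, 2000 };              // r.Intervals(500, 2000)

// Assumption: one initial attempt plus one retry per configured interval,
// each retry re-running the whole batch consumer.
int attempts = 1 + retryIntervalsMs.Length;

// Assumption: the lock is held while the batch fills, during every attempt,
// and during the waits between attempts.
TimeSpan worstCase = batchTimeLimit + TimeSpan.FromTicks(maxConsumeTime.Ticks * attempts);
foreach (int intervalMs in retryIntervalsMs)
{
    worstCase += TimeSpan.FromMilliseconds(intervalMs);
}

// Messages that can sit in active batches vs. messages the receiver may prefetch.
int inFlightInBatches = 100 * 10;                    // MessageLimit * ConcurrencyLimit
int prefetchCount = 1100;                            // PrefetchCount

Console.WriteLine($"Worst-case hold per batch: {worstCase}, lock duration: {lockDuration}");
Console.WriteLine($"Prefetched beyond active batches: {prefetchCount - inFlightInBatches}");
```

This comes to 28.5 seconds, less than a tenth of the 5-minute lock. The reproduction's consumers never throw, so in practice no retries run at all. Time that messages spend waiting in the prefetch buffer (PrefetchCount = 1100, versus 10 × 100 = 1000 messages in active batches) is not covered by this estimate.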

These are examples of the errors thrown:

warn: MassTransit[0]
      Message Lock Lost: 5d5400005de20015b8d008d9a521105f
      Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
         at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
         at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
         at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
         at Microsoft.Azure.ServiceBus.Core.MessageReceiver.CompleteAsync(IEnumerable`1 lockTokens)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
warn: MassTransit[0]
      Message Lock Lost: 5d5400005de20015a5cd08d9a521105f
      Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
         at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
         at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
         at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
         at Microsoft.Azure.ServiceBus.Core.MessageReceiver.CompleteAsync(IEnumerable`1 lockTokens)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)

Here is a minimal reproduction of the issue. It will set everything up and publish 50k messages.

csproj:

<Project Sdk="Microsoft.NET.Sdk.Worker">

    <PropertyGroup>
        <TargetFramework>net6.0</TargetFramework>
        <Nullable>enable</Nullable>
        <ImplicitUsings>enable</ImplicitUsings>
        <UserSecretsId>dotnet-WorkerService-C6197FFA-DCA6-4867-8576-A51ADAE04FD3</UserSecretsId>
    </PropertyGroup>

    <ItemGroup>
        <PackageReference Include="MassTransit" Version="7.2.3" />
        <PackageReference Include="MassTransit.AspNetCore" Version="7.2.3" />
        <PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="7.2.3" />
        <PackageReference Include="MassTransit.EntityFrameworkCore" Version="7.2.3" />
        <PackageReference Include="MassTransit.Prometheus" Version="7.2.3" />
        <PackageReference Include="MassTransit.RabbitMQ" Version="7.2.3" />
        <PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
    </ItemGroup>
</Project>

The code:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Azure.ServiceBus.Core;
using MassTransit.Topology;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using WorkerService;
using IHost = Microsoft.Extensions.Hosting.IHost;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        const string connectionString = "<ASB ConnectionString here>";
        Configure(services, connectionString);
        services.AddHostedService<Worker>();
    })
    .Build();
await host.RunAsync();

void Configure(IServiceCollection services, string connectionString)
{
    services.AddMassTransit(busConfigurator =>
    {
        busConfigurator.AddConsumer<TestConsumer1>();
        busConfigurator.AddConsumer<TestConsumer2>();
        busConfigurator.AddConsumer<TestConsumer3>();
        busConfigurator.AddConsumer<TestConsumer4>();
        busConfigurator.AddConsumer<TestConsumer5>();

        busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
        {
            serviceBusBusFactoryConfigurator.Host(connectionString);

            ConfigureSubscriptionEndpoint<TestConsumer1>(serviceBusBusFactoryConfigurator, context, "subscriber-1");
            ConfigureSubscriptionEndpoint<TestConsumer2>(serviceBusBusFactoryConfigurator, context, "subscriber-2");
            ConfigureSubscriptionEndpoint<TestConsumer3>(serviceBusBusFactoryConfigurator, context, "subscriber-3");
            ConfigureSubscriptionEndpoint<TestConsumer4>(serviceBusBusFactoryConfigurator, context, "subscriber-4");
            ConfigureSubscriptionEndpoint<TestConsumer5>(serviceBusBusFactoryConfigurator, context, "subscriber-5");
        });
    });
    services.AddMassTransitHostedService(true);
}

void ConfigureSubscriptionEndpoint<TConsumer>(IServiceBusBusFactoryConfigurator serviceBusBusFactoryConfigurator, IBusRegistrationContext context, string subscriptionName)
    where TConsumer : class, IConsumer<Batch<IMyEvent>>
{
    serviceBusBusFactoryConfigurator.SubscriptionEndpoint<IMyEvent>(
        subscriptionName,
        receiveEndpointConfigurator =>
        {
            receiveEndpointConfigurator.LockDuration = TimeSpan.FromMinutes(5);
            receiveEndpointConfigurator.PublishFaults = false;
            receiveEndpointConfigurator.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
            receiveEndpointConfigurator.UseMessageRetry(r => r.Intervals(500, 2000));
            receiveEndpointConfigurator.PrefetchCount = 1100;

            receiveEndpointConfigurator.ConfigureConsumer<TConsumer>(
                context,
                consumerConfigurator =>
                {
                    consumerConfigurator.Options<BatchOptions>(batchOptions =>
                    {
                        batchOptions.MessageLimit = 100;
                        batchOptions.TimeLimit = TimeSpan.FromSeconds(5);
                        batchOptions.ConcurrencyLimit = 10;
                    });
                });
        });
}

namespace WorkerService
{
    public class TestConsumer1 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer1> _logger;

        public TestConsumer1(ILogger<TestConsumer1> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer1), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    public class TestConsumer2 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer2> _logger;

        public TestConsumer2(ILogger<TestConsumer2> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer2), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    public class TestConsumer3 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer3> _logger;

        public TestConsumer3(ILogger<TestConsumer3> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer3), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    public class TestConsumer4 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer4> _logger;

        public TestConsumer4(ILogger<TestConsumer4> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer4), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    public class TestConsumer5 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer5> _logger;

        public TestConsumer5(ILogger<TestConsumer5> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer5), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    [EntityName("my-event")]
    public interface IMyEvent
    {
    }

    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly IBus _bus;

        public Worker(
            ILogger<Worker> logger,
            IBus bus)
        {
            _logger = logger;
            _bus = bus;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);

            var tasks = new List<Task>();
            var count = 50000;
            for (int i = 0; i < count; i++)
            {
                tasks.Add(_bus.Publish<IMyEvent>(new { }));
            }

            await Task.WhenAll(tasks);
        }
    }
}

Update

I've been able to confirm that these errors correlate with throttling of the Azure Service Bus namespace. The first image shows when the errors occurred, and the second image shows the number of throttled requests in ASB. They seem to correlate closely, which would also explain why I couldn't reproduce the errors reliably.

[Image 1: occurrences of the Message Lock Lost warnings over time] [Image 2: throttled requests on the ASB namespace over time]
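If the throttling is triggered by the reproduction's burst of 50,000 concurrent Publish calls, one mitigation worth trying (my suggestion, not something confirmed in this thread) is to cap the number of publishes in flight. The sketch below does that with SemaphoreSlim. `PublishBoundedAsync` is a hypothetical helper, the `publish` delegate stands in for the real `_bus.Publish<IMyEvent>(new { })` call, and any limit you pass is a placeholder to tune against the namespace's quotas.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Starts `count` publishes but keeps at most `maxInFlight` of them running at once.
// `publish` is a placeholder for the real call, e.g. () => _bus.Publish<IMyEvent>(new { }).
async Task PublishBoundedAsync(int count, int maxInFlight, Func<Task> publish, CancellationToken token = default)
{
    // Not disposed on purpose: if the loop is cancelled, running publishes still release it.
    var gate = new SemaphoreSlim(maxInFlight, maxInFlight);
    var tasks = new List<Task>(count);

    for (int i = 0; i < count; i++)
    {
        await gate.WaitAsync(token); // wait for a free slot
        tasks.Add(PublishOneAsync());
    }

    await Task.WhenAll(tasks);

    async Task PublishOneAsync()
    {
        try
        {
            await publish();
        }
        finally
        {
            gate.Release(); // free the slot even if the publish failed
        }
    }
}
```

In the reproduction's Worker, the loop could then become something like `await PublishBoundedAsync(50000, 100, () => _bus.Publish<IMyEvent>(new { }, stoppingToken), stoppingToken);`. This lowers the request rate but does not by itself guarantee that the namespace will not be throttled.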

Joel
  • If this were a simple `git clone` and `dotnet run` example, it would be much easier to find time to look at it. – Chris Patterson Nov 11 '21 at 18:45
  • @ChrisPatterson You are right! I put it here (with a `README` for instructions): https://github.com/joelnotified/masstransit-batch-test. However... I can't get it to fail anymore. I don't think I have changed anything, so I have no clue what's going on. I will try to see if I can reproduce it in a more stable way and let you know. – Joel Nov 11 '21 at 20:07
  • @ChrisPatterson I've been trying to figure out why I can't reproduce it reliably, and it looks like it's related to when throttling of the ASB occurs. In your opinion: Are errors like these expected during throttling? – Joel Nov 12 '21 at 13:53
  • Ah, throttling in your namespace would definitely cause some issues, since ASB is essentially cutting you off. It's strange that they would expire locks, unless maybe you're exceeding a lock count threshold. It could be one of those "soft" limits under the hood that they don't advertise. – Chris Patterson Nov 12 '21 at 15:31

1 Answer


First, make sure you are on the latest version; if not, upgrade.

Make sure you add a retry policy in function.json, as below:

{
    "disabled": false,
    "bindings": [
        {
            ....
        }
    ],
    "retry": {
        "strategy": "fixedDelay",
        "maxRetryCount": 4,
        "delayInterval": "00:00:10"
    }
}
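As I read the Azure Functions retry-policy settings, `fixedDelay` with `maxRetryCount: 4` and `delayInterval: 00:00:10` means one initial execution plus up to four retries, 10 seconds apart, so at most 40 seconds of added waiting. The Functions host applies the policy itself; the sketch below is only a hypothetical helper (`RetryFixedDelayAsync` is not a Functions API) that models those semantics so the numbers are easy to check.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper mirroring "fixedDelay": one initial attempt plus up to
// maxRetryCount retries, waiting the same interval before each retry.
// Returns the number of executions it took to succeed.
async Task<int> RetryFixedDelayAsync(Func<Task> action, int maxRetryCount, TimeSpan delayInterval)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            await action();
            return attempt;
        }
        catch (Exception) when (attempt <= maxRetryCount)
        {
            // Retries remain: wait the fixed interval, then try again.
            // Once they are used up, the filter is false and the exception propagates.
            await Task.Delay(delayInterval);
        }
    }
}
```

With the values above, the call would be `RetryFixedDelayAsync(work, 4, TimeSpan.FromSeconds(10))`, allowing at most five executions.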

Along with that, check the dead-letter queue and configure it with the help of the MS Docs.

You also need to specify how long the lock can be renewed automatically, by including the parameter "maxAutoLockRenewalDuration": "00:05:00" in the serviceBus section of host.json.

Below is an example host.json:

{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "clientRetryOptions":{
                "mode": "exponential",
                "tryTimeout": "00:01:00",
                "delay": "00:00:00.80",
                "maxDelay": "00:01:00",
                "maxRetries": 3
            },
            "prefetchCount": 0,
            "autoCompleteMessages": true,
            "maxAutoLockRenewalDuration": "00:05:00",
            "maxConcurrentCalls": 16,
            "maxConcurrentSessions": 8,
            "maxMessages": 1000,
            "sessionIdleTimeout": "00:01:00"
        }
    }
}
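Both the MassTransit endpoint in the question (MaxAutoRenewDuration) and this host.json (maxAutoLockRenewalDuration) rely on automatic lock renewal: while a handler is still running, the client keeps renewing the message lock, up to a maximum total duration. The sketch below is a simplified, hypothetical model of that idea, with the renew call passed in as a delegate; the real SDKs may schedule renewals differently (for example, relative to the lock's expiry time).

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model of automatic lock renewal: while `handle` runs, call `renewLock`
// every `renewEvery`, but stop once `maxRenewalDuration` would be exceeded.
// Returns how many renewals were made.
async Task<int> RunWithLockRenewalAsync(Func<Task> handle, Func<Task> renewLock,
    TimeSpan renewEvery, TimeSpan maxRenewalDuration)
{
    using var handlerDone = new CancellationTokenSource();
    var clock = Stopwatch.StartNew();
    int renewals = 0;

    async Task RenewLoopAsync()
    {
        try
        {
            // Only renew if the next renewal still falls inside the allowed window.
            while (clock.Elapsed + renewEvery <= maxRenewalDuration)
            {
                await Task.Delay(renewEvery, handlerDone.Token);
                await renewLock();
                renewals++;
            }
        }
        catch (OperationCanceledException)
        {
            // The handler finished, so renewing is no longer needed.
        }
    }

    Task renewal = RenewLoopAsync();
    try
    {
        await handle();
    }
    finally
    {
        handlerDone.Cancel();
        await renewal;
    }
    return renewals;
}
```

Once `maxRenewalDuration` is used up, renewal stops and the lock expires normally if the handler is still running, which is when a lost-lock error on completion would be expected.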

In the JSON above, change autoCompleteMessages from true to false: "autoCompleteMessages": false.

When set to false, you are responsible for calling MessageReceiver methods to complete, abandon, or deadletter the message. If an exception is thrown (and none of the MessageReceiver methods are called), then the lock remains. Once the lock expires, the message is re-queued with the DeliveryCount incremented and the lock is automatically renewed.
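With autoCompleteMessages set to false, the usual shape of the handler is: complete only after processing succeeds, and abandon explicitly on failure so the message becomes available again without waiting for the lock to expire. In the sketch below, `ProcessAndSettleAsync` and its three delegates are hypothetical stand-ins; in a real function, `complete` and `abandon` would wrap the settlement methods the Service Bus binding exposes for the received message.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical manual-settlement wrapper for autoCompleteMessages = false.
// Returns true if the message was completed, false if it was abandoned.
async Task<bool> ProcessAndSettleAsync(Func<Task> handle, Func<Task> complete, Func<Task> abandon)
{
    try
    {
        await handle();
    }
    catch (Exception)
    {
        // Give the message back now instead of waiting for the lock to expire;
        // it will be redelivered with an incremented DeliveryCount.
        await abandon();
        return false;
    }

    // Outside the try block, so a settlement failure (such as a lost lock)
    // is not mistaken for a handler failure and followed by an abandon.
    await complete();
    return true;
}
```

Swallowing the handler exception keeps the sketch short; real code would at least log it.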

Using ReceiveAndLock has solved this issue.

Refer to these SO threads: SO1 and SO2 (thanks to the authors of those answers for the detailed explanations).

SaiKarri-MT