I am moving a calculation engine from an Azure worker role into Azure Service Fabric. It listens for messages on a Service Bus queue and processes each one based on its content.

The calculation itself works correctly, but if the compute takes longer than a minute or so, the message is not removed from the queue after completion. In the worker role, we solved this by increasing "AutoRenewTimeout":

var options = new OnMessageOptions { AutoComplete = true, AutoRenewTimeout = TimeSpan.FromMinutes(3) };
_queueClient.OnMessage(OnMessage, options);

However, using the "ServiceFabric.ServiceBus" NuGet package, I cannot work out where to set this. I used the demo project as a reference to set up a stateless service that runs the compute. Below is an extract from CalculateService.cs, where the stateless service is initialised.

internal sealed class CalculateService : StatelessService
{
    public CalculateService(StatelessServiceContext context)
        : base(context)
    { }

    /// <summary>
    /// Optional override to create listeners (e.g., TCP, HTTP) for this service replica to handle client or user requests.
    /// </summary>
    /// <returns>A collection of listeners.</returns>
    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
        yield return new ServiceInstanceListener(context => new ServiceBusQueueCommunicationListener(new Handler(this), context, serviceBusQueueName), "StatelessService-ServiceBusQueueListener");
    }
}


internal sealed class Handler : AutoCompleteServiceBusMessageReceiver
{
    protected override Task ReceiveMessageImplAsync(BrokeredMessage message, CancellationToken cancellationToken)
    {
        ServiceEventSource.Current.ServiceMessage(_service, $"Handling queue message {message.MessageId}");
        var computeRole = new ExcelCompute();
        // Deserialize the body directly; there is no need to allocate a RangeMessage first.
        var rMessage = message.GetBody<RangeMessage>();
        // Returns true if the compute was successful (which it currently always is).
        var result = computeRole.OnMessage(rMessage, message.MessageId);
        return Task.FromResult(result);
    }
}

I did try calling message.Complete() on the BrokeredMessage myself, but that threw a message lock error.

Daniel Nugent
Alex Skinner

1 Answer

  1. Get the latest version of the package (>= v3.5.0). Set the 'MessageLockRenewTimeSpan' property on your CommunicationListener to a value smaller than your lock duration (e.g. 50s where the lock duration is 60s). The margin allows for some clock skew. Note: by default this property is null, which means no automatic lock renewal is done.

This option works well when processing a batch that takes more time than the lock duration allows.
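
As a rough sketch of what this could look like in the listener setup from the question: the snippet below reuses the question's CreateServiceInstanceListeners code and sets the property through an object initializer. That 'MessageLockRenewTimeSpan' has a public setter usable this way is an assumption here, and the 50s value assumes the queue's lock duration is 60s, so check both against the package version you install.

```csharp
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
    string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
    yield return new ServiceInstanceListener(
        context => new ServiceBusQueueCommunicationListener(new Handler(this), context, serviceBusQueueName)
        {
            // Assumption: the property is settable via an object initializer.
            // Keep the value below the queue's lock duration (assumed 60s here)
            // so the lock is renewed before it can expire.
            MessageLockRenewTimeSpan = TimeSpan.FromSeconds(50)
        },
        "StatelessService-ServiceBusQueueListener");
}
```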

or

  2. You can use BrokeredMessage.RenewLock to periodically extend your message lock when processing takes longer than the lock duration. You could do that on a separate thread if needed.

This option works well when processing a single message (batch size 1) takes more time than the lock duration allows.
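
A minimal sketch of the periodic renewal idea, not taken from the package itself: the helper below is hypothetical (the names LockRenewal and RunWithRenewalAsync are made up for illustration) and takes the renewal call as a delegate, so it only depends on System namespaces. In a handler, the delegate would wrap BrokeredMessage.RenewLockAsync, as shown in the trailing comment.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LockRenewal
{
    // Runs `work` while invoking `renewLock` every `interval` until the work finishes.
    // If a renewal fails, its exception surfaces after `work` has completed.
    public static async Task RunWithRenewalAsync(
        Func<Task> work,
        Func<Task> renewLock,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            Task renewLoop = RenewLoopAsync(renewLock, interval, cts.Token);
            try
            {
                await work().ConfigureAwait(false);
            }
            finally
            {
                // Stop renewing as soon as the work is done (or has failed).
                cts.Cancel();
                await renewLoop.ConfigureAwait(false);
            }
        }
    }

    private static async Task RenewLoopAsync(
        Func<Task> renewLock, TimeSpan interval, CancellationToken token)
    {
        try
        {
            while (true)
            {
                await Task.Delay(interval, token).ConfigureAwait(false);
                await renewLock().ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the work completed or the caller cancelled, so stop renewing.
        }
    }
}

// Possible use inside Handler.ReceiveMessageImplAsync (sketch, assuming a 60s lock):
// return LockRenewal.RunWithRenewalAsync(
//     () => Task.Run(() => computeRole.OnMessage(rMessage, message.MessageId)),
//     () => message.RenewLockAsync(),
//     TimeSpan.FromSeconds(50),
//     cancellationToken);
```

Passing the renewal as a delegate keeps the loop independent of the Service Bus types, and wrapping the synchronous compute in Task.Run keeps the receiving thread free while the lock is being renewed.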

LoekD
  • Thanks LoekD, I'm subscribing to an end-calculate event in a separate thread that uses message.RenewLock until the event is fired. However, AutoRenewTimeout support would be greatly appreciated!! :) Thanks for your help! – Alex Skinner Sep 08 '16 at 13:56
  • I've added the support, check out the demo for the details. Please let me know how it works out for you. – LoekD Sep 08 '16 at 17:52
  • Apologies for not coming back to you earlier. This did indeed resolve my issue perfectly. Thank you so much for your help and support. It is much appreciated! :) – Alex Skinner Feb 10 '17 at 11:02