I have a process that reads a message from an Azure Service Bus queue and converts it into a video to be encoded by Azure Media Services. I noticed that if the process is kicked off several times in quick succession, the same video is encoded multiple times, one run right after another. Here is my code that adds the video to the queue:

public class VideoManager
{
    string _connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
    string _queueName = ConfigurationManager.AppSettings["ServiceBusQueueName"];
    QueueClient _client;

    public VideoManager()
    {
        var conStringBuilder = new ServiceBusConnectionStringBuilder(_connectionString)
        {
            OperationTimeout = TimeSpan.FromMinutes(120)
        };

        var messagingFactory = MessagingFactory.CreateFromConnectionString(conStringBuilder.ToString());
        _client = messagingFactory.CreateQueueClient(_queueName);
    }

    public void Approve(Video video)
    {
        // Set video to approved. 
        video.ApprovalStatus = ApprovalStatus.Approved;
        var message = new BrokeredMessage(new VideoMessage(video, VideoMessage.MessageTypes.Approve, string.Empty));
        message.MessageId = video.RowKey;
        _client.Send(message);
    }
}

And here is the process that reads from the queue:

class Program
{
    static QueueClient client;

    static void Main(string[] args)
    {
        VideoManager videoManager = new VideoManager();

        var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];

        var conStringBuilder = new ServiceBusConnectionStringBuilder(connectionString)
        {
            OperationTimeout = TimeSpan.FromMinutes(120)
        };

        var messagingFactory = MessagingFactory.CreateFromConnectionString(conStringBuilder.ToString());

        client = messagingFactory.CreateQueueClient(ConfigurationManager.AppSettings["ServiceBusQueueName"]);

        Console.WriteLine("Starting: Broadcast Center Continuous Video Processing Job");

        OnMessageOptions options = new OnMessageOptions
        {
            MaxConcurrentCalls = 25,
            AutoComplete = false
        };

        client.OnMessageAsync(async message =>
        {
            bool shouldAbandon = false;

            try
            {
                await HandleMessage(message);
            }
            catch (Exception ex)
            {
                shouldAbandon = true;
                Console.WriteLine(ex.Message);
            }
            if (shouldAbandon)
            {
                await message.AbandonAsync();
            }
        }, options);
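        // Keep the process alive without spinning a CPU core in a busy loop.
        System.Threading.Thread.Sleep(System.Threading.Timeout.Infinite);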
    }
    async static Task<int> HandleMessage(BrokeredMessage message)
    {

        VideoMessage videoMessage = message.GetBody<VideoMessage>();

        Console.WriteLine(String.Format("Message body: {0}", videoMessage.Video.Title));
        Console.WriteLine(String.Format("Message id: {0}", message.MessageId));

        VideoProcessingService vp = new VideoProcessingService(videoMessage.Video);
        Task task;
        switch (videoMessage.MessageType)
        {
            case VideoMessage.MessageTypes.CreateThumbnail:
                task = new Task(() => vp.ProcessThumbnail(videoMessage.TimeStamp));
                task.Start();

                while (!task.IsCompleted)
                {
                    await Task.Delay(15000);
                    message.RenewLock();
                }
                await task;
                Console.WriteLine(task.Status.ToString());

                Console.WriteLine("Processing Complete");
                Console.WriteLine("Awaiting Message");
                break;
            case VideoMessage.MessageTypes.Approve:

                task = new Task(() => vp.Approve());
                task.Start();

                while (!task.IsCompleted)
                {
                    await Task.Delay(15000);
                    message.RenewLock();
                }
                await task;
                Console.WriteLine(task.Status.ToString());

                Console.WriteLine("Processing Complete");
                Console.WriteLine("Awaiting Message");
                break;
            default:
                break;
        }
        return 0;
    }
}

If I kick off the process 3 times in a row, this is what I see in the console window:

Message id: 76aca19a-0698-449b-bf58-a24876fc4314

Message id: 76aca19a-0698-449b-bf58-a24876fc4314

Message id: 76aca19a-0698-449b-bf58-a24876fc4314

I thought maybe my settings were incorrect, but they are in place. I am really at a loss here, as I would expect this to be fairly out-of-the-box behavior. Does duplicate detection only work once the message has been completed, meaning I can't use OnMessageAsync()?

Isaac Levin
  • What's the LockDuration defined on that queue? You could be seeing this because the message is being re-processed: the lock from the previous attempt, which still hasn't completed, has elapsed. – Sean Feldman Aug 18 '16 at 15:17
  • That may be it. I have included the entire code, which does call RenewLock(). – Isaac Levin Aug 18 '16 at 15:51
  • Either that, or take advantage of the native OnMessage API to do it for you. Example: http://weblogs.asp.net/sfeldman/azure-service-bus-autorenewtimeout – Sean Feldman Aug 18 '16 at 15:56

2 Answers

2

The issue is not the completion (as it was in the code), but the fact that you have, in essence, multiple consumers (25 concurrent callbacks), and it seems the LockDuration elapses before processing finishes. As a result, the message reappears on the queue and is processed again, which is why you see the same message ID logged more than once.

Possible solutions are (as I've outlined in a comment above):

  1. Let the OnMessage API manage lock renewal for you (see the example linked in my comment above)
  2. Manually renew the lock, as you've done, using BrokeredMessage.RenewLock
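
As a rough illustration of option 1, the options object from the question could carry an AutoRenewTimeout so the message pump renews the lock while the callback is still running. This is only a sketch: the ReceiverOptions class name and the 10-minute ceiling are assumptions made for illustration, and the value would need to exceed your longest expected encoding time. It assumes the WindowsAzure.ServiceBus package used in the question.

```csharp
using System;
using Microsoft.ServiceBus.Messaging; // from the WindowsAzure.ServiceBus NuGet package

// Hypothetical helper class; only OnMessageOptions and its properties come from the SDK.
static class ReceiverOptions
{
    public static OnMessageOptions Create()
    {
        return new OnMessageOptions
        {
            MaxConcurrentCalls = 25,
            AutoComplete = false,
            // Assumed value: the pump keeps renewing the message lock until the
            // callback returns or this much time has passed, whichever comes first.
            AutoRenewTimeout = TimeSpan.FromMinutes(10)
        };
    }
}
```

With AutoComplete set to false, the callback still has to complete the message itself once processing succeeds; otherwise it will be redelivered after the lock finally expires.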
Sean Feldman
  • It isn't just the message ID being logged more than once; the processing is happening multiple times. If I go the route of using the OnMessage API, will that stop a scenario where the same message is put on the queue multiple times? For instance, if the API that drops onto that queue is called twice? – Isaac Levin Aug 19 '16 at 14:38
  • A message ID logged multiple times is an indication of multiple processing. You have to balance LockDuration, auto-renewal time, and your processing time. If you drop a message with the same ID multiple times, there's no magic: you'll get it processed multiple times. You could use native de-duplication, but I'm not a fan of that feature as it's very fragile. It's better to make your handler idempotent. Also, native dedup is based on Message ID only, and if you need to use something else, it won't work. – Sean Feldman Aug 19 '16 at 15:26
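
One way to read the "make your handler idempotent" advice is to record which message IDs have already been claimed and skip repeats. The sketch below is a minimal in-memory version; ProcessedMessageTracker, TryClaim, and Release are hypothetical names, not part of the Service Bus SDK, and a real deployment with several processes would presumably need a durable shared store instead of a dictionary.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper, not part of the Service Bus SDK.
public sealed class ProcessedMessageTracker
{
    private readonly ConcurrentDictionary<string, bool> _seen =
        new ConcurrentDictionary<string, bool>();

    // Returns true only for the first caller that claims the given ID.
    public bool TryClaim(string messageId)
    {
        return _seen.TryAdd(messageId, true);
    }

    // Releases a claim so that a failed attempt can be retried later.
    public bool Release(string messageId)
    {
        bool ignored;
        return _seen.TryRemove(messageId, out ignored);
    }
}
```

In HandleMessage, the handler could call TryClaim(message.MessageId) before starting the encode and skip the work when it returns false, calling Release in the catch block so that an abandoned message can be retried.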
1

There is a line of code missing from your HandleMessage code.

async static Task<int> HandleMessage(BrokeredMessage message)
{
  VideoMessage videoMessage = message.GetBody<VideoMessage>();

  await message.CompleteAsync(); // This line...

  Console.WriteLine(String.Format("Message id: {0}", message.MessageId));
  // Processes Message
  return 0;
}

So yes, you have to settle the message with one of Complete, Defer, etc.

Also see this answer. I also found this, which may be useful for understanding how duplicate detection works.

Geek
  • I actually am doing that; I added it to the OP. It is done later in the flow. – Isaac Levin Aug 18 '16 at 11:55
  • OK, I would refer to the links provided in the answer. Hopefully they will help you. – Geek Aug 18 '16 at 11:59
  • Thanks, I have taken a look at those, and they don't quite fit my simple issue. I wonder if, because I am grabbing all the messages asynchronously, they are getting processed before the first one gets completed. This is possible since encoding a video can take over a minute. – Isaac Levin Aug 18 '16 at 12:20
  • With how this works, the message is removed from the queue. Just check what happens if you move the message.Complete() call into the client.OnMessageAsync callback, just before the await call. – Geek Aug 18 '16 at 13:10