I have been tasked with improving the performance of a worker role that processes messages from a queue.

It uses the QueueClient.OnMessage model with MaxConcurrentCalls set to 1.

During processing there is a Thread.Sleep of 5 minutes, which hampers overall throughput: if I have 10 messages in the queue, it takes at least 45 minutes before the 10th message is processed.

I thought that if I changed MaxConcurrentCalls to, say, 5, it would process 5 messages in parallel and reduce the waiting time by 25 minutes, but that's not working :(

I also tried OnMessageAsync with MaxConcurrentCalls, but no luck. Below is the snippet I tried:

.OnMessageAsync( async (brokeredMessage) =>
{                
    bool shouldAbandon = false;
    try
    {
        logger.Debug("Rcvd:" + brokeredMessage.SequenceNumber);
        SomeTask(brokeredMessage);
        await brokeredMessage.CompleteAsync();
    }
    catch (Exception ex)
    {
        logger.Error(String.Format("An Error occurred {0}", ex.ToString()));
        shouldAbandon = true;
    }
    if (shouldAbandon)
    {
        await brokeredMessage.AbandonAsync();
    }   
}, new OnMessageOptions { AutoComplete = false, MaxConcurrentCalls = 10 });

private void SomeTask(BrokeredMessage bm)
{            
    logger.Debug("id: " + bm.MessageId + " on thread: " + Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(30 * 1000);
    logger.Debug("seq: " + bm.SequenceNumber);
    logger.Debug("body: " + bm.GetBody<string>());
}

The last option I can think of is to start a new Task in the OnMessage callback. There are a couple of things I would need to take care of: making sure completed tasks are removed from the collection/main thread, and passing an object extracted from the BrokeredMessage, because the BrokeredMessage is disposed by then and can't be used on the Task thread. I have sanity-tested this approach as well, but I'm not convinced that it is the best solution.

Gaurav Mantri
DShah
  • Forgot to mention the biggest downside of the Task approach: if something fails during processing we'll lose the message, whereas by default the message would probably have stayed on the queue. – DShah Sep 16 '16 at 11:56
  • What is the purpose of the sleep? To simulate a long process? Did you try making your SomeTask function async? – Thomas Sep 17 '16 at 03:29
  • Yes @Thomas, the sleep simulates a long-running process. I tried returning a Task from the SomeTask method to make it async, but that didn't work because the code was still using Sleep. Then I came across another thread on C# multithreading (http://stackoverflow.com/questions/14177891/can-somebody-please-explain-async-await) and realised I needed to use Task.Delay, and that worked as I expected. – DShah Sep 21 '16 at 08:57

1 Answer

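Replace the SomeTask implementation with the following, and await it in the handler (await SomeTask(brokeredMessage);) so that the message is completed only after the work has finished: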

async Task SomeTask(BrokeredMessage bm)
{
     logger.Debug("id: " + bm.MessageId + " on thread: " + Thread.CurrentThread.ManagedThreadId);
     await Task.Delay(30 * 1000).ConfigureAwait(false);
     logger.Debug("seq: " + bm.SequenceNumber);
     logger.Debug("body: " + bm.GetBody<string>());
}

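Thread.Sleep blocks the thread it runs on. When it is mixed with async code on the same thread (as in your case), the handler never yields, so the thread 'stalls' and the other tasks cannot make progress. Task.Delay releases the thread while it waits. Since there's no UI work, prefer .ConfigureAwait(false) on async operations so that the scheduler is free to choose which thread the continuation runs on.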
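
The difference can be seen without Service Bus at all. The following is a minimal standalone sketch, not the actual message pump: SleepVsDelayDemo, BlockingHandler, NonBlockingHandler and RunAllAsync are hypothetical names made up for illustration, and the 200 ms delay stands in for the long-running work. It assumes .NET 4.6 or later (for Task.CompletedTask) and C# 7.1 or later (for async Main).

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class SleepVsDelayDemo
{
    // Hypothetical handler: Thread.Sleep blocks the calling thread before the
    // method returns its Task, so the caller cannot start the next handler.
    public static Task BlockingHandler(int delayMs)
    {
        Thread.Sleep(delayMs);
        return Task.CompletedTask;
    }

    // Hypothetical handler: Task.Delay returns an incomplete Task immediately,
    // so the caller is free to start the next handler while this one waits.
    public static async Task NonBlockingHandler(int delayMs)
    {
        await Task.Delay(delayMs).ConfigureAwait(false);
    }

    // Starts `count` handlers one after another from the same thread,
    // waits for all of them, and returns the elapsed milliseconds.
    public static async Task<long> RunAllAsync(Func<int, Task> handler, int count, int delayMs)
    {
        var stopwatch = Stopwatch.StartNew();
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            tasks[i] = handler(delayMs);
        }
        await Task.WhenAll(tasks).ConfigureAwait(false);
        return stopwatch.ElapsedMilliseconds;
    }

    public static async Task Main()
    {
        long blockingMs = await RunAllAsync(BlockingHandler, 5, 200);
        long nonBlockingMs = await RunAllAsync(NonBlockingHandler, 5, 200);

        // The blocking run takes roughly count * delay, because the sleeps happen back to back.
        Console.WriteLine("Thread.Sleep: " + blockingMs + " ms");
        // The non-blocking run takes roughly one delay, because the waits overlap.
        Console.WriteLine("Task.Delay:   " + nonBlockingMs + " ms");
    }
}
```

With Thread.Sleep the five handlers run one after another even though each returns a Task; with Task.Delay their waits overlap, which is the behaviour MaxConcurrentCalls is meant to enable.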

Sean Feldman
  • Thanks @Sean, and sorry for the late reply as I was away, but that's exactly what I did before I went on holiday and it worked. The key was to use Task.Delay instead of Thread.Sleep, which I realised from http://stackoverflow.com/questions/14177891/can-somebody-please-explain-async-await. – DShah Sep 21 '16 at 08:59