
I'm executing a web request to get a message, then awaiting the processing of that message, then repeating the whole process.

Processing a message is long-running, and the thread may be in a waiting state that may allow it to be used elsewhere. What I'd like is to continue the while loop, getting more messages and processing them as threads become free.

Current synchronous code:

while (!cancellationToken.IsCancellationRequested) {
  var message = await GetMessage();

  await ProcessMessage(message); // I'll need it to continue from here if the thread is released.
}

The scenario this is used in is a message queue Consumer service.

Shawn Mclean

2 Answers


Given the use of async / await, your current code isn't necessarily synchronous in thread terms: the continuations can be invoked on different threads. However, the dependency between getting a message and processing it obviously must be upheld.

Re: the thread may be in a waiting state that may allow it to be used elsewhere

Awaiting well-coded I/O-bound work doesn't need to consume a thread at all; see Stephen Cleary's "There Is No Thread". Assuming the two awaited tasks are I/O-bound, your code will likely consume no threads at all while it is awaiting them. In other words, the rest of your application keeps full use of the thread pool. So if your only concern was wasting threads, then nothing more is needed.
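To illustrate, here is a minimal sketch in which `Task.Delay` stands in for a genuinely I/O-bound call such as a web request. `SimulatedIoCallAsync` is a hypothetical name. A thousand of these calls are awaited concurrently. Because no thread sits blocked on each pending call, they should all finish in roughly the time of a single delay.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for an I/O-bound call: Task.Delay completes via a timer,
// so no thread is blocked while the "request" is pending.
async Task<int> SimulatedIoCallAsync(int id)
{
    await Task.Delay(500);
    return id;
}

var stopwatch = Stopwatch.StartNew();

// Start 1,000 simulated requests at once, then await them all together.
int[] results = await Task.WhenAll(Enumerable.Range(0, 1000).Select(i => SimulatedIoCallAsync(i)));

stopwatch.Stop();
// All calls wait concurrently, so the total is roughly one delay, not 1,000 of them.
Console.WriteLine($"{results.Length} calls finished in {stopwatch.ElapsedMilliseconds} ms");
```

Replacing the `await Task.Delay(500)` with a blocking `Thread.Sleep(500)` would instead tie up a thread pool thread for every pending call.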

If, however, your concern is performance and additional throughput, and there is downstream capacity to make concurrent calls to ProcessMessage (e.g. multiple downstream web servers or additional database capacity), then you could look at parallelizing the I/O-bound work (again, without requiring more thread pool threads).

For instance, if you are able to rewrite the GetMessage call as a GetMessages call that retrieves a batch at a time, you could try this:

var messages = await GetMessages(10);
var processTasks = messages
    .Select(message => ProcessMessage(message));
await Task.WhenAll(processTasks);

(And if you can't change that code, you could simply call GetMessage in a loop to retrieve 10 individual messages before the Task.WhenAll.)
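Here is a self-contained sketch of the loop-based variant. `GetMessage` is called up to 10 times to build a batch, and the batch is then processed concurrently with `Task.WhenAll`. The in-memory queue and both stub methods are hypothetical stand-ins for the real web request and processing logic. The loop stops once the queue is empty only so that the example terminates.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the question's methods: an in-memory queue of 25
// messages replaces the web request, and processing is simulated I/O.
var queue = new ConcurrentQueue<string>();
for (int n = 0; n < 25; n++) queue.Enqueue($"message-{n}");
var processed = new ConcurrentBag<string>();

async Task<string?> GetMessage()
{
    await Task.Delay(10); // simulated web request
    return queue.TryDequeue(out var next) ? next : null; // null: queue is empty
}

async Task ProcessMessage(string message)
{
    await Task.Delay(100); // simulated long-running downstream I/O
    processed.Add(message);
}

const int BatchSize = 10;
while (true)
{
    // GetMessage can't be changed, so call it up to BatchSize times to build a batch.
    var batch = new List<string>();
    for (int k = 0; k < BatchSize; k++)
    {
        var msg = await GetMessage();
        if (msg is null) break;
        batch.Add(msg);
    }
    if (batch.Count == 0) break; // demo only: a real consumer would keep polling

    // Process the batch concurrently and wait for all of it before fetching more.
    await Task.WhenAll(batch.Select(item => ProcessMessage(item)));
}

Console.WriteLine($"Processed {processed.Count} messages");
```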

However, if you do NOT have any further capacity for concurrent ProcessMessage calls, then you should instead look at addressing the bottleneck, e.g. adding more servers, optimizing code, or parallelizing the work done inside ProcessMessage.

The rationale is that, as you say, GetMessage retrieves data off a queue. If you have no capacity to process the messages you've retrieved, all you could do is queue them somewhere else, which seems rather pointless. It is better to leave the messages on the queue until you are ready to process them. The queue depth also gives you visibility of the backlog of work building up, which you can monitor.

Edit, Re : Occasionally one ProcessMessage() call takes much longer than others

Per the comments, the OP has additional information: an occasional ProcessMessage call takes much longer than the others, and they would like to continue processing other messages in the meantime.

One approach could be to apply a timeout to the concurrent tasks using a Task.WhenAny / Task.Delay pattern. If the timeout is reached, any long-running ProcessMessage tasks are left running, and the loop continues with the next batch of messages.

The code below is potentially dangerous, because it requires careful balancing of the timeout (1000 ms below) against the observed frequency of the misbehaving ProcessMessage calls. If the timeout is too low relative to the frequency of 'slow' ProcessMessage calls, unfinished tasks accumulate and the downstream resources can become overwhelmed.

A safer (yet more complicated) addition would be to track the number of incomplete ProcessMessage tasks via Task.IsCompleted. If this hits a threshold, await the completion of enough of these tasks to bring the backlog back to a safe level.

while (!cancellationToken.IsCancellationRequested)
{
   // Ideally, the async operations should all accept cancellation tokens too
   var messages = await GetMessages(10, cancellationToken);
   var processTasks = messages
      .Select(message => ProcessMessage(message, cancellationToken))
      .ToList();
   // Wait for the whole batch, but stop waiting after 1000 ms;
   // any tasks still running are left to complete in the background.
   await Task.WhenAny(Task.WhenAll(processTasks),
                      Task.Delay(1000, cancellationToken));
}
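Here is a rough sketch of the safer, threshold-based variant. It assumes hypothetical `GetMessages(count, token)` and `ProcessMessage(id, token)` stubs, in which every 7th message is artificially slow. It also assumes a safe limit of 25 in-flight calls.

Instead of a timeout, it keeps a list of incomplete `ProcessMessage` tasks and prunes finished ones via `Task.IsCompleted`. Whenever the threshold is reached, it uses `Task.WhenAny` to wait for capacity. A slow message therefore never blocks the loop, yet the downstream load stays bounded.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stub state: message ids are sequential integers.
int nextId = 0;
int completedCount = 0;
int maxObservedInFlight = 0;

// Hypothetical stub for GetMessages: returns the next `count` message ids.
async Task<List<int>> GetMessages(int count, CancellationToken token)
{
    await Task.Delay(5, token); // simulated web request
    return Enumerable.Range(0, count).Select(_ => nextId++).ToList();
}

// Hypothetical stub for ProcessMessage: usually ~20 ms, but every 7th message
// is "slow" (1 s), mimicking the occasional long-running call.
async Task ProcessMessage(int id, CancellationToken token)
{
    await Task.Delay(id % 7 == 0 ? 1000 : 20, token);
    Interlocked.Increment(ref completedCount);
}

const int MaxInFlight = 25; // assumption: the safe downstream concurrency level
var inFlight = new List<Task>();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); // demo: stop after 2 s
var cancellationToken = cts.Token;

try
{
    while (!cancellationToken.IsCancellationRequested)
    {
        // Drop tasks that have already finished.
        inFlight.RemoveAll(t => t.IsCompleted);

        // At the threshold: wait for at least one task to finish before fetching more.
        while (inFlight.Count >= MaxInFlight)
        {
            await Task.WhenAny(inFlight);
            inFlight.RemoveAll(t => t.IsCompleted);
        }

        // Only fetch as many messages as there is spare capacity for.
        var messages = await GetMessages(MaxInFlight - inFlight.Count, cancellationToken);
        inFlight.AddRange(messages.Select(m => ProcessMessage(m, cancellationToken)));
        maxObservedInFlight = Math.Max(maxObservedInFlight, inFlight.Count);
    }
}
catch (OperationCanceledException)
{
    // Shutdown was requested while fetching messages.
}

// Drain: let outstanding work finish (or observe its cancellation) before exiting.
try { await Task.WhenAll(inFlight); }
catch (OperationCanceledException) { }

Console.WriteLine($"Completed {completedCount} messages; peak in-flight {maxObservedInFlight}");
```

The demo cancels itself after 2 seconds. A real consumer would instead pass in the service's own cancellation token.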

Re: throttling for safe levels of downstream load, TPL Dataflow would more than likely be of use here.

StuartLC
  • Re: "Is my code I/O-bound?": unless your awaited ProcessMessage is actually doing CPU-bound work and then returning something silly like `Task.FromResult(true)`, i.e. if it is actually waiting on an external database, REST service, network resource, etc., then it is likely I/O-bound. Re: "What's an optimal number of messages per batch?": you'll need to profile the load on the resources consumed by `ProcessMessage`, and take into account other applications concurrently using the same resources. – StuartLC Sep 26 '17 at 06:35
  • Thanks for the response; your example is exactly what I have right now! The reason I wrote this question with a simplified example is that one of those 10 messages will be very long-running (polling another service every minute), which ends up holding up the loop that gets more messages to process. I did end up writing a question on Code Review, which is off-topic here, but more insight can be gained from it: https://codereview.stackexchange.com/questions/176451/infinite-execution-of-list-of-tasks-continuation – Shawn Mclean Sep 26 '17 at 07:27
  • I guess you've also got the option of not awaiting the `ProcessMessage` at all, but if messages were received faster than you could process them, this would risk overwhelming the downstream resources used by `ProcessMessage`, and it is also [not something to be doing from WebAPI / MVC type apps](https://stackoverflow.com/questions/45626713/webapi2-return-from-controller-action-after-one-task-completes-but-continue-w/45627135#45627135). – StuartLC Sep 26 '17 at 08:26
  • If it is only the occasional message which is problematic, then perhaps look at throwing in a [`Task.Delay` task](https://stackoverflow.com/a/11191070/314291) wrapped with a `Task.WhenAny` in order to effect a timeout around the `Task.WhenAll`. This would leave incomplete tasks still 'running' but would free your loop up to continue processing. (i.e. don't worry about checking the 'completion state' of the tasks if it times out) – StuartLC Sep 26 '17 at 08:26

Take a look at https://msdn.microsoft.com/library/hh191443(vs.110).aspx, which should get you going. Also, it seems like ProcessMessage ought to end with 'Async' according to the C#/.NET style guide.

You'll want to start the task without awaiting it yet: Task<ReturnTypeOfProcessMessage> procMessageTask = ProcessMessageAsync(message);

Then you can do other work while it's running, e.g. SomeBusiness(...)

then

await procMessageTask;
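Here is a minimal runnable sketch of that pattern, in which `ProcessMessageAsync` and `SomeBusiness` are hypothetical placeholders. Calling the async method starts the work, other code runs while it is in flight, and the result is awaited only when it is needed.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical placeholder for the question's ProcessMessage, renamed per the Async convention.
async Task<int> ProcessMessageAsync(string message)
{
    await Task.Delay(200); // simulated long-running I/O
    return message.Length;
}

// Hypothetical placeholder for other work to do in the meantime.
int SomeBusiness() => 42;

// Calling the method starts the work; not awaiting it yet lets this code carry on.
Task<int> procMessageTask = ProcessMessageAsync("hello");

int other = SomeBusiness(); // runs while ProcessMessageAsync is still in flight

// Await only once the result is actually needed.
int result = await procMessageTask;
Console.WriteLine($"other = {other}, result = {result}");
```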

It seems like you may also want some kind of await-with-timeout functionality so that you can poll; here's a question related to that:

Asynchronously wait for Task<T> to complete with timeout
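One way to build await-with-timeout is the `Task.WhenAny` / `Task.Delay` combination. The helper below (`TryAwaitWithTimeout` is a hypothetical name, not a framework API) reports whether the task finished in time, without cancelling the task, so a caller can keep polling a long-running operation. On .NET 6 and later, `Task.WaitAsync(TimeSpan)` is a built-in alternative that throws a `TimeoutException` instead.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a framework API): waits up to `timeout` for `task`.
// Returns (true, result) if it finished in time; otherwise (false, default)
// and the task is left running so it can be checked again later.
async Task<(bool Completed, T Result)> TryAwaitWithTimeout<T>(Task<T> task, TimeSpan timeout)
{
    using var timeoutCts = new CancellationTokenSource();
    Task delay = Task.Delay(timeout, timeoutCts.Token);
    Task winner = await Task.WhenAny(task, delay);
    if (winner == task)
    {
        timeoutCts.Cancel();       // release the timer; it is no longer needed
        return (true, await task); // await again to surface any exception
    }
    return (false, default);
}

// Hypothetical slow operation used to demonstrate polling.
async Task<string> SlowOperationAsync()
{
    await Task.Delay(350);
    return "done";
}

Task<string> slow = SlowOperationAsync();
int polls = 0;
string? outcome = null;
while (outcome is null)
{
    var (completed, value) = await TryAwaitWithTimeout(slow, TimeSpan.FromMilliseconds(100));
    if (completed) outcome = value;
    else polls++; // still running: do other work here, then check again
}
Console.WriteLine($"{outcome} after {polls} timed-out checks");
```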

HTH

Ian Ray