Given the use of async/await, your current code isn't necessarily synchronous (in thread terms - the continuations can be invoked on different threads), although the dependency between getting a message and processing it obviously must be upheld.
Re: the thread may be in a waiting state that may allow it to be used elsewhere
Awaiting well-coded I/O-bound work doesn't need to consume a thread at all - see Stephen Cleary's "There Is No Thread". Assuming the two awaited tasks are I/O-bound, your code will likely consume no threads at all while it is awaiting them, i.e. the rest of your application will have the use of the thread pool. So if your only concern was wasting threads, then nothing more is needed.
If, however, your concern is about performance and additional throughput, and there is downstream capacity to handle concurrent calls to ProcessMessage (e.g. multiple downstream web servers or additional database capacity), then you could look at parallelizing the I/O-bound work (again, without requiring more thread pool threads).
For instance, if you are able to rewrite the GetMessages call to retrieve a batch at a time, you could try this:
    var messages = await GetMessages(10);
    var processTasks = messages
        .Select(message => ProcessMessage(message));
    await Task.WhenAll(processTasks);
(and if you can't touch the code, you could just loop GetMessages to retrieve 10 individual messages before the Task.WhenAll)
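As a rough sketch of that fallback, assuming GetMessages can only hand back one message per call: the getMessage and processMessage delegates below are hypothetical stand-ins for the question's methods, and string is just a placeholder message type.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchHelper
{
    // getMessage / processMessage are assumed stand-ins for GetMessages / ProcessMessage.
    public static async Task ProcessBatchAsync(
        Func<Task<string>> getMessage,
        Func<string, Task> processMessage,
        int batchSize)
    {
        var processTasks = new List<Task>(batchSize);
        for (var i = 0; i < batchSize; i++)
        {
            // Messages are fetched one at a time...
            var message = await getMessage();
            // ...but processing is started without being awaited, so the calls overlap.
            processTasks.Add(processMessage(message));
        }

        // Wait for the whole batch before the caller fetches the next one.
        await Task.WhenAll(processTasks);
    }
}
```

Note that this starts processing each message as soon as it arrives rather than after all 10 have been fetched, which should behave at least as well as collecting them first.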
However, if you do NOT have any further capacity to do concurrent ProcessMessage calls, then you should instead look at addressing the bottleneck - e.g. adding more servers, optimizing code, or parallelizing the work done inside ProcessMessage.
The rationale is that, as you say, GetMessages retrieves data off a queue. If you have no capacity to process the messages you've retrieved, all you could do is queue them somewhere else, which seems rather pointless - better to leave the messages on the queue until you are ready to process them. The queue depth will also give visibility of the backlog of work building up, which you can monitor.
Edit, Re: Occasionally one ProcessMessage() call takes much longer than others
As per the comments, OP has additional information that an occasional ProcessMessage call takes much longer than others, and would like to continue processing other messages in the interim.
One approach could be to apply a timeout to the parallel tasks by racing them against Task.Delay with Task.WhenAny. If the timeout is reached, any long-running ProcessMessage tasks are left running and the loop continues with the next batch of messages.
The code below is potentially dangerous, in that it will require careful balancing of the timeout (1000 ms below) against the observed frequency of the misbehaving ProcessMessage calls - if the timeout is too low relative to the frequency of 'slow' ProcessMessage calls, the downstream resources can become overwhelmed.
A safer (yet more complicated) addition would be to track the concurrent number of incomplete ProcessMessage tasks via Task.IsCompleted, and if this hits a threshold, then to await completion of enough of these tasks to bring the backlog to a safe level.
    while (!cancellationToken.IsCancellationRequested)
    {
        // Ideally, the async operations should all accept cancellationTokens too
        var messages = await GetMessages(10, cancellationToken);
        var processTasks = messages
            .Select(message => ProcessMessage(message, cancellationToken));
        // Move on once the whole batch completes or after 1000 ms, whichever comes first;
        // unfinished ProcessMessage tasks keep running in the background
        await Task.WhenAny(Task.WhenAll(processTasks),
            Task.Delay(1000, cancellationToken));
    }
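The safer variant mentioned above (tracking incomplete ProcessMessage tasks and waiting once a threshold is hit) might look something like the following. This is only a sketch under assumptions: the delegates stand in for GetMessages and ProcessMessage, string is a placeholder message type, maxInFlight is a hypothetical threshold you would need to tune, and error handling is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledPump
{
    // getMessages(count, token) is assumed to return up to 'count' messages.
    public static async Task RunAsync(
        Func<int, CancellationToken, Task<IReadOnlyList<string>>> getMessages,
        Func<string, CancellationToken, Task> processMessage,
        int maxInFlight,
        CancellationToken cancellationToken)
    {
        var inFlight = new List<Task>();

        while (!cancellationToken.IsCancellationRequested)
        {
            // Forget tasks that have already finished.
            // (Sketch only: exceptions from these tasks are not observed here.)
            inFlight.RemoveAll(t => t.IsCompleted);

            // At the threshold: wait for any one task to finish before fetching more.
            while (inFlight.Count >= maxInFlight)
            {
                var finished = await Task.WhenAny(inFlight);
                inFlight.Remove(finished);
            }

            // Only fetch as many messages as there are free slots.
            var messages = await getMessages(maxInFlight - inFlight.Count, cancellationToken);
            inFlight.AddRange(messages.Select(m => processMessage(m, cancellationToken)));
        }

        // Let the remaining work drain before returning.
        await Task.WhenAll(inFlight);
    }
}
```

Unlike the fixed timeout, a slow ProcessMessage call here only occupies one slot, and the number of concurrent downstream calls can never exceed maxInFlight.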
Re: Throttling for safe levels of downstream load - TPL Dataflow would more than likely be of use here.
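For what it's worth, a minimal TPL Dataflow sketch of that throttling could use an ActionBlock with a bounded capacity. The processMessage delegate, the string message type, and the maxConcurrency value are assumptions for illustration; on .NET Framework the types come from the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowThrottle
{
    // processMessage is an assumed stand-in for ProcessMessage.
    public static async Task PumpAsync(
        IEnumerable<string> messages,
        Func<string, Task> processMessage,
        int maxConcurrency)
    {
        var block = new ActionBlock<string>(processMessage, new ExecutionDataflowBlockOptions
        {
            // At most this many ProcessMessage calls run at once.
            MaxDegreeOfParallelism = maxConcurrency,
            // Cap on messages held by the block; SendAsync waits when it is full.
            BoundedCapacity = maxConcurrency * 2
        });

        foreach (var message in messages)
        {
            // Applies back-pressure instead of buffering without limit.
            await block.SendAsync(message);
        }

        // Signal that no more messages are coming, then wait for the rest to finish.
        block.Complete();
        await block.Completion;
    }
}
```

In a real receive loop you would call SendAsync after each GetMessages instead of iterating a fixed collection; because SendAsync waits while the block is full, messages stay on the queue until there is room to process them.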