I've got a multithreaded Windows service which is consuming messages off a Rabbit queue and sending emails based on the content of the messages.
When the rabbit client is initialized at startup, it limits the Threadpool threads with a Min and Max value.
For each message taken off the queue the service is sending an HTTP request to a web api service using HttpClient
GetAsync
method to retrieve email address.
The problem is that the request goes off to the data service, but the response never comes back. The windows service keeps consuming messages off the queue and hangs after a while (probably runs of of free threads) - it's waiting for any of the calls to web api to complete which they never do.
I was able to resolve the problem using a Semaphore
class for the Rabbit loop rather than trying to limit the Threadpool directly, however, I'd like to know why the service got into this state in the first place. Is that to do with the GetAsync
call? Is it perhaps freeing up the thread for the duration of the request, so that the main loop can steal it for a next request?
Any ideas?
The original loop:
while (!_stopped)
{
if (_paused) continue;
try
{
using (var messageBusReceiver = _rabbitQueueClient.ConfigureMessageBusReceiver())
{
using (_consumer = messageBusReceiver.Listen<PublishableItem>())
{
while (!_stopped)
{
if (_paused) continue;
_consumer.Consume(callback, consumeSynchronously: false);
_communicationErrorCount = 0;
}
}
}
}
The Consume
method is eventually doing this:
_threadPoolProvider.QueueUserWorkItem(o =>
consumeMessage(callback, eventArgs, o), message);
The callback begins with the following lines - the null checking line is never reached:
var foo = _fooService.GetFoo(messageInfo.FooId);
if (foo == null)
{
throw new FooNotFoundException(
String.Format(CultureInfo.InvariantCulture, "Foo was not found for FooId of {0}", messageInfo.FooId));
}
The client method:
public Foo GetFoo(Guid id)
{
var path = getPathWithQueryStringAndDebug("getfoo", "id", id.ToString());
var response = _client.GetAsync(path).Result;
return processResponse<FooDto>(response);
}