I have a dot net core 5 console app which is processing around 100,000+ messages per min from rabbitmq
When a message is received from rabbitmq, a thread goes off and crunches some numbers, however one of those operations is to call an external API to get information about its location. When this external API service slows down and response times go up, I see thread starvation and thread count on windows task manager can get into the 1000's and the app basically slows to doing nothing
When the app loads the main thread establishes a connection to rabbitmq and subscribes to new messages arriving in the rabbitmq, and every time a message arrives, my console app consumes each message, and starts a threadpool item and continues getting new rabbitmq messages
private void Consumer_Received(object sender, BasicDeliverEventArgs deliveryArgs)
{
var data = Encoding.UTF8.GetString(deliveryArgs.Body.ToArray());
ThreadPool.QueueUserWorkItem(new WaitCallback(StartProcessing), data);
}
If I put a breakpoint, this void keeps being hit and a new threadpool process calls the StartProcessing void which is where the cpu crunching happens and the external api call
public void StartProcessing(object xdata)
{
//1. crunch cpu
//2. call external API
}
Each message is processed in around 100ms for the cpu stuff, but the external API is taking between 80-500ms on a normal day, but when there are issues (possibly network) it can take upto 10 secs to respond to 1 request, this is when the app starts to break.
My question is surrounding this implementation and how stop thread starvation. This is a high throughput multithreaded app and it needs to process as many messages as possible. The app needs to relieve back pressure when the external API is slow to respond and its constantly context switching threads.
Is using ThreadPool.QueueUserWorkItem the correct implementation or should I be using Async await etc?
I'm also open to hearing if this is a bad implementation and if there is another pattern I should be using for this.
////////////////////////////////// UPDATE 1 //////////////////////////////////
So i changed the code to use async task and its super slow to get messages from rabbitmq The old code got all messages (200,000) within a few seconds, the new code got through about 1,000 in a few minutes
the new code is
private void Consumer_Received(object sender, BasicDeliverEventArgs deliveryArgs)
{
StartProcessing(deliveryArgs.Body.ToArray()).ConfigureAwait(false);
}
public static async Task<bool> StartProcessing(ReadOnlyMemory<byte> data)
{
await Task.Run(() =>
{
ReadOnlySpan<byte> xdata = data.Span; //defensiveCopy of in memory pointer
//do stuff
}).ConfigureAwait(false);
return true;
}
I is there something wrong with my implentation? The "StartProcessing" code should be fire and forgot really as the main thread should continue to the next message in rabbitmq I seems like its waiting for the message to process before continuing //////////////////////////////////