I have a .NET 5 console app that processes 100,000+ messages per minute from RabbitMQ.

When a message is received from RabbitMQ, a thread goes off and crunches some numbers. One of those operations calls an external API to get information about the message's location. When this external API slows down and response times go up, I see thread starvation: the thread count in Windows Task Manager can climb into the thousands, and the app slows to the point of doing almost nothing.

When the app loads, the main thread establishes a connection to RabbitMQ and subscribes to new messages. Every time a message arrives, my console app consumes it, queues a thread pool work item, and carries on receiving new RabbitMQ messages.

private void Consumer_Received(object sender, BasicDeliverEventArgs deliveryArgs)
{
    var data = Encoding.UTF8.GetString(deliveryArgs.Body.ToArray());
    ThreadPool.QueueUserWorkItem(new WaitCallback(StartProcessing), data);
}

If I put a breakpoint there, this method keeps being hit, and each time a new thread pool work item calls StartProcessing, which is where the CPU crunching and the external API call happen.

public void StartProcessing(object xdata)
{
    //1. crunch cpu
    
    //2. call external API
    
}

The CPU work takes around 100ms per message. The external API takes between 80 and 500ms on a normal day, but when there are issues (possibly network related) it can take up to 10 seconds to respond to a single request. This is when the app starts to break.

My question is about this implementation and how to stop the thread starvation. This is a high-throughput multithreaded app and it needs to process as many messages as possible. The app needs to relieve back pressure when the external API is slow to respond, instead of constantly context switching between threads.

Is using ThreadPool.QueueUserWorkItem the correct implementation, or should I be using async/await?

I'm also open to hearing if this is a bad implementation and if there is another pattern I should be using for this.

////////////////////////////////// UPDATE 1 //////////////////////////////////

I changed the code to use async tasks and it is now very slow to get messages from RabbitMQ. The old code received all messages (200,000) within a few seconds; the new code got through about 1,000 in a few minutes.

The new code is:

private void Consumer_Received(object sender, BasicDeliverEventArgs deliveryArgs)
{
    StartProcessing(deliveryArgs.Body.ToArray()).ConfigureAwait(false);
}
    
public static async Task<bool> StartProcessing(ReadOnlyMemory<byte> data)
{
    await Task.Run(() =>
    {
        ReadOnlySpan<byte> xdata = data.Span; //defensiveCopy of in memory pointer

        //do stuff
        
    }).ConfigureAwait(false);

    return true;
}

Is there something wrong with my implementation? The "StartProcessing" call should really be fire and forget, as the main thread should continue to the next message in RabbitMQ. It seems like it is waiting for each message to be processed before continuing.

Quade
  • Does this answer your question? [c# Threadpool - limit number of threads](https://stackoverflow.com/questions/10342057/c-sharp-threadpool-limit-number-of-threads) – Sinatr Jun 11 '21 at 06:52
  • thanks @Sinatr, this is interesting but was wondering if there is any newer tech since that post is from 9 years ago – Quade Jun 11 '21 at 07:14
  • 1) Does the external API service provide asynchronous endpoints? 2) Are there any documented limitations regarding the maximum frequency or concurrency that the external API service can be invoked? 3) What is the desirable behavior of your application in case the external API service cannot keep up with the pace, and starts taking longer and longer to respond? – Theodor Zoulias Jun 11 '21 at 12:02
  • We are moving the external API to the local server. However, it's a database lookup, so it will still be slow. It can handle multiple requests now and when local. We don't currently have any back-off if the API cannot keep up, and that's when the thread count goes high. I was hoping async/await could deal with this for me. – Quade Jun 12 '21 at 12:14
  • You should try to optimize the database lookup then. If the bottleneck is in the server, optimizing the client is unlikely to bring any benefits. – Theodor Zoulias Jun 12 '21 at 12:47
  • I've already optimized the database and it's now much faster than before. The issue I have is dealing with spikes of data without crashing the app through thread starvation – Quade Jun 13 '21 at 03:53
  • Something else that I find strange is the 100,000+ messages per min, combined with 100ms CPU stuff per message. If my math is correct, you would need a CPU with 167 cores in order to satisfy the CPU needs of your app. – Theodor Zoulias Jun 13 '21 at 15:53
  • I would need to measure it and give an average, but that is worst case scenario on a message that needs lots of processing, most go through much quicker, but its very good to know thanks – Quade Jun 14 '21 at 05:03

1 Answer


It sounds like this is the exact scenario asynchronous functions were made for.

If your work is CPU bound, using background threads will help a bit, but only up to however many hardware threads you have.

But it sounds like you are mostly blocking on network IO. Using a thread that is just blocked until some kind of IO responds is quite wasteful since each thread consumes some resources. And it can easily result in problems like maxing out the thread pool.

By now, .NET and many libraries have been updated to provide truly asynchronous functions for IO. These release the thread to do other work instead of blocking, and when the IO completes, the remaining work is scheduled on a thread pool thread. Using async/await lets you write the code more or less as you would write regular synchronous code, with the compiler rewriting it into a state machine to deal with the complicated issue of maintaining state. Ideally, you should not need more threads than the number of hardware threads you have, since each thread should be doing actual work.

Keep in mind that just because there is an async method returning a Task, it is not necessarily truly asynchronous. Some base classes and interfaces, like Stream, have been extended with asynchronous versions, and some library vendors, rather than doing the work of providing an actual asynchronous implementation, just wrap the synchronous method, which provides no real benefit.
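To make the difference between a wrapped synchronous call and a truly asynchronous one concrete, here is a minimal sketch. The class and method names are hypothetical, and `Thread.Sleep` / `Task.Delay` are assumed stand-ins for a blocking and a non-blocking network call respectively; this is not the API of any particular library.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class SlowCallDemo
{
    // "Fake" async: the method returns a Task, but a thread pool thread
    // is still blocked for the entire duration of the wait.
    public static Task<int> FakeAsync(int delayMs) =>
        Task.Run(() =>
        {
            Thread.Sleep(delayMs); // stand-in for a blocking network call
            return delayMs;
        });

    // Truly asynchronous: no thread is held while the wait is in progress.
    public static async Task<int> TrueAsync(int delayMs)
    {
        await Task.Delay(delayMs).ConfigureAwait(false); // stand-in for async IO
        return delayMs;
    }
}
```

Both methods look identical to the caller, but under thousands of concurrent slow calls the first one ties up one thread per call while the second one does not.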

For example:

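private async void Consumer_Received(...)
{
    try
    {
        // CPU-bound work runs on a thread pool thread
        var result = await Task.Run(() => MyCpuBoundWork());
        // IO-bound call; awaiting it frees the thread if it is truly asynchronous
        await MyNetworkCall(result);
    }
    catch
    {
        // handle exceptions
    }
}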

When a message is received, this uses another background thread to do the CPU-bound work. I'm not sure how RabbitMQ delivers messages; the Task.Run part is only needed if it uses a single thread for all messages. Once the CPU-bound work is done, it continues with the network call.
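The pattern above does not by itself limit how many messages are in flight when the external API slows down. One possible way to add back pressure, sketched here under assumptions and not taken from the original answer, is to cap concurrency with a `SemaphoreSlim`. `ThrottledProcessor` and its `handler` delegate are hypothetical names; the handler stands in for the CPU work plus the network call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class ThrottledProcessor
{
    private readonly SemaphoreSlim _slots;
    private readonly Func<string, Task> _handler;

    public ThrottledProcessor(int maxConcurrency, Func<string, Task> handler)
    {
        // At most maxConcurrency handlers may run at the same time.
        _slots = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        _handler = handler;
    }

    public async Task ProcessAsync(string message)
    {
        // Waits asynchronously (without blocking a thread) for a free slot.
        await _slots.WaitAsync().ConfigureAwait(false);
        try
        {
            await _handler(message).ConfigureAwait(false);
        }
        finally
        {
            // Always free the slot, even if the handler throws.
            _slots.Release();
        }
    }
}
```

Callers that are waiting for a slot hold no thread, so a slow API leads to a queue of pending tasks instead of thousands of blocked threads. The messages still accumulate in memory, though; on the RabbitMQ side, a consumer prefetch limit (`BasicQos` in the .NET client, combined with manual acknowledgements) is a common way to stop the broker from delivering more than the app can handle, assuming that fits the setup here.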

JonasH
  • Thanks @JonasH, I've read a lot about IO-bound and CPU-bound work and that you should use different Task.Run/async await calls for each. The issue is that when I get the message from RabbitMQ, I need to do both CPU and IO work on that thread, so I'm not sure whether this is bad practice? – Quade Jun 11 '21 at 07:21
  • @quade added example – JonasH Jun 11 '21 at 07:56
  • Thanks, i've updated my original post with latest info/question – Quade Jun 12 '21 at 10:45