
I have a service that needs to read in messages from Amazon SQS as quickly as possible. We are expecting heavy traffic, and I'd like to be able to read in upwards of 10K messages/second. Unfortunately I'm currently at around 10 messages/second. Clearly, I have work to do.

This is what I'm using (converted to a console app to make testing easier):

using System;
using System.Threading.Tasks;
using System.Timers;

private static int _concurrentRequests;
private static int _maxConcurrentRequests;

public static void Main(string[] args) {
    _concurrentRequests = 0;
    _maxConcurrentRequests = 100;

    var timer = new Timer();
    timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
    timer.Interval = 10;
    timer.Enabled = true;

    Console.ReadLine();
    timer.Dispose();
}

public static void OnTimedEvent(object s, ElapsedEventArgs e) {
    if (_concurrentRequests < _maxConcurrentRequests) {
        _concurrentRequests++;
        ProcessMessages();
    }
}

public static async Task ProcessMessages() {
    var manager = new MessageManager();
    manager.ProcessMessages();  // this is an async method that reads in the messages from SQS

    _concurrentRequests--;
}

I'm not getting anywhere near 100 concurrent requests, and it doesn't seem to be firing OnTimedEvent every 10 milliseconds.

I'm not sure if Timer is the right approach here. I don't have much experience with this kind of coding. I'm open to trying anything at this point.

Update

Thanks to calebboyd, I'm a little closer to achieving my goal. Here's some really bad code:

using System.Threading;
using System.Threading.Tasks;

private static MessageManager _manager;
private static SemaphoreSlim _locker;

public static void Main(string[] args) {
    _manager = new MessageManager();

    RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
    _locker = new SemaphoreSlim(10, 10);
    while (true) {
        Thread thread = new Thread(new ParameterizedThreadStart(Process));
        thread.Start();
    }
}

private static async void Process(object args) {
    _locker.WaitAsync();
    try {
        await _manager.ProcessMessages();
    }
    finally {
        _locker.Release();
    }

}

I'm able to come close to reading a respectable number of messages per second with this, but the problem is my ProcessMessages call never finishes (or maybe it would after a very long time). I'm thinking I probably need to limit the number of threads I have running at any one time.

Any suggestions on how I can improve this code so that ProcessMessages has a chance to finish?

Irving
  • Timers have a limitation of being able to fire at best every 15.6 milliseconds. That's a limitation with the .NET implementation of timers. – Enigmativity Sep 05 '15 at 05:49
  • The code that claims to be asynchronous has no `await` - that is very strange. Consider updating your code to look more real. – Alexei Levenkov Sep 05 '15 at 05:52
  • Here's a very important question to help determine what is possible with multi-threading - how many messages can you handle per second if you run the requests in series on a single thread? – Enigmativity Sep 05 '15 at 05:56
  • Is your primary goal polling Amazon SQS as rapidly as possible? – shay__ Sep 06 '15 at 06:48
  • Yes, it is. I need to grab the messages and insert them into a database. My updated code grabs the messages fast enough, but the inserts into the database stall out quickly. I see the inserts stop about 20 seconds into execution. – Irving Sep 06 '15 at 11:52
  • @Irving So your DB is the bottleneck. Did you consider buffering the messages and inserting them into the DB in bulk? – shay__ Sep 06 '15 at 17:30
  • @shay__ I do believe the DB is some of my problems, but I think my current issue is the number of threads I'm creating. I set a breakpoint in the call that does the insert in the DB and never saw it get hit (after it stalls out). That's an excellent idea to get more performance out of my service though. Thanks for your suggestion! – Irving Sep 07 '15 at 00:30
  • @Irving I'm not sure why you are creating a lot of threads. One thread can poll Amazon very quickly. Please see my suggested answer. – shay__ Sep 07 '15 at 06:53
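Alexei Levenkov's point about the missing `await` also explains why the counter never gets near 100: the wrapper starts `ProcessMessages` and returns immediately, so `_concurrentRequests--` runs right away. Separately, `System.Timers.Timer` raises `Elapsed` on thread-pool threads, so plain `++`/`--` on a shared static field can race. The following is a minimal illustrative sketch, not the asker's code: `Task.Delay(100)` stands in for the real SQS call, `InFlightDemo`, `TrackedCallAsync` and `RunAsync` are hypothetical names, and `Interlocked` replaces the plain increment and decrement. It measures the peak number of in-flight calls with and without awaiting the work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class InFlightDemo
{
    private static int _inFlight;
    private static int _peak;

    // Records the highest in-flight count seen, safely across threads.
    private static void UpdatePeak(int current)
    {
        int seen;
        while (current > (seen = Volatile.Read(ref _peak)))
        {
            if (Interlocked.CompareExchange(ref _peak, current, seen) == seen)
                break;
        }
    }

    // Hypothetical stand-in for MessageManager.ProcessMessages (100 ms of fake I/O).
    private static Task FakeProcessMessagesAsync() => Task.Delay(100);

    private static async Task TrackedCallAsync(bool awaitWork)
    {
        UpdatePeak(Interlocked.Increment(ref _inFlight));
        Task work = FakeProcessMessagesAsync();
        if (awaitWork)
            await work;                        // slot is held until the work really finishes
        Interlocked.Decrement(ref _inFlight);  // without the await, this runs immediately
    }

    // Starts `requests` calls back to back and returns the peak in-flight count.
    public static async Task<int> RunAsync(bool awaitWork, int requests)
    {
        _inFlight = 0;
        _peak = 0;
        var calls = new List<Task>();
        for (int i = 0; i < requests; i++)
            calls.Add(TrackedCallAsync(awaitWork));
        await Task.WhenAll(calls);
        return _peak;
    }
}
```

Run back to back, `RunAsync(awaitWork: false, requests: 100)` reports a peak of 1, because each call finishes its bookkeeping before the next one starts, while `RunAsync(awaitWork: true, requests: 100)` reports many overlapping calls.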

3 Answers

Because the call to ProcessMessages on your MessageManager object is not awaited, I will assume that it runs on the same thread that calls it. Merely marking a method as async does not move its work to a new thread. Under that assumption, this code is not actually executing on multiple threads. You can use the following code to spread the work across more of the thread pool.

It's likely that the manager object cannot handle concurrent usage, so I create it inside the Task.Run lambda. This could also be expensive and therefore impractical.

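using System.Threading;
using System.Threading.Tasks;

async Task RunBatchProcessingForeverAsync () {
    // `lock` is a C# keyword, so the semaphore needs another name.
    var throttle = new SemaphoreSlim(initialCount: 10);
    while (true) {
        // Wait for a free slot before starting another call.
        await throttle.WaitAsync();
        // Fire and forget: the semaphore, not this loop, tracks completion.
        _ = Task.Run(async () => {
            try {
                var manager = new MessageManager();
                // Awaiting holds the slot until ProcessMessages (which returns a Task) completes.
                await manager.ProcessMessages();
            } finally {
                throttle.Release();
            }
        });
    }
}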

I haven't written C# in a while, but this should run your method 10 times simultaneously, repeatedly, forever.

calebboyd
  • I have never seen this pattern. I like it. – usr Sep 05 '15 at 09:19
  • Thank you! I was able to make some progress with this, but I'm still running into a roadblock. I had to update your code a little to get it fast enough. Could you take a look at my update? – Irving Sep 06 '15 at 06:47
  • You should be able to call `RunBatchProcessingForeverAsync().Wait()` from your main method. It looks like you might be creating an unbounded number of threads (no `await` in the `while` loop). Does your `ProcessMessages` return a Task that can be awaited? – calebboyd Sep 06 '15 at 19:39
  • I was concerned about the number of threads I'm creating too, but not sure how to throttle that. I actually have an await on my call to `ProcessMessages`. How would I add an await in the while loop? – Irving Sep 07 '15 at 00:26
  • Thanks a lot for your suggestions. To answer your question, yes, the signature on `ProcessMessages` looks like this: `public async Task ProcessMessages()`. I will try out your suggestion when I get a chance. – Irving Sep 09 '15 at 04:23
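Following calebboyd's comments, the key changes from the update in the question are to `await` the semaphore inside the loop, so the loop cannot start more work than there are free slots, and to `await` `ProcessMessages` inside each task, so a slot stays taken until the call actually completes. The sketch below assumes `Task.Delay` as a stand-in for `MessageManager.ProcessMessages()`; `ThrottledPoller` and `RunBatchProcessingAsync` are hypothetical names, and the loop is bounded (instead of `while (true)`) so the concurrency limit can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledPoller
{
    // Hypothetical stand-in for `await manager.ProcessMessages()`.
    private static Task FakeProcessMessagesAsync() => Task.Delay(20);

    // Runs `iterations` calls with at most `maxConcurrency` in flight and
    // returns the highest concurrency actually observed.
    public static async Task<int> RunBatchProcessingAsync(int maxConcurrency, int iterations)
    {
        var throttle = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var running = new List<Task>();
        object gate = new object();
        int inFlight = 0, peak = 0;

        for (int i = 0; i < iterations; i++)
        {
            await throttle.WaitAsync();   // this await is what bounds the loop
            running.Add(Task.Run(async () =>
            {
                try
                {
                    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
                    await FakeProcessMessagesAsync();
                }
                finally
                {
                    lock (gate) { inFlight--; }
                    throttle.Release();   // free the slot only after the work is done
                }
            }));
        }

        await Task.WhenAll(running);
        return peak;
    }
}
```

From a console `Main`, this can be driven with `ThrottledPoller.RunBatchProcessingAsync(10, 200).GetAwaiter().GetResult()` (or `.Wait()`, as suggested above); the returned peak should never exceed 10.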

As @calebboyd suggested, you must first make your thread async. Now, if you look at "Where to use concurrency when calling an API", you'll see that one async thread is enough for polling a network resource pretty rapidly. If you are able to get multiple messages from Amazon in a single request, then your producer thread (the one that makes async calls to Amazon) will be just fine: it can send hundreds of requests per second. This will not be your bottleneck.

However, the continuation Tasks, where the received data is processed, are handed to the thread pool. Here you have a chance for a bottleneck. Suppose 100 responses arrive each second, and each response contains 100 messages (to reach your 10K msgs/sec approximation). Every second you have 100 new Tasks, each of which requires a thread to handle 100 messages. Now there are two options: (1) the processing of these messages is not CPU bound, because you are simply sending them to your DB, or (2) you perform CPU-consuming calculations, for example scientific calculations, serialization or some heavy business logic. If (1) is your case, then the bottleneck is pushed back toward your DB. If (2), then you have no option but to scale up or out, or to optimize the calculations. But your bottleneck is probably not the producing thread, if it's implemented right (see the above link for examples).

shay__
  • This was my problem. The database is most definitely the bottleneck now. Implementing your idea of inserting in bulk should help a lot though. Thank you for your help! – Irving Sep 15 '15 at 02:41
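One arithmetic caveat on the figures above: SQS `ReceiveMessage` returns at most 10 messages per call, so 10K messages per second works out to roughly 1,000 receive calls per second (10,000 / 10) rather than 100. The sketch below illustrates the buffering-and-bulk-insert idea from the comments: a single async producer pushes received messages into a bounded buffer, and a consumer drains that buffer into bulk inserts. It assumes .NET Core 3.0 or later for `System.Threading.Channels`; `BufferedIngest`, `FakeReceiveBatchAsync`, the in-memory queue and the bulk-insert placeholder are hypothetical stand-ins for the AWS SDK call and your database code.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BufferedIngest
{
    // Hypothetical stand-in for one SQS receive: up to 10 messages per call
    // (the ReceiveMessage maximum), or an empty batch once the fake queue is drained.
    private static Task<List<string>> FakeReceiveBatchAsync(Queue<string> source)
    {
        var batch = new List<string>();
        while (batch.Count < 10 && source.Count > 0)
            batch.Add(source.Dequeue());
        return Task.FromResult(batch);
    }

    // Returns the size of each bulk insert that was performed.
    public static async Task<List<int>> RunAsync(int totalMessages, int bulkSize)
    {
        var source = new Queue<string>();
        for (int i = 0; i < totalMessages; i++)
            source.Enqueue($"msg-{i}");

        // Bounded buffer: if inserts fall behind, the producer waits instead of growing memory.
        var buffer = Channel.CreateBounded<string>(bulkSize * 4);
        var insertedBatchSizes = new List<int>();

        // Single async producer: receive batches and push each message into the buffer.
        Task producer = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    List<string> batch = await FakeReceiveBatchAsync(source);
                    if (batch.Count == 0)
                        break;
                    foreach (string message in batch)
                        await buffer.Writer.WriteAsync(message);
                }
            }
            finally
            {
                buffer.Writer.Complete(); // lets the consumer loop below finish
            }
        });

        // Consumer: drain the buffer into bulk inserts of up to bulkSize rows.
        var pending = new List<string>(bulkSize);
        while (await buffer.Reader.WaitToReadAsync())
        {
            while (pending.Count < bulkSize && buffer.Reader.TryRead(out string item))
                pending.Add(item);
            if (pending.Count == bulkSize)
            {
                // Hypothetical: one multi-row INSERT or bulk copy of `pending` goes here.
                insertedBatchSizes.Add(pending.Count);
                pending.Clear();
            }
        }
        if (pending.Count > 0)
            insertedBatchSizes.Add(pending.Count); // flush the final partial batch

        await producer;
        return insertedBatchSizes;
    }
}
```

With 1,005 messages and a bulk size of 100, this performs ten inserts of 100 rows and one of 5. In a long-running service you would probably also flush `pending` on a timer, so a partially filled batch does not wait indefinitely when traffic is light.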

I would presume the async methods are queued in a thread pool that only has as many threads as you have available processors. You might generate 100 requests, but they are still executed by 8 threads. Try creating an array of N threads and putting them to use.

  • Calling `ThreadPool.GetMaxThreads` returns 1023 available threads on my machine. I have one processor, with four cores, and 8 hyper threads. – Enigmativity Sep 05 '15 at 05:52
  • @Enigmativity that doesn't mean Robert Hudjakov is completely wrong. To my understanding, in a theoretical pure async program the thread pool won't create more than 8 (in your case) threads to handle all the queued Tasks, even if the buffer is saturated. I could be wrong, though. – shay__ Sep 06 '15 at 07:00
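The two comments are looking at different limits. `GetMaxThreads` reports the ceiling Enigmativity quotes, whereas `GetMinThreads` reports how many worker threads the pool creates on demand before it switches to adding threads gradually; by default that minimum equals the number of logical processors, which is close to what the answer describes. A small sketch for checking both values on a given machine (`PoolInfo` and `Describe` are hypothetical helper names):

```csharp
using System;
using System.Threading;

public static class PoolInfo
{
    // Returns the logical processor count and the worker-thread limits of the pool.
    public static (int Processors, int MinWorkers, int MaxWorkers) Describe()
    {
        // The second out parameter (I/O completion threads) is ignored here.
        ThreadPool.GetMinThreads(out int minWorkers, out _);
        ThreadPool.GetMaxThreads(out int maxWorkers, out _);
        return (Environment.ProcessorCount, minWorkers, maxWorkers);
    }
}
```

`ThreadPool.SetMinThreads` can raise the minimum if blocking work genuinely needs more threads, although properly awaited I/O such as the SQS calls does not hold a pool thread while the request is in flight.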