
I am working on improving some of my code to increase efficiency. In the original code I limited the number of threads to 5, and if 5 threads were already active I would wait until one finished before starting another. Now I want to modify this code to allow any number of threads, while making sure that only 5 threads get started every second. For example:

  • Second 0 - 5 new threads
  • Second 1 - 5 new threads
  • Second 2 - 5 new threads ...

Original code (`cleanseDictionary` usually contains thousands of items):

        ConcurrentDictionary<long, APIResponse> cleanseDictionary = new ConcurrentDictionary<long, APIResponse>();
        ConcurrentBag<int> itemsinsec = new ConcurrentBag<int>();
        ConcurrentDictionary<long, string> resourceDictionary = new ConcurrentDictionary<long, string>();
        DateTime start = DateTime.Now;

        Parallel.ForEach(resourceDictionary, new ParallelOptions { MaxDegreeOfParallelism = 5 }, row =>
        {
            lock (itemsinsec)
            {
                ThrottleAPIRequests(itemsinsec, start);

                itemsinsec.Add(1);
            }

            cleanseDictionary.TryAdd(row.Key, _helper.MakeAPIRequest(string.Format("/endpoint?{0}", row.Value)));
        });


    private static void ThrottleAPIRequests(ConcurrentBag<int> itemsinsec, DateTime start)
    {
        if ((start - DateTime.Now).Milliseconds < 10001 && itemsinsec.Count > 4)
        {
            System.Threading.Thread.Sleep(1000 - (start - DateTime.Now).Milliseconds);
            start = DateTime.Now;
            itemsinsec = new ConcurrentBag<int>();
        }
    }

My first thought was to increase `MaxDegreeOfParallelism` to something much higher and then have a helper method that lets only 5 threads start per second, but I am not sure that is the best way to do it. If it is, would I need a lock around that step?

Thanks in advance!

EDIT: I am actually looking for a way to throttle the API requests rather than the actual threads. I was thinking they were one and the same.

Edit 2: My requirement is to send 5 API requests every second.

Kevin Mee
  • `Parallel.ForEach` doesn't start new threads. It uses a number of tasks to partition a large amount of data and have each task work exclusively on that data. What are you trying to do with this code and *why* are you trying to "throttle" when you have 5 concurrent calls maximum? – Panagiotis Kanavos Oct 14 '16 at 14:09
  • @PanagiotisKanavos it is a requirement that the host has set up: only 5 were allowed in any given second (so I thought), but now I found out that their counter resets after each second passes. – Kevin Mee Oct 14 '16 at 14:13
  • It looks like you are trying to throttle *requests*, not threads. What is your *actual* requirement? Execute e.g. up to 5 concurrent requests, or 5 requests/second? – Panagiotis Kanavos Oct 14 '16 at 14:14
  • @PanagiotisKanavos yes, I guess that's more clear: I want to throttle requests so that there are only 5 REQUESTS per second. I do want as many requests out as possible though – Kevin Mee Oct 14 '16 at 14:18
  • I think you need to use a while to pause things rather than an if - using an if means the row just disappears if it's been less time than needed. – Shannon Holsinger Oct 14 '16 at 14:19
  • The only people allowed to downvote this question are those that had to implement request throttling in the past! Only they understand how confusing this is if you *don't* know the techniques! – Panagiotis Kanavos Oct 14 '16 at 14:19
  • @ShannonHolsinger no need to pause, .NET has several classes that can be used for throttling, even the specialized Reactive Extensions library – Panagiotis Kanavos Oct 14 '16 at 14:20
  • Yeah agreed - I'd do this very differently. I'm just saying within the context of what's written – Shannon Holsinger Oct 14 '16 at 14:20
  • I'm "old fashioned" - with a huge time difference like one whole second, I'd use a backgroundworker and thread.Sleep but yes there are more elegant solutions – Shannon Holsinger Oct 14 '16 at 14:21
  • @KevinMee you combine Reactive Extensions and any other of the Parallel/PLINQ/Dataflow techniques. Eg, [Observable.Buffer](https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx) will buffer requests up to eg 1 second or 5 requests before sending the collected requests as an array. – Panagiotis Kanavos Oct 14 '16 at 14:23
  • @ShannonHolsinger BGW is deprecated and not suitable for this scenario anyway. A single Timer would be better. You still have to provide queues, so there is not much benefit to sleep/BGW – Panagiotis Kanavos Oct 14 '16 at 14:24
  • @PanagiotisKanavos I haven't used Observable.Buffer before.. I will look into that! – Kevin Mee Oct 14 '16 at 14:24
  • I don't quite understand the requirements but a common rate limiting algo is "token bucket": http://stackoverflow.com/questions/667508/whats-a-good-rate-limiting-algorithm – usr Oct 14 '16 at 14:33
  • @usr I just added the requirements to the post in an edit.. I want to be able to send over 5 API requests every second – Kevin Mee Oct 14 '16 at 14:41
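
The token bucket algorithm mentioned in the comments above can be sketched in a few lines. This is a minimal, blocking sketch and not code from the linked question: the `TokenBucket` class and its member names are made up for illustration. Tokens refill continuously at `tokensPerSecond`, and `capacity` bounds how many requests may go out back to back. Note that a capacity of 5 at 5 tokens per second can let roughly 9 requests start within one second right after an idle period (5 at once, then one every 200 ms), so if the host counts strictly per second, a capacity of 1, which spaces requests about 200 ms apart, is probably the safer choice.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper, for illustration only.
public sealed class TokenBucket
{
    private readonly double _capacity;
    private readonly double _tokensPerSecond;
    private readonly Stopwatch _clock = Stopwatch.StartNew();
    private readonly object _gate = new object();
    private double _tokens;
    private TimeSpan _lastRefill;

    public TokenBucket(double capacity, double tokensPerSecond)
    {
        _capacity = capacity;
        _tokensPerSecond = tokensPerSecond;
        _tokens = capacity; // start full, so the first burst is allowed at once
    }

    // Takes one token if available; returns false without blocking otherwise.
    public bool TryTake()
    {
        lock (_gate)
        {
            // Credit the tokens earned since the last call, capped at capacity.
            TimeSpan now = _clock.Elapsed;
            double earned = (now - _lastRefill).TotalSeconds * _tokensPerSecond;
            _tokens = Math.Min(_capacity, _tokens + earned);
            _lastRefill = now;

            if (_tokens < 1)
                return false;

            _tokens -= 1;
            return true;
        }
    }

    // Blocks the calling thread until a token is available.
    public void Take()
    {
        while (!TryTake())
            Thread.Sleep(10); // poll with a short pause instead of spinning
    }
}
```

With something like this, each worker would call `Take()` on a shared `new TokenBucket(1, 5)` just before `_helper.MakeAPIRequest(...)`, and `MaxDegreeOfParallelism` could be raised so that slow responses do not hold back new requests.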

3 Answers


`Parallel.ForEach`, from the MS website:

may run in parallel

If you want any degree of fine control over how the threads are managed, this is not the way to go.
How about creating your own helper class that lets you queue jobs with a group id, wait for all jobs with group id X to complete, and spawn extra threads as and when required?

UKMonkey
  • @PanagiotisKanavos who knows - on re-reading the question it's all suddenly less clear what the question really is. – UKMonkey Oct 14 '16 at 14:17

For me the best solution is:

using System;
using System.Collections.Generic;
using System.Threading;

namespace SomeNamespace
{
    // Declared here so that the sample compiles on its own.
    public interface IRequestLimiter
    {
        TResult Run<TResult>(int requestsOnSecond, Func<TResult> function);
    }

    // Sliding-window limiter: at most requestsOnSecond calls are started
    // within any one-second window.
    public class RequestLimiter : IRequestLimiter
    {
        // Start times of recent requests, oldest first. Guarded by _locker.
        private readonly Queue<DateTime> _requestTimes = new Queue<DateTime>();
        private readonly TimeSpan _timeSpan = TimeSpan.FromSeconds(1);
        private readonly object _locker = new object();

        public TResult Run<TResult>(int requestsOnSecond, Func<TResult> function)
        {
            WaitUntilRequestCanBeMade(requestsOnSecond);
            return function();
        }

        // Blocks the calling thread until a slot in the current window is free.
        private void WaitUntilRequestCanBeMade(int requestsOnSecond)
        {
            while (!TryEnqueueRequest(requestsOnSecond))
                Thread.Sleep(10); // back off briefly instead of busy-waiting
        }

        // Drops timestamps that have left the window. Caller must hold _locker.
        // The head of the queue is re-read on every iteration, so only
        // expired entries are removed.
        private void SynchronizeQueue()
        {
            DateTime cutoff = DateTime.UtcNow - _timeSpan;
            while (_requestTimes.Count > 0 && _requestTimes.Peek() < cutoff)
                _requestTimes.Dequeue();
        }

        private bool TryEnqueueRequest(int requestsOnSecond)
        {
            lock (_locker)
            {
                SynchronizeQueue();
                if (_requestTimes.Count >= requestsOnSecond)
                    return false;

                _requestTimes.Enqueue(DateTime.UtcNow);
                return true;
            }
        }
    }
}

I want to be able to send over 5 API requests every second

That's really easy:

while (true) {
 await Task.Delay(TimeSpan.FromSeconds(1));
 await Task.WhenAll(Enumerable.Range(0, 5).Select(_ => RunRequestAsync()));
}

Maybe not the best approach since there will be a burst of requests. This is not continuous.

Also, there is timing skew: each iteration takes more than 1 second, because the one-second delay is added on top of the time the requests themselves take. This can be solved with a few lines of time logic.
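
One way to remove the skew is to schedule every batch against a single clock instead of delaying a fixed second after the previous batch finishes. The following is a sketch under assumptions, not the loop above with a small patch: `BatchedSender`, `SendAsync` and the `sendAsync` delegate are hypothetical names, and unlike the loop above it starts the next batch on schedule even if earlier requests are still in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class BatchedSender
{
    // Starts at most batchSize requests per period. Batch i is scheduled at
    // i * period on one Stopwatch, so small delays do not accumulate.
    // Returns the elapsed time at which each batch started, for inspection.
    public static async Task<List<TimeSpan>> SendAsync<T>(
        IEnumerable<T> items, Func<T, Task> sendAsync, int batchSize, TimeSpan period)
    {
        var clock = Stopwatch.StartNew();
        var batchStarts = new List<TimeSpan>();
        var inFlight = new List<Task>();
        var pending = new Queue<T>(items);

        for (int batch = 0; pending.Count > 0; batch++)
        {
            // Wait for this batch's slot on the fixed schedule.
            TimeSpan wait = TimeSpan.FromTicks(period.Ticks * batch) - clock.Elapsed;
            if (wait > TimeSpan.Zero)
                await Task.Delay(wait);

            batchStarts.Add(clock.Elapsed);
            for (int i = 0; i < batchSize && pending.Count > 0; i++)
                inFlight.Add(sendAsync(pending.Dequeue()));
        }

        // Let the requests that are still running complete.
        await Task.WhenAll(inFlight);
        return batchStarts;
    }
}
```

A call might look like `await BatchedSender.SendAsync(rows, RunRequestAsync, 5, TimeSpan.FromSeconds(1));`, assuming `RunRequestAsync` accepts one row. Timer resolution still makes the start times approximate, so a slightly longer period leaves some margin against the host's counter.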

usr
  • So I could just loop while there are requests to be sent over. What do you mean this is not continuous? – Kevin Mee Oct 14 '16 at 15:20
  • Once each second there is a burst of requests. If you wanted to send 1m requests once per hour this would be a problem. At 5 every 1s it's probably not. – usr Oct 14 '16 at 15:32
  • The usual count I get is between 500 and 5000.. would it be an issue here? I am looking for an efficient solution to get 5 requests out every second (or as close as I can get to it) – Kevin Mee Oct 14 '16 at 15:44
  • What "count" are you speaking of? – usr Oct 14 '16 at 15:59
  • the amount of API requests I need to send – Kevin Mee Oct 14 '16 at 16:06
  • The total amount does not matter. What matters is how many per loop iteration. If it's just 5 it's probably not important. Read: `If you wanted to send 1m requests once per hour this would be a problem.`. Does this make it clear what the (potential) issue is? Sending 1m requests *at the same time* is not going to work. 5 at the same time is likely to work. – usr Oct 14 '16 at 16:10
  • Ah yes.. Since I want to send like 5000 in batches of 5, I should be ok using this method – Kevin Mee Oct 14 '16 at 16:12
  • Yes, likely. It's a quick, easy hack to be done with it. – usr Oct 14 '16 at 16:19
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/125732/discussion-between-kevin-mee-and-usr). – Kevin Mee Oct 14 '16 at 17:40