
I have a requirement where I can hit an API at most 5 times per second. If I have to make a total of 50 requests, I want to make the first 5 requests and wait 1 second before hitting the API with the next batch of 5. I tried using the ThreadPool as well as the Task Parallel Library (Parallel.For/Parallel.ForEach loops and the Task class), but I am unable to get a sequential counter that would tell me that 5 tasks have been created. Here is a sample of what I am trying to do:

List<string> str = new List<string>();
for (int i = 0; i <= 100; i++)
{
    str.Add(i.ToString());
}

Parallel.ForEach(str, new ParallelOptions { MaxDegreeOfParallelism = 5 },
(value, pls, index) =>
{
    Console.WriteLine(value);// simulating method call
    if (index + 1 == 5)
    {
        // need the main thread to sleep so the next batch is delayed by 1 second
        Thread.Sleep(1000);
    }
});
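Why the counter approach misbehaves: in Parallel.ForEach, `index` is the element's 0-based position in the source, so `index + 1 == 5` holds for exactly one element, and Thread.Sleep inside the body pauses only the worker running that element while the others carry on. A different way to express the requirement (a sketch, not something from the question) is to give each request a time slot computed from its index instead of counting created tasks: request `i` belongs to batch `i / 5` and may start no earlier than `i / 5` seconds after the first request. `RequestSchedule.EarliestStart` is a hypothetical helper name.

```csharp
using System;

public static class RequestSchedule
{
    // Earliest start of request `index` (0-based), measured from the first request:
    // indices 0-4 map to 0 s, 5-9 to 1 s, and so on, via integer division.
    public static TimeSpan EarliestStart(long index, int requestsPerSecond)
    {
        if (index < 0) throw new ArgumentOutOfRangeException("index");
        if (requestsPerSecond <= 0) throw new ArgumentOutOfRangeException("requestsPerSecond");

        long batch = index / requestsPerSecond;
        return TimeSpan.FromTicks(batch * TimeSpan.TicksPerSecond);
    }
}
```

Inside the loop body, each worker would sleep until a Stopwatch started before the loop reaches `EarliestStart(index, 5)`, then make its call. Parallel.ForEach does not process elements in strict index order, so some workers may sit idle waiting for later slots; this trades some throughput for simplicity.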
Yuval Itzchakov
  • Have you considered using a better approach? Maybe `async/await`? – Matias Cicero Sep 21 '15 at 12:27
  • I am working with .net 4.0. As far as I know async/await is available from .net 4.5. Correct me if I am wrong please. – Ridhima Muglani Sep 21 '15 at 12:34
  • You can use async await in .net 4.0 given that you have VS 2012 or later. Refer http://stackoverflow.com/questions/19421878/how-can-i-use-the-async-keywords-in-a-project-targeting-net-4-0 – Sriram Sakthivel Sep 21 '15 at 12:45
  • @SriramSakthivel instead of suggesting new tools which _may not be available to the OP_, try and fix the issue with code. – Gusdor Sep 21 '15 at 12:59
  • @Gusdor Suggestions and recommendations are valid as comments – Matias Cicero Sep 21 '15 at 13:05
  • @Gusdor Suggesting tools which *may or may not* be available to the OP is valid, as they may not know that those tools exist at all. Using `async-await`, as suggested, will make the OP's code cleaner and possibly more scalable than this threaded approach. – Yuval Itzchakov Sep 21 '15 at 13:10

5 Answers


Since you're using .NET 4.0 (and assuming, hopefully, that you're at least using VS2012), you can use Microsoft.Bcl.Async to get async-await features.

Once you do that, you can query your API endpoint asynchronously (no extra threads needed) and use an AsyncSemaphore (see implementation below) to cap the number of requests in flight at once.

For example:

// Assumes Microsoft.Bcl.Async (async/await on .NET 4.0) and Microsoft.Net.Http (HttpClient on .NET 4.0).
private readonly AsyncSemaphore semaphore = new AsyncSemaphore(5);
private readonly HttpClient httpClient = new HttpClient();

public async Task<string> LimitedQueryAsync(string url)
{
    await semaphore.WaitAsync();
    try
    {
        var response = await httpClient.GetAsync(url);
        return await response.Content.ReadAsStringAsync();
    }
    finally
    {
        semaphore.Release();
    }
}
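A semaphore caps how many requests are in flight at once, not how many start per second: if each call takes 100 ms, five slots can still admit roughly 50 requests per second. One way to turn it into a rate limit is to hand each permit back one full window after it was taken rather than when the request finishes. This is a sketch assuming .NET 4.5+ (for SemaphoreSlim.WaitAsync and Task.Delay); on .NET 4.0 the AsyncSemaphore below and TaskEx.Delay would be the substitutes. `RequestRateGate` is a hypothetical name, not a library type.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: lets at most `permits` operations START within any `window`
// (as measured on the client) by keeping each permit checked out for a full window.
public sealed class RequestRateGate
{
    private readonly SemaphoreSlim _slots;
    private readonly TimeSpan _window;

    public RequestRateGate(int permits, TimeSpan window)
    {
        _slots = new SemaphoreSlim(permits, permits);
        _window = window;
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> operation)
    {
        await _slots.WaitAsync();

        // Return the permit one window after it was taken. This does not wait for
        // `operation`, so a slow call does not hold back the next window.
        _ = Task.Delay(_window).ContinueWith(t => _slots.Release());

        return await operation();
    }
}
```

Because permits come back on a timer rather than on completion, this bounds starts per window but not concurrency; a slow API could still have more than five calls outstanding.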

Now you can query it like this:

public async Task DoQueryStuffAsync()
{
    while (someCondition)
    {
        var results = await LimitedQueryAsync(url);

        // do stuff with results
        await TaskEx.Delay(1000); // Task.Delay on .NET 4.5+
    }
}
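The loop above awaits one query before starting the next and then waits a second, so it sends about one request per second rather than five. A sketch that uses the whole budget starts a batch of five together, awaits them with Task.WhenAll, and then waits out whatever is left of the second. It assumes .NET 4.5 APIs (Task.WhenAll, Task.Delay; TaskEx.WhenAll/TaskEx.Delay with Microsoft.Bcl.Async on 4.0), and `queryAsync` is a hypothetical stand-in for LimitedQueryAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class BatchedQueries
{
    // Sends `urls` in batches of `batchSize`; each batch starts at least `interval`
    // after the previous one started. Results come back in input order.
    public static async Task<List<string>> RunAsync(
        IList<string> urls, int batchSize, TimeSpan interval,
        Func<string, Task<string>> queryAsync)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException("batchSize");

        var results = new List<string>(urls.Count);
        for (int start = 0; start < urls.Count; start += batchSize)
        {
            Stopwatch timer = Stopwatch.StartNew();

            // Start every call in this batch, then wait for all of them.
            Task<string>[] batch = urls.Skip(start).Take(batchSize).Select(queryAsync).ToArray();
            results.AddRange(await Task.WhenAll(batch));

            // Wait out the rest of the interval unless this was the last batch.
            TimeSpan remaining = interval - timer.Elapsed;
            bool moreToSend = start + batchSize < urls.Count;
            if (moreToSend && remaining > TimeSpan.Zero)
                await Task.Delay(remaining);
        }
        return results;
    }
}
```

Measuring from the start of each batch means the next batch begins at least `interval` after the previous one began, regardless of how long the calls took.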

Edit: As @ScottChamberlain correctly points out, SemaphoreSlim.WaitAsync isn't available in .NET 4.0. You can instead use an AsyncSemaphore, which looks as follows:

public class AsyncSemaphore 
{ 
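    // TaskEx.FromResult is available on .NET 4.0 once Microsoft.Bcl.Async is installed (Task.FromResult on 4.5+).
    private readonly static Task s_completed = TaskEx.FromResult(true); 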
    private readonly Queue<TaskCompletionSource<bool>> m_waiters = 
                                            new Queue<TaskCompletionSource<bool>>(); 
    private int m_currentCount; 

    public AsyncSemaphore(int initialCount)
    {
        if (initialCount < 0) 
        {
            throw new ArgumentOutOfRangeException("initialCount"); 
        }
        m_currentCount = initialCount; 
    }

    public Task WaitAsync() 
    { 
        lock (m_waiters) 
        { 
            if (m_currentCount > 0) 
            { 
                --m_currentCount; 
                return s_completed; 
            } 
            else 
            { 
                var waiter = new TaskCompletionSource<bool>(); 
                m_waiters.Enqueue(waiter); 
                return waiter.Task; 
            } 
        } 
    }

    public void Release() 
    { 
        TaskCompletionSource<bool> toRelease = null; 
        lock (m_waiters) 
        { 
            if (m_waiters.Count > 0) 
                toRelease = m_waiters.Dequeue(); 
            else 
                ++m_currentCount; 
        } 
        if (toRelease != null) 
            toRelease.SetResult(true); 
    }
}
Yuval Itzchakov
  • Given that `Microsoft.Bcl.Async` may not be available, can you suggest how this may be accomplished using `Task`? – Gusdor Sep 21 '15 at 13:01
  • Does `WaitAsync()` get added with `Microsoft.Bcl.Async` (perhaps via an extension method)? It is not part of `SemaphoreSlim` in 4.0. – Scott Chamberlain Sep 21 '15 at 13:02
  • @ScottChamberlain You're right. I forgot about that. Edited the code. – Yuval Itzchakov Sep 21 '15 at 13:05
  • I just double-checked: it provides the two classes `System.Threading.Tasks.TaskEx` and `System.Net.DnsEx`, and various extensions in `AsyncExtensions`, `AsyncPlatformExtensions`, and `AwaitExtensions`, but none of them deal with semaphores. Most of them make various IO operations use the [TAP model](https://msdn.microsoft.com/en-us/library/hh873175(v=vs.110).aspx) or extend `Task` and `CancellationTokenSource` with features added in 4.5. – Scott Chamberlain Sep 21 '15 at 13:57
  • @ScottChamberlain Yes, I just remembered I answered [another question](http://stackoverflow.com/questions/28028262/using-await-semaphoreslim-waitasync-in-net-4) which asked that exactly :P – Yuval Itzchakov Sep 21 '15 at 14:04

If you're already constrained to 5 requests per second, how important is it to run them in parallel? Here's a different perspective to try (not compile tested): throttle each request individually instead of throttling a batch. Five requests per second gives each request a 200 ms slot.

foreach(string value in values)
{
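  const int allottedMilliseconds = 200; // 1000 ms / 5 requests
  Stopwatch timer = Stopwatch.StartNew();

  // ...

  timer.Stop();
  // ElapsedMilliseconds is a long, so cast it before the int subtraction.
  int remainingMilliseconds = allottedMilliseconds - (int)timer.ElapsedMilliseconds;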
  if(remainingMilliseconds > 0)
  {
    // replace with something more precise/thread friendly as needed.
    Thread.Sleep(remainingMilliseconds);
  }
}
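The same per-request idea can be packaged as a reusable helper (a sketch; `Throttle.ForEach` is a hypothetical name). It derives the slot from the rate (1000 ms / 5 = 200 ms) and measures each slot from the start of that call, so a slow call is not followed by a burst of catch-up calls.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class Throttle
{
    // Calls `action` for each item in turn, keeping at least 1/perSecond of a second
    // between the starts of consecutive calls (measured from each call's start).
    public static void ForEach<T>(IEnumerable<T> items, int perSecond, Action<T> action)
    {
        if (perSecond <= 0) throw new ArgumentOutOfRangeException("perSecond");

        // Round the slot up so perSecond slots never add up to less than one second.
        long slotTicks = (TimeSpan.TicksPerSecond + perSecond - 1) / perSecond;

        foreach (T item in items)
        {
            Stopwatch timer = Stopwatch.StartNew();
            action(item);

            long remainingTicks = slotTicks - timer.Elapsed.Ticks;
            if (remainingTicks > 0)
                Thread.Sleep((int)Math.Ceiling(remainingTicks / (double)TimeSpan.TicksPerMillisecond));
        }
    }
}
```

Spacing calls evenly is stricter than batching: it never bursts five requests at once, which some APIs measure more kindly, at the cost of not front-loading each second.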

Or, in the spirit of your original requirements, extend your solution with an extension method that partitions your list into chunks of 5:

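public static class ListExtensions
{
  public static IEnumerable<List<T>> Partition<T>(this IList<T> source, Int32 size)
  {
    // Extension methods must live in a static class; Skip/Take need using System.Linq.
    int batchCount = (int)Math.Ceiling(source.Count / (Double)size);
    for (int i = 0; i < batchCount; i++)
    {
      yield return new List<T>(source.Skip(size * i).Take(size));
    }
  }
}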

Call your Parallel.ForEach within an outer loop that uses this extension, then apply the same timer approach at the end of each outer iteration. Something like this:

foreach(IEnumerable<string> batch in str.Partition(5))
{
  Stopwatch timer = Stopwatch.StartNew();

  Parallel.ForEach(
    batch, 
    new ParallelOptions { MaxDegreeOfParallelism = 5 },
    (value, pls, index) =>
    {
      Console.WriteLine(value);// simulating method call
    });

  timer.Stop();
  int remainingMilliseconds = 1000 - (int)timer.ElapsedMilliseconds; // one batch of 5 per second
  if(remainingMilliseconds > 0)
  {
    // replace with something more precise/thread friendly as needed.
    Thread.Sleep(remainingMilliseconds);
  }
}
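The Partition extension above rescans the list with Skip/Take for every chunk. A single-pass alternative is sketched below; it is named `InBatchesOf` (hypothetical) to avoid clashing with Enumerable.Chunk, which .NET 6 and later already provide.

```csharp
using System;
using System.Collections.Generic;

public static class BatchExtensions
{
    // Yields consecutive batches of up to `size` items in a single pass over `source`;
    // the final batch holds whatever is left over.
    public static IEnumerable<List<T>> InBatchesOf<T>(this IEnumerable<T> source, int size)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (size <= 0) throw new ArgumentOutOfRangeException("size");

        var batch = new List<T>(size);
        foreach (T item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size); // start a fresh list; the yielded one now belongs to the caller
            }
        }
        if (batch.Count > 0)
            yield return batch;
    }
}
```

Because it takes any IEnumerable<T> and yields List<T>, it could stand in for the Partition call in the batch loop without other changes.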
Trevor Ash

Given below are two approaches. Either way, you get the desired behavior for testing. The code is concise, and it needs no locks.


1) Recursive

You have to make 50 requests in batches of 5 requests each. That means 10 batches of 5 requests at 1-second intervals. Let:

  • HitAPI() be the thread-safe method that calls the API once;
  • InitiateBatch() be the method that starts a batch of 5 tasks to hit the API,

then a sample implementation can be:

private void InitiateRecursiveHits(int batchCount)
{
    InitiateBatch(batchCount);
}

Call the method above with batchCount = 10, and it will call the code below:

private void InitiateBatch(int batchNumber)
{
    if (batchNumber <= 0) return;
    var hitsPerBatch = 5;
    var thisBatchHits = new Task[hitsPerBatch];
    for (int taskNumber = 1; taskNumber <= hitsPerBatch; taskNumber++)
        thisBatchHits[taskNumber - 1] = Task.Factory.StartNew(HitAPI); // Task.Run(HitAPI) on .NET 4.5+
    Task.WaitAll(thisBatchHits);
    Thread.Sleep(1000); //To wait for 1 second before starting another batch of 5
    InitiateBatch(batchNumber - 1);
}

2) Iterative

This is simpler than the first approach: it is the same logic written as a loop instead of recursion.

private void InitiateIterativeHits(int batchCount)
{
    if (batchCount <= 0) return;
    // It's good programming practice to leave your input variables intact so that 
    // they hold correct value throughout the execution
    int desiredRuns = batchCount;
    var hitsPerBatch = 5;
    while (desiredRuns-- > 0)
    {
        var thisBatchHits = new Task[hitsPerBatch];
        for (int taskNumber = 1; taskNumber <= hitsPerBatch; taskNumber++)
            thisBatchHits[taskNumber - 1] = Task.Factory.StartNew(HitAPI); // Task.Run(HitAPI) on .NET 4.5+
        Task.WaitAll(thisBatchHits);
        Thread.Sleep(1000); //To wait for 1 second before starting another batch of 5
    }
}
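The batch arithmetic generalizes with ceiling division, which also covers totals that aren't a multiple of the batch size (52 requests need 11 batches, the last one holding 2). Since a new batch may start only once per second, the last batch starts `(batches - 1)` seconds after the first. `BatchMath` is a hypothetical helper for illustration.

```csharp
using System;

public static class BatchMath
{
    // Batches needed for `total` requests at `perBatch` per batch (ceiling division).
    public static int BatchCount(int total, int perBatch)
    {
        if (total < 0) throw new ArgumentOutOfRangeException("total");
        if (perBatch <= 0) throw new ArgumentOutOfRangeException("perBatch");
        return (total + perBatch - 1) / perBatch;
    }

    // Lower bound on total run time when batches start at most once per second:
    // the last batch starts (BatchCount - 1) seconds after the first.
    public static TimeSpan MinimumDuration(int total, int perBatch)
    {
        int batches = BatchCount(total, perBatch);
        return TimeSpan.FromTicks(Math.Max(0, batches - 1) * TimeSpan.TicksPerSecond);
    }
}
```

For 50 requests that is 9 seconds plus the duration of the last batch. Both methods above also sleep after the final batch, which adds one more second that isn't needed.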
displayName

I would use Microsoft's Reactive Framework (NuGet "Rx-Main") for this.

Here's what it would look like:

// Needs the Rx-Main package and: using System; using System.Reactive.Linq;
var query =
    Observable
        .Range(0, 100)
        .Buffer(5)
        .Zip(Observable.Interval(TimeSpan.FromSeconds(1.0)), (ns, i) => ns)
        .SelectMany(ns =>
            ns
                .ToObservable()
                .SelectMany(n =>
                    Observable
                        .Start(() =>
                        {
                            /* call here */
                            Console.WriteLine(n);
                            return n;
                        })));

You would then handle the results like this:

var subscription =
    query
        .Subscribe(x =>
        {
            /* handle result here */
        });

If you need to stop the requests before they complete naturally, just call subscription.Dispose().

Nice, clean, and declarative.

Enigmativity

Maybe:

while (true)
{
   for (int i = 0; i < 5; i++)
       Task.Run(() => { /* API call here */ }); // Task.Run needs .NET 4.5; on 4.0 use Task.Factory.StartNew
   Thread.Sleep(1000);
}

I'm not sure that calling Task.Run like this all the time is efficient, though.
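The loop above never ends and keeps a thread blocked in Thread.Sleep. A bounded sketch using System.Threading.Timer starts `perBatch` tasks on each one-second tick and stops after a fixed number of batches; it sticks to APIs available on .NET 4.0 (Timer, CountdownEvent, Task.Factory.StartNew). `TimerBatches.Run` and its `request` callback are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimerBatches
{
    // Starts `perBatch` calls to `request` on each tick of a one-second timer,
    // for `batches` ticks, and blocks until every call has finished.
    public static void Run(int batches, int perBatch, Action<int> request)
    {
        using (var allDone = new CountdownEvent(batches * perBatch))
        {
            int tick = -1;
            using (var timer = new Timer(_ =>
            {
                int batch = Interlocked.Increment(ref tick);
                if (batch >= batches) return; // ticks after the last batch do nothing

                for (int j = 0; j < perBatch; j++)
                {
                    int id = batch * perBatch + j;
                    Task.Factory.StartNew(() =>
                    {
                        try { request(id); }
                        finally { allDone.Signal(); }
                    });
                }
            }, null, 0, 1000))
            {
                allDone.Wait();
            }
        }
    }
}
```

Like the loop above, this starts a batch every second whether or not the previous batch has finished, so it limits the start rate, not the number of calls in flight.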

Glubus