
I have an IEnumerable<Task>, where each Task will call the same endpoint. However, the endpoint can only handle so many calls per second. How can I put, say, a half-second delay between calls?

I have tried adding Task.Delay(), but of course awaiting it inside each task simply means that the app waits half a second and then sends all the calls at once.

Here is a code snippet:

    var resultTasks = orders
        .Select(async order =>
        {
            var result = new VendorTaskResult();
            try
            {
                result.Response = await result.CallVendorAsync();
            }
            catch (Exception ex)
            {
                result.Exception = ex;
            }
            return result;
        });

    var results = await Task.WhenAll(resultTasks);

I feel like I should do something like

    Task.WhenAll(resultTasks.EmitOverTime(500));

... but how exactly do I do that?

ProgrammingLlama
emery.noel
  • Use `await Task.Delay()` *between* calls. If you put that before or after the call to `CallVendorAsync()` you'll get the delay you want. – Panagiotis Kanavos Jul 01 '19 at 11:56
  • Call `Task.Delay` the first thing you do in the `try` clause? – mm8 Jul 01 '19 at 12:03
  • Have you considered [throttling](https://codereview.stackexchange.com/questions/87132/throttle-actions-by-number-per-period)? – Peter Wolf Jul 01 '19 at 12:20
  • If I call (and await) Task.Delay() inside the delegate, then the next continuation begins immediately. The result is that they all hit Delay() within nanoseconds of each other, then they all wait .5 seconds together, then they all execute pretty much at once. – emery.noel Jul 01 '19 at 12:22
  • @PeterWolf - throttling sounds promising! I will dig into it. The only other throttling SO post I found (https://stackoverflow.com/questions/22492383/throttling-asynchronous-tasks) talked about a maximum count-at-a-time, which wasn't quite what I wanted. – emery.noel Jul 01 '19 at 12:24
  • Yes, sure, the idea is not to await delay, but instead await a semaphore and release it after the delay. – Peter Wolf Jul 01 '19 at 12:26
  • Also check this [rate limiter from David Desmaisons](https://github.com/David-Desmaisons/RateLimiter). – Peter Wolf Jul 01 '19 at 12:32
  • I had a similar issue, I have put my three approaches. Hopefully, the third one is what you are looking for. – Miguel Mateo Jul 01 '19 at 12:40
  • @PeterWolf if you put your suggestion in an answer I will mark it, as it's exactly what I needed. – emery.noel Aug 13 '19 at 18:23
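The semaphore idea from the comments (await a semaphore, and release it after the delay rather than after the call) could look roughly like this. It is only a sketch under stated assumptions: `SpacedStarts` and `RunSpacedAsync` are hypothetical names, each call is passed in as a `Func<Task<TResult>>` (for example a lambda wrapping the vendor call), and the order in which waiting calls acquire the semaphore is not guaranteed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: all calls may overlap, but at most one call STARTS per interval.
public static class SpacedStarts
{
    public static Task<TResult[]> RunSpacedAsync<TResult>(
        IEnumerable<Func<Task<TResult>>> calls, TimeSpan interval)
    {
        // One slot: whoever holds it is allowed to start a call right now.
        var gate = new SemaphoreSlim(1, 1);

        async Task<TResult> RunOneAsync(Func<Task<TResult>> call)
        {
            await gate.WaitAsync().ConfigureAwait(false);
            // Release the slot after the interval, NOT after the call completes,
            // so long-running calls do not block the next start.
            _ = Task.Delay(interval).ContinueWith(t => gate.Release(), TaskScheduler.Default);
            return await call().ConfigureAwait(false);
        }

        return Task.WhenAll(calls.Select(call => RunOneAsync(call)));
    }
}
```

Because the slot is freed by a timer, a vendor call that takes 10 to 40 seconds does not hold up the next one; only the start times are spaced by `interval`.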

6 Answers


This is just a thought, but another approach could be to create a queue and add another thread that runs polling the queue for calls that need to go out to your endpoint.
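A minimal sketch of that queue idea, assuming one dedicated thread is acceptable: callers enqueue work, and the worker thread starts one queued call per interval without waiting for it to finish. `SpacedCallQueue` and `Enqueue` are hypothetical names, not part of any library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper illustrating "queue + dedicated thread that drains it".
public sealed class SpacedCallQueue : IDisposable
{
    private readonly BlockingCollection<Task> _pending = new BlockingCollection<Task>();
    private readonly Thread _worker;

    public SpacedCallQueue(TimeSpan interval)
    {
        _worker = new Thread(() =>
        {
            // Blocks while the queue is empty; the loop ends once CompleteAdding()
            // has been called and the remaining items are drained.
            foreach (var cold in _pending.GetConsumingEnumerable())
            {
                cold.Start(TaskScheduler.Default); // start the call, do not wait for it
                Thread.Sleep(interval);            // spacing between consecutive starts
            }
        }) { IsBackground = true };
        _worker.Start();
    }

    // Queues a call; the returned task completes when the call itself completes.
    public Task<T> Enqueue<T>(Func<Task<T>> call)
    {
        var cold = new Task<Task<T>>(call); // created but not started yet
        _pending.Add(cold);
        return cold.Unwrap();
    }

    public void Dispose()
    {
        _pending.CompleteAdding();
        _worker.Join();
        _pending.Dispose();
    }
}
```

Each call is wrapped in a cold `Task<Task<T>>`, so nothing runs until the worker calls `Start`; `Unwrap()` hands the caller a task for the call's own result, including any exception it throws.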

John Kane
  • A queue is a good idea, but it seems like overkill for something that feels like it should be so simple. I'm 99% certain you can "emit over time" with something like rxjs/observables, I figured it would be built in to the C# stuff. – emery.noel Jul 01 '19 at 12:19

Have you considered just turning that into a foreach loop with a Task.Delay call? You seem to want to call them sequentially, so it won't hurt to make that obvious in your code.

var results = new List<VendorTaskResult>();
foreach (var order in orders)
{
    var result = new VendorTaskResult();
    try
    {
        // Each call completes before the next one starts
        result.Response = await result.CallVendorAsync();
    }
    catch (Exception ex)
    {
        result.Exception = ex;
    }
    results.Add(result);
    await Task.Delay(500); // non-blocking pause before the next call
}
Max Keller
  • Thanks Max. I don't want to wait for each one to return before starting the next one. The vendor I'm calling can't take 20 calls in 20 ms, but then can 20 over 5 seconds or so. I just need to space them out a little bit. Each call on the vendor's side takes between 10 and 40 seconds. – emery.noel Jul 01 '19 at 12:22

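Instead of selecting from orders, you could loop over them: start each call inside the loop without awaiting it, put the resulting task into a list, pause, and finally call Task.WhenAll on the list. It would look something like this:

// Hypothetical helper name; wraps the call exactly like the code in the question
async Task<VendorTaskResult> CallVendorSafelyAsync()
{
    var result = new VendorTaskResult();
    try
    {
        result.Response = await result.CallVendorAsync();
    }
    catch (Exception ex)
    {
        result.Exception = ex;
    }
    return result;
}

var resultTasks = new List<Task<VendorTaskResult>>();
foreach (var item in orders)
{
    resultTasks.Add(CallVendorSafelyAsync()); // started, not awaited, so calls overlap
    await Task.Delay(x); // non-blocking pause of x ms before the next call
}

var results = await Task.WhenAll(resultTasks);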

If you want to control the number of requests executed simultaneously, you have to use a semaphore.
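A sketch of the semaphore approach, assuming the goal is a cap on how many requests are in flight at once rather than a fixed spacing between them. `Throttled.WhenAllThrottledAsync` is a hypothetical helper name; `SemaphoreSlim.WaitAsync` and `Release` are the standard .NET APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs all calls, never more than maxConcurrency at the same time.
public static class Throttled
{
    public static async Task<TResult[]> WhenAllThrottledAsync<TResult>(
        IEnumerable<Func<Task<TResult>>> calls, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        {
            var tasks = calls.Select(async call =>
            {
                await gate.WaitAsync().ConfigureAwait(false);
                try
                {
                    return await call().ConfigureAwait(false);
                }
                finally
                {
                    gate.Release(); // free the slot as soon as this call finishes
                }
            }).ToList(); // materialize so each task is created exactly once

            return await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }
}
```

This limits concurrency only; it does not by itself spread out start times, so it complements rather than replaces a delay.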


I have something very similar, and it works fine for me. Note that I call ToArray() on the LINQ query; that is what actually creates and starts the tasks:

using (HttpClient client = new HttpClient()) {
    IEnumerable<Task<string>> _downloads = _group
        .Select(async job => {
            await Task.Delay(300);
            return await client.GetStringAsync(<url with variable job>);
        });

    Task<string>[] _downloadTasks = _downloads.ToArray();

    _pages = await Task.WhenAll(_downloadTasks);
}

Note, however, that this creates n tasks that all run in parallel. Every task waits the same 300 ms, so the Task.Delay only postpones all the requests by 300 ms; it does not space them apart. If you want to download the pages one after another (which is what a delay between the calls suggests), this code may be better:

using (HttpClient client = new HttpClient()) {
    foreach (string job in _group) {
        await Task.Delay(300);
        _pages.Add(await client.GetStringAsync(<url with variable job>));
    }
}

Each download is still asynchronous (the thread is free for other work while a page is downloading), but the downloads run one after another: each one has to finish before the next one is started.
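If the goal is to keep the calls overlapping (as in the first snippet) but spread out their start times, one variation is to scale the delay by each element's index, so the i-th call waits `index * interval` before starting. This is a sketch: `Staggered.WhenAllStaggered` is a hypothetical name, and the spacing is approximate because each delay is measured from the moment the query is enumerated, not from the previous call's start.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: staggers start times by index; the calls themselves may overlap.
public static class Staggered
{
    public static Task<TResult[]> WhenAllStaggered<TSource, TResult>(
        IEnumerable<TSource> source, Func<TSource, Task<TResult>> call, TimeSpan interval)
    {
        var tasks = source.Select(async (item, index) =>
        {
            // Element 0 starts immediately, element 1 after one interval, and so on.
            await Task.Delay(TimeSpan.FromTicks(interval.Ticks * index)).ConfigureAwait(false);
            return await call(item).ConfigureAwait(false);
        });
        return Task.WhenAll(tasks);
    }
}
```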

The code can easily be changed to download the pages concurrently in chunks, for example 20 pages at a time with a 5-second pause between chunks, as in this sample:

IEnumerable<string[]> toParse = myData
    .Select((v, i) => new { v.code, group = i / 20 })
    .GroupBy(x => x.group)
    .Select(g => g.Select(x => x.code).ToArray());

using (HttpClient client = new HttpClient()) {
    foreach (string[] _group in toParse) {
        string[] _pages = null;

        IEnumerable<Task<string>> _downloads = _group
            .Select(job => {
                return client.GetStringAsync(<url with job>);
            });

        Task<string>[] _downloadTasks = _downloads.ToArray();

        _pages = await Task.WhenAll(_downloadTasks);

        await Task.Delay(5000);
    }
}

All this does is group your pages into chunks of 20, iterate through the chunks, download all pages of a chunk concurrently, wait 5 seconds, and move on to the next chunk.

I hope that is what you were waiting for :)

Miguel Mateo

What you describe in your question is in other words rate limiting. You'd like to apply rate limiting policy to your client, because the API you use enforces such a policy on the server to protect itself from abuse.

While you could implement rate limiting yourself, I'd recommend going with a well-established solution. I picked Rate Limiter by David Desmaisons more or less at random and instantly liked it: it has solid documentation and excellent test coverage, and it is easy to use. It is also available as a NuGet package.

The simple snippet below runs partially overlapping tasks in sequence, deferring each task's start until half a second after the preceding task started. Each task lasts at least 750 ms.

using ComposableAsync;
using RateLimiter;
using System;
using System.Threading.Tasks;

namespace RateLimiterTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Log("Starting tasks ...");
            var constraint = TimeLimiter.GetFromMaxCountByInterval(1, TimeSpan.FromSeconds(0.5));
            var tasks = new[]
            {
                DoWorkAsync("Task1", constraint),
                DoWorkAsync("Task2", constraint),
                DoWorkAsync("Task3", constraint),
                DoWorkAsync("Task4", constraint)
            };
            Task.WaitAll(tasks);
            Log("All tasks finished.");
            Console.ReadLine();
        }

        static void Log(string message)
        {
            Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.fff ") + message);
        }

        static async Task DoWorkAsync(string name, IDispatcher constraint)
        {
            await constraint;
            Log(name + " started");
            await Task.Delay(750);
            Log(name + " finished");
        }
    }
}

Sample output:

10:03:27.121 Starting tasks ...
10:03:27.154 Task1 started
10:03:27.658 Task2 started
10:03:27.911 Task1 finished
10:03:28.160 Task3 started
10:03:28.410 Task2 finished
10:03:28.680 Task4 started
10:03:28.913 Task3 finished
10:03:29.443 Task4 finished
10:03:29.443 All tasks finished.

If you change the constraint to allow at most two tasks per second (var constraint = TimeLimiter.GetFromMaxCountByInterval(2, TimeSpan.FromSeconds(1));), which is not the same as one task per half second, the output could look like this:

10:06:03.237 Starting tasks ...
10:06:03.264 Task1 started
10:06:03.268 Task2 started
10:06:04.026 Task2 finished
10:06:04.031 Task1 finished
10:06:04.275 Task3 started
10:06:04.276 Task4 started
10:06:05.032 Task4 finished
10:06:05.032 Task3 finished
10:06:05.033 All tasks finished.

Note that the current version of Rate Limiter targets .NET Framework 4.7.2+ or .NET Standard 2.0+.

Peter Wolf

The proposed method EmitOverTime is doable, but only by blocking the current thread:

public static IEnumerable<Task<TResult>> EmitOverTime<TResult>(
    this IEnumerable<Task<TResult>> tasks, int delay)
{
    foreach (var item in tasks)
    {
        Thread.Sleep(delay); // Delay by blocking
        yield return item;
    }
}

Usage:

var results = await Task.WhenAll(resultTasks.EmitOverTime(500));

A better option is probably to create a variant of Task.WhenAll that accepts a delay argument and delays asynchronously:

public static async Task<TResult[]> WhenAllWithDelay<TResult>(
    IEnumerable<Task<TResult>> tasks, int delay)
{
    var tasksList = new List<Task<TResult>>();
    foreach (var task in tasks)
    {
        await Task.Delay(delay).ConfigureAwait(false);
        tasksList.Add(task);
    }
    return await Task.WhenAll(tasksList).ConfigureAwait(false);
}

Usage:

var results = await WhenAllWithDelay(resultTasks, 500);

This design implies that the enumerable of tasks should be enumerated only once. It is easy to forget this during development and enumerate it again, spawning a new set of tasks. For this reason I propose making it an OnlyOnce enumerable, as shown in this question.
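A possible sketch of such a one-shot wrapper, assuming that throwing on a second enumeration is the desired behavior. `OnlyOnceEnumerable<T>` is a hypothetical name; it is not necessarily the implementation from the linked question.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

// Hypothetical wrapper: the underlying sequence can be enumerated a single time.
public sealed class OnlyOnceEnumerable<T> : IEnumerable<T>
{
    private IEnumerable<T> _source;

    public OnlyOnceEnumerable(IEnumerable<T> source) =>
        _source = source ?? throw new ArgumentNullException(nameof(source));

    public IEnumerator<T> GetEnumerator()
    {
        // Atomically take the source; a second call finds null and throws.
        var source = Interlocked.Exchange(ref _source, null);
        if (source == null)
            throw new InvalidOperationException("This sequence can be enumerated only once.");
        return source.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

Wrapping the deferred query of tasks this way turns an accidental second enumeration into an `InvalidOperationException` instead of a second batch of calls to the endpoint.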


Update: I should mention why the above methods work, and under what premise. The premise is that the supplied IEnumerable<Task<TResult>> is deferred, in other words non-materialized. At the method's start there are no tasks created yet. The tasks are created one after the other during the enumeration of the enumerable, and the trick is that the enumeration is slow and controlled. The delay inside the loop ensures that the tasks are not created all at once. They are created hot (in other words already started), so at the time the last task has been created some of the first tasks may have already been completed. The materialized list of half-running/half-completed tasks is then passed to Task.WhenAll, that waits for all to complete asynchronously.
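The premise can be checked directly with a small sketch. `DeferredDemo.CreateTasks` is a hypothetical helper that logs each task's creation; it shows that a deferred `Select` creates one task per `MoveNext()`, while `ToList()` creates all of them at once (and, because the query is enumerated a second time, creates a fresh set of tasks).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DeferredDemo
{
    // Returns a deferred query: no task exists until the query is enumerated,
    // and each MoveNext() creates (and starts) exactly one task.
    public static IEnumerable<Task<int>> CreateTasks(int count, List<int> creationLog) =>
        Enumerable.Range(0, count).Select(i =>
        {
            creationLog.Add(i); // records the moment task i is created
            return Task.Run(() => i * i);
        });
}
```

In `WhenAllWithDelay`, the `Task.Delay` sits between `MoveNext()` calls, which is exactly where the tasks are created. Passing `resultTasks.ToList()` instead would create every task before the first delay, so the calls would no longer be spaced out.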

Theodor Zoulias