
I need to run many tasks in parallel as fast as possible, but if my program runs more than 30 tasks per second, it will be blocked. How can I ensure that no more than 30 tasks run in any 1-second interval?

In other words, we must prevent a new task from starting if 30 tasks were completed in the last 1-second interval.
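"Any 1-second interval" describes a sliding window, not clock-second buckets. The sketch below uses a hypothetical helper, RespectsSlidingLimit, which is not part of the question. It checks a set of start times against that definition. It is a tool for reasoning about the requirement, not a throttler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// True if no half-open window [t, t + window) contains more than maxCount
// of the timestamps. After sorting, some window holds maxCount + 1 events
// exactly when event i and event i + maxCount are less than `window` apart.
static bool RespectsSlidingLimit(IEnumerable<TimeSpan> timestamps, int maxCount, TimeSpan window)
{
    var sorted = timestamps.OrderBy(t => t).ToArray();
    for (int i = 0; i + maxCount < sorted.Length; i++)
    {
        if (sorted[i + maxCount] - sorted[i] < window)
            return false;
    }
    return true;
}

// 30 starts at t = 0.5 s and 30 more at t = 1.0 s. Counted per clock second
// ([0, 1) and [1, 2)) each bucket holds 30, but the window [0.5 s, 1.5 s) holds 60.
var burst = Enumerable.Repeat(TimeSpan.FromSeconds(0.5), 30)
    .Concat(Enumerable.Repeat(TimeSpan.FromSeconds(1.0), 30))
    .ToList();
Console.WriteLine(RespectsSlidingLimit(burst, 30, TimeSpan.FromSeconds(1))); // False
```

This distinction matters for the approaches below. Refilling a budget on a timer enforces per-bucket counts, while the question asks for the stricter sliding-window guarantee.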

My ugly possible solution:

private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
    var timeList = new List<DateTime>();

    var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
    var tasksToRun = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));

        await task;

        timeList.Add(DateTime.Now);

        sem.Release();
    });

    await Task.WhenAll(tasksToRun);
}

private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
    return timeList.Count <= maxIntervalCount 
    || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}
Palindromer
  • Actual throttling is provided by ReactiveX operators like `Window` and `Buffer`. You can use an `ActionBlock` with a limited DOP and possibly an `await Task.Delay()` to ensure you don't make more than N calls/second. – Panagiotis Kanavos Feb 13 '20 at 12:57
  • Whoever voted to close as "opinion-based": it's definitely not. You can argue there have been similar questions in the past, but this is *definitely* not a matter of opinion. – Panagiotis Kanavos Feb 13 '20 at 13:03
  • [This is probably a duplicate](https://stackoverflow.com/questions/22492383/throttling-asynchronous-tasks). One answer shows how to use a Dataflow block with a DOP of 50 to limit concurrent operations to 50. The other shows how to use a SemaphoreSlim. One could use *both*: a DOP to limit operations to no more than 30 (or fewer), and a SemaphoreSlim that gets reset every 1 second by a timer. – Panagiotis Kanavos Feb 13 '20 at 13:08
  • BTW, in this code the tasks are *all already running*. The code only *awaits* them to complete at a specific rate. That's not what you asked, though; that's more like *batching* incoming operations before passing them to the step that needs throttling. You can get similar behavior with a BatchBlock. – Panagiotis Kanavos Feb 13 '20 at 13:15
  • Let's suppose that initially you start 30 of your tasks. At time 0:00.5 (half a second later) all 30 tasks are still running. At time 0:01.0 (one second later) 15 tasks have been completed and 15 are still running. Is it then allowed to start 15 more tasks? If it is, then during the interval 0:00.5 - 0:01.1 more than 30 tasks were active. If it's not, then you will be allowed to start a new task only after the completion of **all** 30 initial tasks. Which one is the desired behavior? – Theodor Zoulias Feb 13 '20 at 13:18
  • @TheodorZoulias if 15 tasks have been completed at time 0:01.0, it means that we can run the next 15 tasks only after 1 second, at time 0:02.0, because, like you said, we would otherwise have more than 30 active tasks in a 1-second interval. – Palindromer Feb 13 '20 at 13:39
  • Thanks @Palindromer for the clarification. I have written an answer that solves this problem elegantly by using a `SemaphoreSlim` and a `Task.Delay`, but I must wait until the question is reopened before I can post my answer. – Theodor Zoulias Feb 13 '20 at 17:53
  • This is a good and sufficiently detailed question; why was it closed? – konstantin_doncov Feb 13 '20 at 21:50

4 Answers


User code should never have to control directly how tasks are scheduled. For one thing, it can't: controlling how tasks run is the job of the TaskScheduler. When user code calls .Start(), it simply queues the task to the scheduler (the thread pool, by default) for execution. await doesn't execute anything; it asynchronously waits for a task that is already running.

The TaskScheduler samples show how to create limited concurrency schedulers, but again, there are better, high-level options.
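For reference, the BCL already ships one limited-concurrency scheduler, ConcurrentExclusiveSchedulerPair. The sketch below uses its ConcurrentScheduler to run at most 3 synchronous work items at a time. The 12 items and the 50 ms sleep are arbitrary. Note that this caps concurrency, not starts per second. It also limits how many delegates are executing code at once, not how long awaited I/O stays in flight.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// The ConcurrentScheduler of this pair runs at most 3 tasks at a time.
var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 3);
var factory = new TaskFactory(pair.ConcurrentScheduler);

var sync = new object();
int running = 0, maxObserved = 0, completed = 0;

var tasks = Enumerable.Range(0, 12).Select(_ => factory.StartNew(() =>
{
    // Track the highest number of work items running at the same moment.
    lock (sync) { running++; maxObserved = Math.Max(maxObserved, running); }
    Thread.Sleep(50); // simulated synchronous work
    lock (sync) { running--; completed++; }
})).ToArray();

await Task.WhenAll(tasks);
Console.WriteLine($"{completed} tasks, at most {maxObserved} at once");
```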

The question's code doesn't throttle the queued tasks anyway; it limits how many of them can be awaited. They are all running already. This is similar to batching the previous asynchronous operation in a pipeline, allowing only a limited number of messages to pass to the next level.
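To see why awaiting cannot throttle, consider that a Task returned by an async method is already running. The sketch below uses a hypothetical WorkAsync method and a TaskCompletionSource standing in for unfinished I/O. It shows that all three operations have started before anything is awaited.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var log = new ConcurrentQueue<string>();
var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// An async method runs synchronously until its first incomplete await,
// so the work has already started by the time the caller gets the Task.
async Task<int> WorkAsync(int n)
{
    log.Enqueue($"started {n}");
    await gate.Task;            // stand-in for I/O that has not finished yet
    log.Enqueue($"finished {n}");
    return n;
}

var tasks = new[] { WorkAsync(1), WorkAsync(2), WorkAsync(3) };
// Nothing has been awaited yet, but all three operations are in flight.
Console.WriteLine(string.Join(", ", log)); // started 1, started 2, started 3

gate.SetResult();
// Awaiting only observes completion; it cannot delay or pace the starts.
int[] results = await Task.WhenAll(tasks);
```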

ActionBlock with delay

The easy, out-of-the-box way would be to use an ActionBlock with a limited MaxDegreeOfParallelism, to ensure no more than N concurrent operations can run at the same time. If we know how long each operation takes, we could add a bit of delay to ensure we don't overshoot the throttle limit.

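In this case, 7 concurrent workers each perform at most 4 requests/second (one every 250 ms, even if the request itself took no time), for a maximum total of 28 requests per second. The BoundedCapacity means that only up to 7 items can be held by the block before the task returned by downloader.SendAsync stops completing immediately, so the posting loop pauses until there is room. This way we avoid flooding the ActionBlock if the operations take too long.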

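// Needs the System.Threading.Tasks.Dataflow NuGet package.
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var httpClient = new HttpClient();

var downloader = new ActionBlock<string>(
        async url => {
            await Task.Delay(250);
            var response = await httpClient.GetStringAsync(url);
            //Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity = 7 }
);

//Start posting to the downloader (urls is your sequence of URLs)
foreach (var item in urls)
{
    await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;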
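If you'd rather not take a dependency on the Dataflow package, you can express the same "limited DOP plus a delay" idea with Parallel.ForEachAsync (.NET 6 and later). The sketch below uses that substitute; it is not the answer's code. The 28 integer work items are hypothetical stand-ins for the URLs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical work items standing in for the URLs in the answer.
var items = Enumerable.Range(1, 28).ToArray();
var completed = new ConcurrentBag<int>();
var stopwatch = Stopwatch.StartNew();

// At most 7 bodies run at once, and each waits 250 ms before doing its work,
// so each of the 7 slots starts at most 4 operations per second (7 x 4 = 28).
await Parallel.ForEachAsync(
    items,
    new ParallelOptions { MaxDegreeOfParallelism = 7 },
    async (item, cancellationToken) =>
    {
        await Task.Delay(250, cancellationToken);
        // The real I/O call (e.g. httpClient.GetStringAsync(url, cancellationToken)) would go here.
        completed.Add(item);
    });

Console.WriteLine($"{completed.Count} items in {stopwatch.ElapsedMilliseconds} ms");
```

As with the ActionBlock version, the delay only bounds the rate if you size it against the real operation. It does not enforce a strict sliding 1-second window.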

ActionBlock with SemaphoreSlim

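Another option would be to combine this with a SemaphoreSlim that gets reset periodically by a timer. Keep in mind that a timer refill counts in fixed 1-second windows, so up to twice the limit can start within a single 1-second span that straddles a refill.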

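// One permit per allowed operation in the current second.
var semaphore = new SemaphoreSlim(30, 30);

// Each tick tops the semaphore back up to 30 permits. Calling Release(30)
// unconditionally would throw SemaphoreFullException while permits remain.
var refreshTimer = new System.Threading.Timer(_ =>
{
    var missing = 30 - semaphore.CurrentCount;
    if (missing > 0) semaphore.Release(missing);
});

var downloader = new ActionBlock<string>(
        async url => {
            // Take one of this second's permits. It is not released after the
            // request; only the timer hands permits back.
            await semaphore.WaitAsync();
            var response = await httpClient.GetStringAsync(url);
            //Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 }
);

//Start the timer right before we start posting
refreshTimer.Change(1000, 1000);
foreach (var item in urls)
{
    await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;
refreshTimer.Dispose();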
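A SemaphoreSlim created with a maxCount of 30 throws SemaphoreFullException if it is released past 30. A periodic refill therefore has to top up only the permits consumed since the last tick. The sketch below shows that refill step in isolation. It assumes the timer is the only code that releases permits, and the Refill name is hypothetical.

```csharp
using System;
using System.Threading;

const int PermitsPerSecond = 30;
var semaphore = new SemaphoreSlim(PermitsPerSecond, PermitsPerSecond);

// Top up only the permits consumed since the last tick. A concurrent Wait
// between the read and the Release only lowers the count, so this never
// overshoots maxCount.
void Refill()
{
    int missing = PermitsPerSecond - semaphore.CurrentCount;
    if (missing > 0)
        semaphore.Release(missing);
}

// Simulate one interval in which 12 operations acquired a permit.
for (int i = 0; i < 12; i++)
    semaphore.Wait();
Console.WriteLine(semaphore.CurrentCount); // 18
Refill();
Console.WriteLine(semaphore.CurrentCount); // 30
```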
Panagiotis Kanavos

This is the snippet:

var tasks = new List<Task>();

foreach (var item in listNeedInsert)
{
    var task = TaskToRun(item);
    tasks.Add(task);

    if(tasks.Count == 100)
    {
        await Task.WhenAll(tasks);
        tasks.Clear();
    }
}

// Wait for anything left to finish
await Task.WhenAll(tasks);

Notice that I add each task to a List<Task> and, once the list holds 100 of them, await them all together with Task.WhenAll before starting the next batch.
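The batching pattern above can be wrapped in a reusable helper. The sketch below assumes a hypothetical RunInBatchesAsync name and simulated work. Like the snippet, it bounds how many operations run at once. It does not bound how many start per second: a batch of fast operations can finish well within a second and the next batch starts immediately.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Starts up to batchSize operations, waits for all of them, then starts the next batch.
async Task RunInBatchesAsync<T>(IEnumerable<T> items, int batchSize, Func<T, Task> taskToRun)
{
    var tasks = new List<Task>(batchSize);
    foreach (var item in items)
    {
        tasks.Add(taskToRun(item)); // the operation starts here
        if (tasks.Count == batchSize)
        {
            await Task.WhenAll(tasks);
            tasks.Clear();
        }
    }
    await Task.WhenAll(tasks); // the final, partial batch
}

var sync = new object();
int running = 0, maxRunning = 0, done = 0;

await RunInBatchesAsync(Enumerable.Range(1, 250), 100, async n =>
{
    lock (sync) { running++; maxRunning = Math.Max(maxRunning, running); }
    await Task.Delay(20); // simulated I/O
    lock (sync) { running--; done++; }
});

Console.WriteLine($"{done} done, at most {maxRunning} at once");
```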

What you do here:

 var tasks = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (timeList.Count <= maxIntervalCount 
        || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount]);

        await task;

blocks until the task finishes its work, thus making this call:

Task.WhenAll(tasks).Wait();

completely redundant. Furthermore, Task.WhenAll(tasks).Wait(); blocks the calling thread unnecessarily; await Task.WhenAll(tasks) would wait without blocking.

Barr J

Is the blocking due to some server/firewall/hardware limit, or is it based on observation?

You should try to use BlockingCollection<Task> or a similar thread-safe collection, especially if your tasks are I/O-bound. You can even set its capacity to 30:

var collection = new BlockingCollection<Task>(30);

Then you can start two asynchronous methods, a producer and a consumer:

var population = Task.Run(Populate);
var processing = Task.Run(Dequeue);
await Task.WhenAll(population, processing);

void Populate()
{
    foreach (...)
        collection.Add(...);                                //blocks while the collection already holds 30 items
    collection.CompleteAdding();
}

async Task Dequeue()
{
    //GetConsumingEnumerable ends cleanly after CompleteAdding;
    //checking IsCompleted and then calling Take() can race and throw
    foreach (var task in collection.GetConsumingEnumerable())
        await task;
}
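The bounded capacity is what applies backpressure here. Once the collection is full, Add blocks the producer until the consumer takes an item. The sketch below demonstrates this with a capacity of 3 instead of 30. TryAdd is used only so the demo can show the refusal without blocking. Note that the capacity bounds how many items are waiting, not how many operations start per second.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// A capacity of 3 instead of 30 keeps the demo short.
var collection = new BlockingCollection<int>(boundedCapacity: 3);
collection.Add(1);
collection.Add(2);
collection.Add(3);

// The collection is full: Add(4) would block until a consumer takes an item,
// so TryAdd is used to observe the refusal without blocking.
Console.WriteLine(collection.TryAdd(4)); // False

int first = collection.Take();           // the consumer frees one slot (FIFO order)
Console.WriteLine(collection.TryAdd(4)); // True

collection.CompleteAdding();
var drained = new List<int>(collection.GetConsumingEnumerable());
Console.WriteLine(string.Join(", ", drained)); // 2, 3, 4
```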

If the limit persists due to some true limitation (which should be very rare), change Populate() as follows:

var stopper = Stopwatch.StartNew();
for (var i = ....)                                          //instead of foreach
{
    if (i > 0 && i % 30 == 0)                               //before each new group of 30, except the first
    {
        var elapsed = stopper.ElapsedMilliseconds;          //read the clock once, so the check and the wait agree
        if (elapsed < 1000)
            Thread.Sleep((int)(1000 - elapsed));            //actually wait; an un-awaited Task.Delay does nothing
        stopper.Restart();
    }
    collection.Add(...);
}
collection.CompleteAdding();
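The producer, the consumer and the pacing loop can be combined into one runnable program. In the sketch below, DoWorkAsync, the 45 items and the 10 ms duration are hypothetical. This is fixed-window pacing: it lets at most one group of 30 start per second. That is close to, but not quite the same as, a strict sliding 1-second window.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

const int PerWindow = 30;
var window = TimeSpan.FromSeconds(1);
var collection = new BlockingCollection<Task>(PerWindow);
int started = 0;
var overall = Stopwatch.StartNew();

// Hypothetical I/O-bound operation; calling it starts the work immediately.
static Task DoWorkAsync() => Task.Delay(10);

void Populate(int count)
{
    var stopper = Stopwatch.StartNew();
    for (int i = 0; i < count; i++)
    {
        // Before each new group of 30 (except the first), wait out the rest of the window.
        if (i > 0 && i % PerWindow == 0)
        {
            var remaining = window - stopper.Elapsed; // read the clock once
            if (remaining > TimeSpan.Zero)
                Thread.Sleep(remaining);
            stopper.Restart();
        }
        collection.Add(DoWorkAsync()); // blocks while 30 tasks are queued
        started++;
    }
    collection.CompleteAdding();
}

async Task Dequeue()
{
    foreach (var task in collection.GetConsumingEnumerable())
        await task;
}

var population = Task.Run(() => Populate(45));
var processing = Task.Run(Dequeue);
await Task.WhenAll(population, processing);
Console.WriteLine($"{started} tasks started in {overall.ElapsedMilliseconds} ms");
```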
Alb

I think that this problem can be solved by a SemaphoreSlim limited to the maximum number of tasks per interval, together with a Task.Delay that delays each release of the SemaphoreSlim, after the corresponding task starts (or completes), by an interval equal to the required throttling interval. Below is an implementation based on this idea. The rate limiting can be applied in two ways:

  1. With includeAsynchronousDuration: false the rate limit affects how many operations can be started during the specified time span. The duration of each operation is not taken into account.

  2. With includeAsynchronousDuration: true the rate limit affects how many operations can be counted as "active" during the specified time span, and is more restrictive (makes the enumeration slower). Instead of counting each operation as a moment in time (when started), it is counted as a time span (between start and completion). An operation is counted as "active" for a specified time span, if and only if its own time span intersects with the specified time span.

/// <summary>
/// Applies an asynchronous transformation for each element of a sequence,
/// limiting the number of transformations that can start or be active during
/// the specified time span.
/// </summary>
public static async Task<TResult[]> ForEachAsync<TSource, TResult>(
    this IEnumerable<TSource> source,
    Func<TSource, Task<TResult>> action,
    int maxActionsPerTimeUnit,
    TimeSpan timeUnit,
    bool includeAsynchronousDuration = false,
    bool onErrorContinue = false, /* Affects only asynchronous errors */
    bool executeOnCapturedContext = false)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (action == null) throw new ArgumentNullException(nameof(action));
    if (maxActionsPerTimeUnit < 1)
        throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
    if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
        throw new ArgumentOutOfRangeException(nameof(timeUnit));

    using var semaphore = new SemaphoreSlim(maxActionsPerTimeUnit,
        maxActionsPerTimeUnit);
    using var cts = new CancellationTokenSource();
    var tasks = new List<Task<TResult>>();
    var releaseTasks = new List<Task>();

    try // Watch for exceptions thrown by the source enumerator
    {
        foreach (var item in source)
        {
            try
            {
                await semaphore.WaitAsync(cts.Token)
                    .ConfigureAwait(executeOnCapturedContext);
            }
            catch (OperationCanceledException) { break; }

            // Exceptions thrown synchronously by invoking the action are breaking
            // the loop unconditionally (the onErrorContinue has no effect on them).
            var task = action(item);
            if (!onErrorContinue) task = ObserveFailureAsync(task);
            tasks.Add(task);
            releaseTasks.Add(ScheduleSemaphoreReleaseAsync(task));
        }
    }
    catch (Exception ex) { tasks.Add(Task.FromException<TResult>(ex)); }
    cts.Cancel(); // Cancel all release tasks

    Task<TResult[]> whenAll = Task.WhenAll(tasks);
    try { return await whenAll.ConfigureAwait(false); }
    catch (OperationCanceledException) when (whenAll.IsCanceled) { throw; }
    catch { whenAll.Wait(); throw; } // Propagate AggregateException
    finally { await Task.WhenAll(releaseTasks); }

    async Task<TResult> ObserveFailureAsync(Task<TResult> task)
    {
        try { return await task.ConfigureAwait(false); }
        catch { cts.Cancel(); throw; }
    }

    async Task ScheduleSemaphoreReleaseAsync(Task<TResult> task)
    {
        if (includeAsynchronousDuration)
            try { await task.ConfigureAwait(false); } catch { } // Ignore exceptions
        // Release only if the Task.Delay completed successfully
        try { await Task.Delay(timeUnit, cts.Token).ConfigureAwait(false); }
        catch (OperationCanceledException) { return; }
        semaphore.Release();
    }
}

Usage example:

int[] results = await ForEachAsync(Enumerable.Range(1, 100), async n =>
{
    await Task.Delay(500); // Simulate some asynchronous I/O-bound operation
    return n;
}, maxActionsPerTimeUnit: 30, timeUnit: TimeSpan.FromSeconds(1.0),
    includeAsynchronousDuration: true);

The reasons for propagating an AggregateException using the catch+Wait technique are explained here.
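Without the error handling and the includeAsynchronousDuration option, the core idea above is a SemaphoreSlim whose permits come back only after a Task.Delay. The sketch below shows it as a hypothetical helper, RunRateLimitedAsync, which is not part of the answer's API. It corresponds to includeAsynchronousDuration: false, and the 15 operations, 5 permits and 200 ms unit are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Each start takes a permit, and the permit is returned timeUnit after that
// start, so no more than maxPerUnit operations start within any timeUnit span.
// Simplified: no cancellation and no error handling.
async Task RunRateLimitedAsync(IEnumerable<Func<Task>> operations, int maxPerUnit, TimeSpan timeUnit)
{
    // Not disposed on purpose: delayed releases may still run after we return.
    var semaphore = new SemaphoreSlim(maxPerUnit, maxPerUnit);
    var running = new List<Task>();
    foreach (var operation in operations)
    {
        await semaphore.WaitAsync();
        running.Add(operation());
        _ = Task.Delay(timeUnit).ContinueWith(t => semaphore.Release());
    }
    await Task.WhenAll(running);
}

var clock = Stopwatch.StartNew();
var starts = new List<TimeSpan>();
var operations = Enumerable.Range(0, 15).Select(_ => (Func<Task>)(async () =>
{
    starts.Add(clock.Elapsed); // record when this operation starts
    await Task.Delay(50);      // simulated I/O
}));

await RunRateLimitedAsync(operations, maxPerUnit: 5, timeUnit: TimeSpan.FromMilliseconds(200));
Console.WriteLine($"{starts.Count} operations, last start at {starts[^1].TotalMilliseconds:F0} ms");
```

Releasing after completion instead, as includeAsynchronousDuration: true does, is a matter of awaiting the operation before starting the Task.Delay.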

Theodor Zoulias