
I need to implement a library that calls the vk.com API. The problem is that the API allows only 3 requests per second. I would like the API to be asynchronous.

Important: the API should support safe access from multiple threads.

My idea is to implement a class called throttler which allows no more than 3 requests per second and delays the other requests.

The interface is as follows:

public interface IThrottler : IDisposable
{
    Task<TResult> Throttle<TResult>(Func<Task<TResult>> task);
}

The usage looks like this:

var audio = await throttler.Throttle(() => api.MyAudio());
var messages = await throttler.Throttle(() => api.ReadMessages());
var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
/// There should be a delay here because 3 requests have already executed
var photo = await throttler.Throttle(() => api.MyPhoto());

How can I implement the throttler?

Currently I have implemented it as a queue which is processed by a background thread.

public Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    /// TaskRequest has a Run() method to run the task.
    /// TaskRequest uses TaskCompletionSource to provide a new task
    /// which is resolved when the queue has been processed up to this element.
    var request = new TaskRequest<TResult>(task);

    requestQueue.Enqueue(request);

    return request.ResultTask;
}
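
For context, TaskRequest<TResult> is roughly like this (a simplified sketch of the class described in the comments above):

private class TaskRequest<TResult> : IRequest
{
    private readonly Func<Task<TResult>> taskFactory;
    private readonly TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();

    public TaskRequest(Func<Task<TResult>> taskFactory)
    {
        this.taskFactory = taskFactory;
    }

    /// Task handed back to the caller of Throttle()
    public Task<TResult> ResultTask { get { return tcs.Task; } }

    /// Called by the background thread when the queue is processed up to this element
    public void Run()
    {
        taskFactory().ContinueWith(t =>
        {
            if (t.IsFaulted)
                tcs.SetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled)
                tcs.SetCanceled();
            else
                tcs.SetResult(t.Result);
        });
    }
}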

This is shortened code of the background thread loop which processes the queue:

private void ProcessQueue(object state)
{
    while (true)
    {
        IRequest request;
        while (requestQueue.TryDequeue(out request))
        {
            /// The Delay() method calculates the actual delay value and calls Thread.Sleep()
            Delay();
            request.Run();
        }

    }
}

Is it possible to implement this without a background thread?

STO
  • Why not just create an implementation of the TaskScheduler https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx that watches over tasks to limit them to 3 per second? The framework would take care of the rest. –  Dec 16 '15 at 16:02
  • @Will I thought about this, but don't get how to bind all api calls from all threads to the TaskScheduler? – STO Dec 16 '15 at 16:04
  • You simply keep a reference to the task scheduler where the threads can access it, route the calls through Task.Run, and return Task from your api methods. Done and done, you only had to write one class. –  Dec 16 '15 at 16:05
  • @Will He doesn't have CPU bound work, so he won't be using `Task.Run` at all (nor should he be). – Servy Dec 16 '15 at 16:11
  • @Servy [citation needed] –  Dec 16 '15 at 16:38
  • @Will The question opens on the premise that he's performing network IO. Network IO isn't CPU bound work. And either way, *you're* the one claiming that he only has CPU bound work, by proposing a solution that only works for CPU bound work. If we didn't know if the work was CPU bound or not, then that wouldn't be a valid assumption. In the general case you'd need to be able to handle non-CPU bound work. – Servy Dec 16 '15 at 16:48
  • @Servy *only* works with CPU bound work? Again, [citation needed]. –  Dec 16 '15 at 16:50
  • @Will So how are you intending to use a `TaskScheduler` *which exists to schedule CPU bound tasks*, when you have IO work? Sounds like you need to go spend some time with the `TaskScheduler` Documentation. – Servy Dec 16 '15 at 16:57
  • @Will unfortunately a custom `TaskScheduler` cannot be used as a solution to this problem, because the method `Task.Factory.StartNew` that accepts a `scheduler` argument does not understand `async` delegates. [Here](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await/57702536#57702536) is my research about this topic. – Theodor Zoulias Aug 29 '19 at 03:27

5 Answers


So we'll start out with a solution to a simpler problem: creating a queue that processes up to N tasks concurrently, rather than throttling to N tasks started per second, and build on that:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}
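
A quick usage sketch (here `urls` and the `HttpClient` downloads are just placeholders for whatever work you want to limit):

// At most 3 downloads are in flight at any moment; the rest wait on the semaphore.
var queue = new TaskQueue(3);
var client = new HttpClient();

var downloads = urls
    .Select(url => queue.Enqueue(() => client.GetStringAsync(url)))
    .ToList();

string[] pages = await Task.WhenAll(downloads);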

We'll also use the following helper methods to match a `TaskCompletionSource` to the result of a `Task`:

public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
{
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(t.Result);
                break;
        }

    });
}

public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
{
    Match(tcs, task.ContinueWith(t => default(T)));
}
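
As a quick illustration (with `SomeInnerOperationAsync` as a stand-in returning `Task<int>`), whatever happens to the inner task (result, exception, or cancellation) is forwarded onto the TCS, so the caller can simply await `tcs.Task`:

var tcs = new TaskCompletionSource<int>();
tcs.Match(SomeInnerOperationAsync()); // placeholder inner operation
int result = await tcs.Task;          // completes exactly as the inner task did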

Now, for our actual solution: each time we need to perform a throttled operation, we create a TaskCompletionSource, then go into our TaskQueue and add an item that starts the task, matches the TCS to its result, doesn't await it, and then delays the task queue for 1 second. The task queue will then not allow a new task to start until fewer than N tasks have been started in the past second, while the result of the operation itself is the same as the created Task:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    public Task Enqueue(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}
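
To tie this back to the question, usage would look roughly like this (reusing the `api` calls from the question; only the method name differs from the `IThrottler` interface):

var throttler = new Throttler(3);

var audio = await throttler.Enqueue(() => api.MyAudio());
var messages = await throttler.Enqueue(() => api.ReadMessages());
var audioLyrics = await throttler.Enqueue(() => api.AudioLyrics(audioId));
// The fourth call cannot start until a full second after the first one started.
var photo = await throttler.Enqueue(() => api.MyPhoto());
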
Servy
  • Thanks, looks good, but won't `Task.Delay(TimeSpan.FromSeconds(1))` be delayed __in parallel__ with the other two tasks being executed? – STO Dec 16 '15 at 16:30
  • @STO Yes. That's the point. It will only be done in parallel with up to two other tasks. It won't allow a fourth, fifth, etc. to even hit the delay until there has no longer been 3 tasks started in the past second. Once the oldest of the three delays has past, the next one comes in, and now there's a new oldest one second delay, letting in another task when *it* finishes, etc. giving you a rolling one second window. – Servy Dec 16 '15 at 16:32
  • Got your point. Accepted as the answer. One thing: we don't always need to delay for exactly one second; we should delay for _the rest of the second_ if the task completed in less than one second. – STO Dec 16 '15 at 16:42
  • @STO The delay starts when the operation is *started*, not when it's *finished*, so there's no need to even *consider* how long the operation actually takes to run. – Servy Dec 16 '15 at 16:46
  • @STO No, it's important that it *not* be used. That's the whole point. The queue only cares about the 1 second delay after *starting* the given operation. It doesn't care at all about when the operation itself *actually finishes*. That's your premise. So it ensures that you can't start a task as long as N tasks have been started in the past second, and lets the next task start the moment it has been 1 second since the oldest task was *started*. – Servy Dec 16 '15 at 16:53
  • @Servy You have posted this TaskQueue before, however I seem to be missing something. When I add work it is never executed. Do you have a working example of this code somewhere? – Arnold Wiersma Sep 08 '16 at 11:51
  • @ArnoldWiersma It's the responsibility of the function the queue accepts to start the work that will end up completing the task. There's no way for the queue to know anything about that work and to start any of it. As far as an example, see the second half of the answer for an example. – Servy Sep 08 '16 at 13:13

I solved a similar problem using a wrapper around SemaphoreSlim. In my scenario, I had some other throttling mechanisms as well, and I needed to make sure that requests didn't hit the external API too often even if request number 1 took longer to reach the API than request number 3. My solution was to use a wrapper around SemaphoreSlim that had to be released by the caller, but the actual SemaphoreSlim would not be released until a set time had passed.

public class TimeGatedSemaphore
{
    private readonly SemaphoreSlim semaphore;
    public TimeGatedSemaphore(int maxRequest, TimeSpan minimumHoldTime)
    {
        semaphore = new SemaphoreSlim(maxRequest);
        MinimumHoldTime = minimumHoldTime;
    }
    public TimeSpan MinimumHoldTime { get; }

    public async Task<IDisposable> WaitAsync()
    {
        await semaphore.WaitAsync();
        return new InternalReleaser(semaphore, Task.Delay(MinimumHoldTime));
    }

    private class InternalReleaser : IDisposable
    {
        private readonly SemaphoreSlim semaphoreToRelease;
        private readonly Task notBeforeTask;
        public InternalReleaser(SemaphoreSlim semaphoreSlim, Task dependantTask)
        {
            semaphoreToRelease = semaphoreSlim;
            notBeforeTask = dependantTask;
        }
        public void Dispose()
        {
            notBeforeTask.ContinueWith(_ => semaphoreToRelease.Release());
        }
    }
}

Example usage:

private TimeGatedSemaphore requestThrottler = new TimeGatedSemaphore(3, TimeSpan.FromSeconds(1));
public async Task<T> MyRequestSenderHelper(string endpoint)
{
    using (await requestThrottler.WaitAsync())
        return await SendRequestToAPI(endpoint);        
}
Joakim M. H.

Here is one solution that uses a Stopwatch:

public class Throttler : IThrottler
{
    private readonly Stopwatch m_Stopwatch;
    private int m_NumberOfRequestsInLastSecond;
    private readonly int m_MaxNumberOfRequestsPerSecond;

    public Throttler(int max_number_of_requests_per_second)
    {
        m_MaxNumberOfRequestsPerSecond = max_number_of_requests_per_second;
        m_Stopwatch = Stopwatch.StartNew();
    }


    public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
    {
        var elapsed = m_Stopwatch.Elapsed;

        if (elapsed > TimeSpan.FromSeconds(1))
        {
            m_NumberOfRequestsInLastSecond = 1;

            m_Stopwatch.Restart();

            return await task();
        }

        if (m_NumberOfRequestsInLastSecond >= m_MaxNumberOfRequestsPerSecond)
        {
            TimeSpan time_to_wait = TimeSpan.FromSeconds(1) - elapsed;

            await Task.Delay(time_to_wait);

            m_NumberOfRequestsInLastSecond = 1;

            m_Stopwatch.Restart();

            return await task();
        }

        m_NumberOfRequestsInLastSecond++;

        return await task();
    }
}

Here is how this code can be tested:

class Program
{
    static void Main(string[] args)
    {
        DoIt();

        Console.ReadLine();
    }

    static async Task DoIt()
    {
        Func<Task<int>> func = async () =>
        {
            await Task.Delay(100);
            return 1;
        };

        Throttler throttler = new Throttler(3);

        for (int i = 0; i < 10; i++)
        {
            var result = await throttler.Throttle(func);

            Console.WriteLine(DateTime.Now);
        }            
    }
}
Yacoub Massad
  • Sorry, I forgot to mention multi-threading in the question. I've edited the question. – STO Dec 16 '15 at 15:40
  • @Servy, I can't understand your point. How is it wasting a half or a full second? Can you please elaborate? – Yacoub Massad Dec 16 '15 at 15:44
  • Note that if someone tries to start 10 threads at the same time this code will execute three right away, and then all 7 remaining items 1 second later, which isn't correct. – Servy Dec 16 '15 at 15:49
  • @Servy, When I answered the question, there wasn't a requirement for multiple threads accessing the object. However, if you invoke `Throttle` 10 times (from a single thread), the first three will run in the first second, the next three will run in the second second... I actually tested this before posting the answer. – Yacoub Massad Dec 16 '15 at 15:52
  • @YacoubMassad When going through the code, the first three tasks won't go into either `if`, will start running, and then the next seven will all go into the `if`, be delayed *one second* and then they'll all run at the same time, one second later. There is no code to delay 4 of the seven any longer than that. – Servy Dec 16 '15 at 16:09
  • @Servy, the 4th task will go into the second `if` statement and will restart the stopwatch. This means that the 5th and the 6th tasks will not go into any `if` statement. You could test this code easily. – Yacoub Massad Dec 16 '15 at 16:29
  • @YacoubMassad The 4th item won't reset the stopwatch or the count until it's been a second. The 5th and 6th tasks will (or at least can) be started before that happens, meaning they *all* go into the second `if`, meaning they *all* reset the stopwatch and the count, which is clearly badness. – Servy Dec 16 '15 at 16:35
  • @Servy, the 4th task will go into the second `if` statement not the first `if` statement. Please note that my discussion here is about a single thread. – Yacoub Massad Dec 16 '15 at 16:37
  • @YacoubMassad Yes, that's what I said in my comment. The reason your test appears to work is that *you're not actually queuing all 10 tasks at once*. You're awaiting the throttle operations, meaning that you don't try to start the second operation until the first finishes, you don't try to start the third until the second finishes, etc. If you just call `Throttle` 10 times your code *completely* breaks **even in a single threaded context**, and you end up executing three operations in the first second and the other 7 in the next. – Servy Dec 16 '15 at 16:44
  • @Servy, I see your point. However, my assumption was that OP didn't want to run the tasks in parallel, i.e., I assumed that he wants to wait for a request to finish before sending a new one. I assumed this because of the "The usage is like" section of his question that showed a usage that matches with my assumption. – Yacoub Massad Dec 16 '15 at 16:54
  • And yet the edit to indicate that the throttler will potentially even have items enqueued from multiple threads at the same time makes it rather clear that that assumption is false. – Servy Dec 16 '15 at 17:03
  • @Servy, I agree. I started to write my answer before OP makes the edit. – Yacoub Massad Dec 16 '15 at 17:12
  • The edit just makes it clear that your assumption *was wrong all along*. It's not like the original revision of the question clearly stated a premise and then changed it, it simply clarified an unspecified point. – Servy Dec 16 '15 at 17:13
  • @Servy, I simply disagree. Before the edit, the "The usage is like" section is clearly implying a specific way of using the method. – Yacoub Massad Dec 16 '15 at 17:33
  • @YacoubMassad It was *a possibility*, but not *a requirement*. You made the assumption that the one example usage given was the *only possible valid usage*. That's not a valid thing to do. The example shows one way of using it, but that in no way means it's the *only* way it's allowed to be used. – Servy Dec 16 '15 at 17:37

You can use this as a generic helper:

private readonly SemaphoreSlim _semaphore;

public TaskThrottle(int maxTasksToRunInParallel)
{
    _semaphore = new SemaphoreSlim(maxTasksToRunInParallel);
}

public void TaskThrottler<T>(IEnumerable<Task<T>> tasks, int timeoutInMilliseconds, CancellationToken cancellationToken = default(CancellationToken)) where T : class
{
    // Get Tasks as List
    var taskList = tasks as IList<Task<T>> ?? tasks.ToList();
    var postTasks = new List<Task<int>>();

    // When each task completes, release a semaphore slot so the next one can start
    taskList.ForEach(x =>
    {
        postTasks.Add(x.ContinueWith(y => _semaphore.Release(), cancellationToken));
    });

    taskList.ForEach(x =>
    {
        // Wait for open slot 
        _semaphore.Wait(timeoutInMilliseconds, cancellationToken);
        cancellationToken.ThrowIfCancellationRequested();
        x.Start();
    });

    Task.WaitAll(taskList.ToArray(), cancellationToken);
}
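
A usage sketch, assuming the tasks are created "cold" (not started), since the method calls Start() on each of them; `FetchPage` here is just a placeholder for some synchronous work:

var throttle = new TaskThrottle(3);

// Tasks are constructed unstarted; TaskThrottler<T> starts them as slots free up.
var tasks = Enumerable.Range(0, 10)
    .Select(i => new Task<string>(() => FetchPage(i)))
    .ToList();

throttle.TaskThrottler<string>(tasks, 30000);
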
Evan Frisch
Manni Dula

Edit: this solution works, but use it only if it is OK to process all requests serially (on one thread). Otherwise, use the solution accepted as the answer.

Well, thanks to "Best way in .NET to manage queue of tasks on a separate (single) thread".

My question is almost a duplicate, except for adding a delay before execution, which is actually simple.

The main helper here is the SemaphoreSlim class, which allows restricting the degree of parallelism.

So, first create a semaphore:

// The semaphore allows only 1 thread to run at a time.
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

And the final version of Throttle looks like this:

public async Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    await semaphore.WaitAsync();
    try
    {
        await delaySource.Delay();
        return await task();
    }
    finally
    {
        semaphore.Release();
    }
}

The delay source is also pretty simple:

private class TaskDelaySource
{
    private readonly int maxTasks;
    private readonly TimeSpan inInterval;
    private readonly Queue<long> ticks = new Queue<long>();

    public TaskDelaySource(int maxTasks, TimeSpan inInterval)
    {
        this.maxTasks = maxTasks;
        this.inInterval = inInterval;
    }

    public async Task Delay()
    {
        // Keep only the timestamps of the last maxTasks tasks.
        while (ticks.Count > maxTasks)
            ticks.Dequeue();

        if (ticks.Any())
        {
            var now = DateTime.UtcNow.Ticks;
            var lastTick = ticks.First();
            // Calculate the interval between the oldest of the last maxTasks tasks and now
            var intervalSinceLastTask = TimeSpan.FromTicks(now - lastTick);

            if (intervalSinceLastTask < inInterval)
                await Task.Delay((int)(inInterval - intervalSinceLastTask).TotalMilliseconds);
        }

        ticks.Enqueue(DateTime.UtcNow.Ticks);
    }
}
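
For completeness, the pieces fit together roughly like this (Dispose comes from the IThrottler interface):

public class Throttler : IThrottler
{
    // 1 permit: throttled requests are processed strictly one at a time.
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

    // Allow 3 tasks per rolling 1-second window.
    private readonly TaskDelaySource delaySource =
        new TaskDelaySource(3, TimeSpan.FromSeconds(1));

    // Throttle<TResult> and TaskDelaySource are as shown above.

    public void Dispose()
    {
        semaphore.Dispose();
    }
}
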
STO
  • Your `Delay` method cannot be safely called from multiple threads. – Servy Dec 16 '15 at 16:07
  • @Servy True, but it is always guarded by a semaphore with a max degree of parallelism of 1, isn't it? – STO Dec 16 '15 at 16:08
  • If you do that then you can never have more than one concurrent operation, as the second throttled operation can't start until the first finishes. – Servy Dec 16 '15 at 16:10