
I'm using Google Analytics, and that service has a limit of 10 concurrent requests. I had to throttle my API somehow, so I decided to use a semaphore, but it doesn't seem to work: all requests are triggered simultaneously. I can't find the problem in my code.

public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
    var todayVisits = _googleAnalyticsService.GetTodayVisitsNumber();
    var todayTraffic = _googleAnalyticsService.GetTodayTraffic();
    var newAndReturningUsers = _googleAnalyticsService.GetNewAndReturningUsersNumber();
    var averageSessionDuration = _googleAnalyticsService.GetAverageSessionDuration();
    var deviceCategory = _googleAnalyticsService.GetSessionNumberByDeviceCategory();
    var topPages = _googleAnalyticsService.GetTodaysTopPages();
    var guestsAndRegisteredUsers = _googleAnalyticsService.GetGuestsVsRegisteredUsers();
    var averageNumberOfSessionsPerDay = _googleAnalyticsService.GetAverageSessionsNumber();
    var visitsPerWeekday = _googleAnalyticsService.GetTrafficByWeekday();
    var visitsByHours = _googleAnalyticsService.GetTrafficByTimeOfDay();
    var usersByPrefectures = _googleAnalyticsService.GetUsersByPrefectures();
    var usersByCountry = _googleAnalyticsService.GetUsersByCountry();

    var tasks = new List<Task>()
        {
            todayVisits, todayTraffic, newAndReturningUsers,
            averageSessionDuration, deviceCategory, topPages,
            guestsAndRegisteredUsers, averageNumberOfSessionsPerDay, visitsPerWeekday,
            visitsByHours, usersByPrefectures, usersByCountry
        };

    var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);

    foreach(var task in tasks)
    {
        await throttler.WaitAsync();

        try
        {
            await task;
            await Task.Delay(1000); // It's important due to limits of Google Analytics requests (10 queries per second per IP address)
        }
        finally
        {
            throttler.Release();
        }
    }

    await Task.WhenAll(tasks);

    return new SiteAnalyticsDTO()
        {
            TodayVisits = await todayVisits,
            TodayTraffic = await todayTraffic,
            NewAndReturningUsers = await newAndReturningUsers,
            AverageSessionDuration = await averageSessionDuration,
            DeviceCategory = await deviceCategory,
            TopPages = await topPages,
            GuestsAndRegisteredUsers = await guestsAndRegisteredUsers,
            AverageNumberOfSessionsPerDay = await averageNumberOfSessionsPerDay,
            VisitsPerWeekday = await visitsPerWeekday,
            VisitsByHours = await visitsByHours,
            UsersByPrefectures = await usersByPrefectures,
            UsersByCountry = await usersByCountry
        };
}

And here is an example of one of the Google Analytics calls:

public async Task<int> GetTodayVisitsNumber(List<long> listingIds = null)
{
    string filter = GetFilter(listingIds);

    var getReportsRequest = GetReportsRequestModel(GetTodayDateRange(), "ga:sessionCount", "ga:sessions", _configuration.MainViewId, filter);
    var response = await _service.Reports.BatchGet(getReportsRequest).ExecuteAsync();
    Console.WriteLine(response);
    var data = response.Reports.FirstOrDefault();

    return Convert.ToInt32(data?.Data.Totals[0].Values[0]);
}
marc_s
DiPix

2 Answers


All requests are triggered simultaneously.

Let's take a look here:

var todayVisits = _googleAnalyticsService.GetTodayVisitsNumber();
var todayTraffic = _googleAnalyticsService.GetTodayTraffic();
var newAndReturningUsers = _googleAnalyticsService.GetNewAndReturningUsersNumber();
var averageSessionDuration = _googleAnalyticsService.GetAverageSessionDuration();
var deviceCategory = _googleAnalyticsService.GetSessionNumberByDeviceCategory();
var topPages = _googleAnalyticsService.GetTodaysTopPages();
var guestsAndRegisteredUsers = _googleAnalyticsService.GetGuestsVsRegisteredUsers();
var averageNumberOfSessionsPerDay = _googleAnalyticsService.GetAverageSessionsNumber();
var visitsPerWeekday = _googleAnalyticsService.GetTrafficByWeekday();
var visitsByHours = _googleAnalyticsService.GetTrafficByTimeOfDay();
var usersByPrefectures = _googleAnalyticsService.GetUsersByPrefectures();
var usersByCountry = _googleAnalyticsService.GetUsersByCountry();

You are storing the result of each of these methods. When you write `methodName()` with parentheses, you invoke the method right there. For an async method that means the work (here, the request to Google Analytics) starts immediately, and what you store in the `var` is a `Task` that is already running.

You then put these tasks in a list and await each of them inside the semaphore, which limits how many tasks can be *awaited* at once, not how many requests are *running*.

The issue is that all twelve requests were already sent when you invoked the methods above; awaiting them later neither starts nor delays them. By the time the loop reaches a task it has often already finished, so its `await` completes immediately.

This makes it look as if the `SemaphoreSlim` isn't working. It is, but it only throttles the awaiting, which happens after every request has already gone out.
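To see that the requests start at the call site and not at the `await`, here is a small self-contained sketch. `FakeRequest` is a hypothetical stand-in for one of the Google Analytics methods: it only counts how many times it has been started and then simulates latency with `Task.Delay`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

int started = 0;

// Hypothetical stand-in for a Google Analytics call.
// Everything before the first await runs synchronously on the caller,
// so the counter is bumped the moment the method is invoked.
async Task<int> FakeRequest(int id)
{
    started++;
    await Task.Delay(100); // simulated network latency
    return id;
}

// Invoking the methods, as the question's code does, starts every "request"...
var pending = new List<Task<int>>();
for (int i = 0; i < 5; i++)
{
    pending.Add(FakeRequest(i));
}

// ...so all five are already in flight before anything has been awaited.
int startedBeforeAwait = started;
Console.WriteLine($"Started before any await: {startedBeforeAwait}"); // prints 5

int[] results = await Task.WhenAll(pending);
Console.WriteLine($"Completed: {results.Length}");
```

Wrapping the awaits in a semaphore afterwards cannot change the first number; only delaying the *invocation* can.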

Store the async methods for later instead of invoking them all at once.
You can't always store a method group like these in a `var` (before C# 10 you never can), so store each one in an explicitly typed delegate variable such as `Func<TResult>`.

For example:

Func<Task<object>> todayVisits = _googleAnalyticsService.GetTodayVisitsNumber;

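Editor's note: I have no clue what these methods return, so I substituted `object` to be as general as possible. Be aware that a method group only converts to `Func<Task<object>>` if the method takes no parameters and returns a `Task<object>`. `Task<T>` is not covariant, so a method like `GetTodayVisitsNumber`, which returns `Task<int>` and has an optional `listingIds` parameter, won't convert directly; wrap it in an async lambda such as `async () => await _googleAnalyticsService.GetTodayVisitsNumber()`. Either way, the results come back as `object` and must be cast to the DTO's property types.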
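A small sketch of the deferred version, assuming a hypothetical local stand-in that has the same shape as the question's `GetTodayVisitsNumber` (returns `Task<int>`, optional parameter). The async lambda adapts it to `Func<Task<object>>`; creating the delegate starts nothing, and only invoking it sends the (simulated) request.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

int started = 0;

// Hypothetical stand-in with the same signature shape as the question's method.
async Task<int> GetTodayVisitsNumber(List<long>? listingIds = null)
{
    started++;
    await Task.Delay(50); // simulated request
    return 42;
}

// The async lambda supplies the optional argument and boxes the int,
// so it matches Func<Task<object>>.
Func<Task<object>> todayVisits = async () => await GetTodayVisitsNumber();

int startedAfterCreating = started; // 0: storing the delegate sends nothing

object value = await todayVisits();  // the request starts here
int startedAfterInvoking = started;  // 1

Console.WriteLine($"{startedAfterCreating} -> {startedAfterInvoking}, value = {value}");
```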

Now, it would be pretty cumbersome to store each one in its own variable, so instead let's put them directly in a list like this:

var awaitableTasks = new List<Func<Task<object>>>()
{
    _googleAnalyticsService.GetTodayVisitsNumber,
    _googleAnalyticsService.GetTodayTraffic,
    _googleAnalyticsService.GetNewAndReturningUsersNumber,
    _googleAnalyticsService.GetAverageSessionDuration,
    _googleAnalyticsService.GetSessionNumberByDeviceCategory,
    _googleAnalyticsService.GetTodaysTopPages,
    _googleAnalyticsService.GetGuestsVsRegisteredUsers,
    _googleAnalyticsService.GetAverageSessionsNumber,
    _googleAnalyticsService.GetTrafficByWeekday,
    _googleAnalyticsService.GetTrafficByTimeOfDay,
    _googleAnalyticsService.GetUsersByPrefectures,
    _googleAnalyticsService.GetUsersByCountry
};

Because these objects aren't tasks but methods that return a `Task`, we have to change how we store and invoke them. For this we'll use a local method; I'll go over each of the changes I made.

Let's create the semaphore and somewhere to put the tasks so we can keep track of them.

Let's also create somewhere we can store the results of each of those tasks when we await them.

var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);

var tasks = new List<Task>();

ConcurrentDictionary<string, object> resultDict = new();

Let's create a local method with a few responsibilities:

  1. Accept a Func<Task<object>> as a parameter
  2. Await the method
  3. Put the result of that method somewhere we can get it later
  4. Release the semaphore even if it encounters an error
async Task Worker(Func<Task<object>> awaitableFunc)
{
    try
    {
        resultDict.TryAdd(awaitableFunc.GetMethodInfo().Name, await awaitableFunc());
    }
    finally
    {
        throttler.Release();
    }
}

Editor's note: you can pull off the same thing with a lambda expression, but I prefer a local method for clarity and formatting.
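One caveat about keying results by `awaitableFunc.GetMethodInfo().Name` in the worker above: if the service calls are wrapped in async lambdas (needed whenever a method returns something other than `Task<object>` or has parameters), `GetMethodInfo().Name` returns a compiler-generated name, so the `nameof(...)` lookups would fail. A sketch that passes the key explicitly instead, with hypothetical local stand-ins for two of the service methods:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for two of the service methods.
async Task<int> GetTodayVisitsNumber() { await Task.Delay(10); return 42; }
async Task<string> GetTodaysTopPages() { await Task.Delay(10); return "/home"; }

// A lambda gets a compiler-generated method name, not "GetTodayVisitsNumber".
Func<Task<object>> wrapped = async () => await GetTodayVisitsNumber();
string reflectedName = wrapped.GetMethodInfo().Name;
Console.WriteLine($"GetMethodInfo().Name: {reflectedName}");

// So pair every call with an explicit key instead.
var requests = new Dictionary<string, Func<Task<object>>>
{
    [nameof(GetTodayVisitsNumber)] = async () => await GetTodayVisitsNumber(),
    [nameof(GetTodaysTopPages)] = async () => await GetTodaysTopPages(),
};

var throttler = new SemaphoreSlim(10, 10);
var resultDict = new ConcurrentDictionary<string, object>();

async Task Worker(string key, Func<Task<object>> func)
{
    try { resultDict[key] = await func(); }
    finally { throttler.Release(); } // release even if the call throws
}

var workers = new List<Task>();
foreach (var (name, call) in requests)
{
    await throttler.WaitAsync(); // take a slot before starting the call
    workers.Add(Worker(name, call));
}
await Task.WhenAll(workers);

// Values come back as object and must be cast to the DTO's property types.
int todayVisits = (int)resultDict[nameof(GetTodayVisitsNumber)];
string topPages = (string)resultDict[nameof(GetTodaysTopPages)];
Console.WriteLine($"{todayVisits}, {topPages}");
```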

Start the workers and store the tasks they return.

That way, if some of them aren't done by the time the last ones are started, we can wait for them to finish before creating the final object (we need all of their results to build it).

foreach (var task in awaitableTasks)
{
    await throttler.WaitAsync();
    tasks.Add(Task.Run(() => Worker(task)));
}

// wait for the tasks to finish
await Task.WhenAll(tasks);
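To check that waiting on the semaphore *before* starting each worker really caps the number of requests in flight, here is a self-contained sketch. `FakeRequest` is hypothetical and records the peak number of concurrent calls; `MaxRequests` is set to 3 only for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

const int MaxRequests = 3; // demo value, not the question's setting
var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
var gate = new object();
int inFlight = 0, peak = 0;

// Hypothetical request that tracks how many calls run at the same time.
async Task<object> FakeRequest()
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    await Task.Delay(50); // simulated latency
    lock (gate) { inFlight--; }
    return "ok";
}

async Task Worker(Func<Task<object>> awaitableFunc)
{
    try { await awaitableFunc(); }
    finally { throttler.Release(); }
}

var tasks = new List<Task>();
for (int i = 0; i < 12; i++)
{
    await throttler.WaitAsync();                      // wait for a free slot...
    tasks.Add(Task.Run(() => Worker(FakeRequest)));   // ...then start the request
}
await Task.WhenAll(tasks);

Console.WriteLine($"Peak concurrency: {peak}");
```

Note that this only limits concurrency; it does not by itself enforce a "per second" rate.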

Create the final object then return it.

return new SiteAnalyticsDTO()
{
    TodayVisits = resultDict[nameof(_googleAnalyticsService.GetTodayVisitsNumber)],
    TodayTraffic = resultDict[nameof(_googleAnalyticsService.GetTodayTraffic)],
    NewAndReturningUsers = resultDict[nameof(_googleAnalyticsService.GetNewAndReturningUsersNumber)],
    AverageSessionDuration = resultDict[nameof(_googleAnalyticsService.GetAverageSessionDuration)],
    DeviceCategory = resultDict[nameof(_googleAnalyticsService.GetSessionNumberByDeviceCategory)],
    TopPages = resultDict[nameof(_googleAnalyticsService.GetTodaysTopPages)],
    GuestsAndRegisteredUsers = resultDict[nameof(_googleAnalyticsService.GetGuestsVsRegisteredUsers)],
    AverageNumberOfSessionsPerDay = resultDict[nameof(_googleAnalyticsService.GetAverageSessionsNumber)],
    VisitsPerWeekday = resultDict[nameof(_googleAnalyticsService.GetTrafficByWeekday)],
    VisitsByHours = resultDict[nameof(_googleAnalyticsService.GetTrafficByTimeOfDay)],
    UsersByPrefectures = resultDict[nameof(_googleAnalyticsService.GetUsersByPrefectures)],
    UsersByCountry = resultDict[nameof(_googleAnalyticsService.GetUsersByCountry)]
};

Wrap it all together and I think we have something that might work, or at least be easy to modify to meet your needs.

public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
    // store these methods so we can iterate and execute them later
    var awaitableTasks = new List<Func<Task<object>>>()
    {
        _googleAnalyticsService.GetTodayVisitsNumber,
        _googleAnalyticsService.GetTodayTraffic,
        _googleAnalyticsService.GetNewAndReturningUsersNumber,
        _googleAnalyticsService.GetAverageSessionDuration,
        _googleAnalyticsService.GetSessionNumberByDeviceCategory,
        _googleAnalyticsService.GetTodaysTopPages,
        _googleAnalyticsService.GetGuestsVsRegisteredUsers,
        _googleAnalyticsService.GetAverageSessionsNumber,
        _googleAnalyticsService.GetTrafficByWeekday,
        _googleAnalyticsService.GetTrafficByTimeOfDay,
        _googleAnalyticsService.GetUsersByPrefectures,
        _googleAnalyticsService.GetUsersByCountry
    };

    // create a way to limit the number of concurrent requests
    var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);

    // create a place to store the tasks we create
    var finalTasks = new List<Task>();

    // make sure we have somewhere to put our results
    ConcurrentDictionary<string, object> resultDict = new();

    // make a worker that accepts one of those methods, invokes it
    // then adds the result to the dict
    async Task Worker(Func<Task<object>> awaitableFunc)
    {
        try
        {
            resultDict.TryAdd(awaitableFunc.GetMethodInfo().Name, await awaitableFunc());
        }
        finally
        {
            // make sure even if we encounter an error we still release the semaphore
            throttler.Release();
        }
    }

    // iterate over the tasks, wait for the semaphore
    // when we get a slot, create a worker and send it to the background
    foreach (var task in awaitableTasks)
    {
        await throttler.WaitAsync();
        finalTasks.Add(Task.Run(() => Worker(task)));
    }

    // wait for any remaining tasks to finish up in the background if they are still running
    await Task.WhenAll(finalTasks);

    // create the return object from the results of the dictionary
    return new SiteAnalyticsDTO()
    {
        TodayVisits = resultDict[nameof(_googleAnalyticsService.GetTodayVisitsNumber)],
        TodayTraffic = resultDict[nameof(_googleAnalyticsService.GetTodayTraffic)],
        NewAndReturningUsers = resultDict[nameof(_googleAnalyticsService.GetNewAndReturningUsersNumber)],
        AverageSessionDuration = resultDict[nameof(_googleAnalyticsService.GetAverageSessionDuration)],
        DeviceCategory = resultDict[nameof(_googleAnalyticsService.GetSessionNumberByDeviceCategory)],
        TopPages = resultDict[nameof(_googleAnalyticsService.GetTodaysTopPages)],
        GuestsAndRegisteredUsers = resultDict[nameof(_googleAnalyticsService.GetGuestsVsRegisteredUsers)],
        AverageNumberOfSessionsPerDay = resultDict[nameof(_googleAnalyticsService.GetAverageSessionsNumber)],
        VisitsPerWeekday = resultDict[nameof(_googleAnalyticsService.GetTrafficByWeekday)],
        VisitsByHours = resultDict[nameof(_googleAnalyticsService.GetTrafficByTimeOfDay)],
        UsersByPrefectures = resultDict[nameof(_googleAnalyticsService.GetUsersByPrefectures)],
        UsersByCountry = resultDict[nameof(_googleAnalyticsService.GetUsersByCountry)]
    };
}
DekuDesu
  • Interesting. To be honest, I suspected that I was awaiting them too early. But why, in my code, is `Console.WriteLine` triggered inside the `foreach` instead of right after initialization? It's true that they all fire during the first iteration, but still, why there? – DiPix Jun 08 '21 at 07:03

The problem of your setup is that all tasks are started at the same time, and only their awaiting is throttled. Throttling the awaiting has no useful effect. Only your continuations are delayed. The target service receives all the requests in bulk.

My suggestion is to use a dedicated class to encapsulate the throttling logic. It seems that you need to limit both the concurrency and the rate of sending the requests, and each one of these limitations can be achieved by using a separate SemaphoreSlim. Here is a simple implementation:

public class ThrottledExecution
{
    private readonly SemaphoreSlim _concurrencySemaphore;
    private readonly SemaphoreSlim _delaySemaphore;
    private readonly TimeSpan _delay;

    public ThrottledExecution(int concurrencyLimit, TimeSpan rateLimitTime,
        int rateLimitCount)
    {
        // Argument validation omitted
        _concurrencySemaphore = new SemaphoreSlim(concurrencyLimit, concurrencyLimit);
        _delaySemaphore = new SemaphoreSlim(rateLimitCount, rateLimitCount);
        _delay = rateLimitTime;
    }

    public async Task<TResult> Run<TResult>(Func<Task<TResult>> action)
    {
        await _delaySemaphore.WaitAsync();
        ScheduleDelaySemaphoreRelease();
        await _concurrencySemaphore.WaitAsync();
        try { return await action().ConfigureAwait(false); }
        finally { _concurrencySemaphore.Release(); }
    }

    private async void ScheduleDelaySemaphoreRelease()
    {
        await Task.Delay(_delay).ConfigureAwait(false);
        _delaySemaphore.Release();
    }
}

And here is how you could use it:

public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
    var throttler = new ThrottledExecution(MaxRequests, TimeSpan.FromSeconds(1), 1);

    var todayVisits = throttler.Run(() => _googleAnalyticsService.GetTodayVisitsNumber());
    var todayTraffic = throttler.Run(() => _googleAnalyticsService.GetTodayTraffic());
    var newAndReturningUsers = throttler.Run(() => _googleAnalyticsService.GetNewAndReturningUsersNumber());
    var averageSessionDuration = throttler.Run(() => _googleAnalyticsService.GetAverageSessionDuration());
    var deviceCategory = throttler.Run(() => _googleAnalyticsService.GetSessionNumberByDeviceCategory());
    var topPages = throttler.Run(() => _googleAnalyticsService.GetTodaysTopPages());
    var guestsAndRegisteredUsers = throttler.Run(() => _googleAnalyticsService.GetGuestsVsRegisteredUsers());
    var averageNumberOfSessionsPerDay = throttler.Run(() => _googleAnalyticsService.GetAverageSessionsNumber());
    var visitsPerWeekday = throttler.Run(() => _googleAnalyticsService.GetTrafficByWeekday());
    var visitsByHours = throttler.Run(() => _googleAnalyticsService.GetTrafficByTimeOfDay());
    var usersByPrefectures = throttler.Run(() => _googleAnalyticsService.GetUsersByPrefectures());
    var usersByCountry = throttler.Run(() => _googleAnalyticsService.GetUsersByCountry());

    var tasks = new List<Task>()
    {
        todayVisits, todayTraffic, newAndReturningUsers,
        averageSessionDuration, deviceCategory, topPages,
        guestsAndRegisteredUsers, averageNumberOfSessionsPerDay, visitsPerWeekday,
        visitsByHours, usersByPrefectures, usersByCountry
    };
    await Task.WhenAll(tasks);

    return new SiteAnalyticsDTO()
    {
        TodayVisits = await todayVisits,
        TodayTraffic = await todayTraffic,
        NewAndReturningUsers = await newAndReturningUsers,
        AverageSessionDuration = await averageSessionDuration,
        DeviceCategory = await deviceCategory,
        TopPages = await topPages,
        GuestsAndRegisteredUsers = await guestsAndRegisteredUsers,
        AverageNumberOfSessionsPerDay = await averageNumberOfSessionsPerDay,
        VisitsPerWeekday = await visitsPerWeekday,
        VisitsByHours = await visitsByHours,
        UsersByPrefectures = await usersByPrefectures,
        UsersByCountry = await usersByCountry,
    };
}
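With `TimeSpan.FromSeconds(1), 1` the class lets one operation start per second. A rough model of the start times, assuming all 12 requests are queued at t = 0, each one finishes in under a second, and the concurrency limit never binds: the delay semaphore hands out `rateLimitCount` slots and each slot comes back `rateLimitTime` after it was taken, so request `i` (zero-based) starts at `(i / rateLimitCount) * rateLimitTime`. The helper names below are hypothetical.

```csharp
using System;
using System.Linq;

// Rough model (assumption): only the rate limit decides start times.
TimeSpan StartTime(int index, int rateLimitCount, TimeSpan rateLimitTime) =>
    TimeSpan.FromTicks(rateLimitTime.Ticks * (index / rateLimitCount));

TimeSpan LastStart(int requests, int rateLimitCount, TimeSpan rateLimitTime) =>
    Enumerable.Range(0, requests)
              .Select(i => StartTime(i, rateLimitCount, rateLimitTime))
              .Max();

var window = TimeSpan.FromSeconds(1);
TimeSpan onePerSecond = LastStart(12, 1, window);  // 11 s
TimeSpan tenPerSecond = LastStart(12, 10, window); // 1 s

Console.WriteLine($"1/s:  last request starts at {onePerSecond.TotalSeconds} s");
Console.WriteLine($"10/s: last request starts at {tenPerSecond.TotalSeconds} s");
```

Add the duration of the last request to each figure to estimate the total time of `Handle`.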

It seems that a partially successful result is not useful to you, so you could consider adding some auto-cancellation logic inside the ThrottledExecution class: if a task fails, all pending and subsequent asynchronous operations should be canceled.
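A minimal sketch of that auto-cancellation idea, assuming a concurrency-only throttle (no rate limit) written as local functions rather than as a change to the `ThrottledExecution` class itself. `Run`, `Ok` and `Fail` are hypothetical names, and the operations must accept a `CancellationToken` for cancellation to reach them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var semaphore = new SemaphoreSlim(2, 2);
var cts = new CancellationTokenSource();

async Task<T> Run<T>(Func<CancellationToken, Task<T>> action)
{
    // Throws OperationCanceledException once a sibling has failed.
    await semaphore.WaitAsync(cts.Token);
    try
    {
        cts.Token.ThrowIfCancellationRequested(); // slot obtained after a failure
        return await action(cts.Token).ConfigureAwait(false);
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        cts.Cancel(); // the first real failure cancels everything else
        throw;
    }
    finally
    {
        semaphore.Release();
    }
}

// Hypothetical operations: one fails quickly, the others are slow but cancelable.
async Task<int> Ok(int value, CancellationToken ct) { await Task.Delay(1000, ct); return value; }
async Task<int> Fail(CancellationToken ct) { await Task.Delay(20, ct); throw new InvalidOperationException("request failed"); }

var tasks = new List<Task<int>>
{
    Run(ct => Fail(ct)),
    Run(ct => Ok(1, ct)),
    Run(ct => Ok(2, ct)),
    Run(ct => Ok(3, ct)),
};

try
{
    await Task.WhenAll(tasks);
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Failed: {ex.Message}");
}

Console.WriteLine($"Canceled: {tasks.Count(t => t.IsCanceled)} of {tasks.Count}");
```

Here the running `Ok(1, …)` is canceled through its token, and the two operations still waiting for a slot are canceled before they start.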

Theodor Zoulias
  • About the `ThrottledExecution(int concurrencyLimit, TimeSpan rateLimitTime, int rateLimitCount)`: shouldn't `rateLimitCount` have the same value as `concurrencyLimit`, so 10 instead of 1 in my case (`MaxRequests`)? – DiPix Jun 08 '21 at 09:18
  • @DiPix no, the concurrency limit and the rate limit are independent from each other. The concurrency limit denotes how many operations can be concurrently *in-flight*, so the duration of the operations matters. The rate limit denotes how many operations can be *started* in any (sliding) time window, so the duration of the operations is irrelevant. If you want to limit only the concurrency and not the rate, you can pass these values: `TimeSpan.Zero`, `Int32.MaxValue`. – Theodor Zoulias Jun 08 '21 at 09:33
  • I see. Because currently, with your example, that operation takes as long as 12 seconds instead of just 2. – DiPix Jun 08 '21 at 22:32
  • @DiPix yeap, in the example I've configured the `ThrottledExecution` with a rate of one operation per second, which is probably incorrect. Based on a code comment in your question it should be ten operations per second, so the `rateLimitCount` should be `10` instead of `1`. – Theodor Zoulias Jun 08 '21 at 22:40
  • @DiPix it is possible that you don't actually need to limit the concurrency, and all you need is to limit the rate. In that case I have posted a simpler `RateLimiter` class [here](https://stackoverflow.com/questions/65825673/partition-how-to-add-a-wait-after-every-partition/65829971#65829971). Or just use the `ThrottledExecution` class configured with a `concurrencyLimit` equal to `Int32.MaxValue`. – Theodor Zoulias Jun 08 '21 at 22:46
  • Thank you for the explanation. It works fine :) I'd like to ask you one more thing. https://stackoverflow.com/questions/65825673/partition-how-to-add-a-wait-after-every-partition/65829971#65829971 This code looks very useful, but I'm having trouble using it in my example. Can you edit your post with that implementation too? – DiPix Jun 09 '21 at 19:10
    @DiPix I could add a `Run` method to the `RateLimiter`, but I would prefer to keep it a lightweight component, similar to the `SemaphoreSlim`. [Here](https://gist.github.com/theodorzoulias/5499b77e32b6dc8d5f8fb7b231a64fe4) is the `Run` method, if you want to use it. – Theodor Zoulias Jun 09 '21 at 21:49
  • In retrospect the `ThrottledExecution` class is a bit leaky, because it starts some internal asynchronous operations (the `ScheduleDelaySemaphoreRelease` method) that remain active for some time after the last use of the `ThrottledExecution`. It's not a huge problem, but it can be an issue if you create multiple `ThrottledExecution` instances, initialized with a long `rateLimitTime` time span. A solution to this problem could be to make the class disposable, similarly with the `RateLimiter` class in [this](https://stackoverflow.com/a/65829971/11178549) answer. – Theodor Zoulias Aug 13 '21 at 00:55