1

I have already looked here. But still can't get my head around it. Here is how I am currently accomplishing this:

urls_without_rate_limit = 
    [
       'http://httpbin.org/get'
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get'
    ]

urls_with_rate_limit = 
    [
       'http://eu.httpbin.org/get'
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get'
    ]

api_rate = 2
api_limit = 6

loop = asyncio.get_event_loop()
    loop.run_until_complete(
        process(urls=urls_without_rate_limit, rate=0, limit=len(url_list)))

    loop.run_until_complete(
        process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
    limit = asyncio.Semaphore(limit)

    f = Fetch(
        rate=rate,
        limit=limit
    )

    tasks = []
    for url in urls:
        tasks.append(f.make_request(url=url))

    results = await asyncio.gather(*tasks)

As you can see it will finish the first round of process then start the second round for rate limits.

It works fine but is there a way I could start both rounds at the same time with different rate limits?

tvm

Shery
  • 1,808
  • 5
  • 27
  • 51
  • Have you tried gathering your calls to `process`? – dirn May 06 '20 at 12:36
  • any code example please? – Shery May 06 '20 at 12:58
  • 1
    `loop.run_until_complete(asyncio.gather(process(...), process(...)))` – dirn May 07 '20 at 00:52
  • I don't get it, why scheduling all of them together then wait for all requests to finish is not ok for you? You just need to have one semaphore per API not by pool of requests. Using [this solution](https://stackoverflow.com/a/48682456/1720199) should allow you to solve your problem. It seems they even added this directly in aiohttp as [this suggests](https://stackoverflow.com/a/43857526/1720199). – cglacet May 08 '20 at 14:06
  • I have missed it the first time, but there's an [implementation of what you asked](https://stackoverflow.com/a/49717630/1720199) right in the link you provided, that may not be the shortest possible answer, but that looks pretty good to me. – cglacet May 08 '20 at 14:27
  • Did you managed to make it work? – cglacet May 08 '20 at 15:22

1 Answers1

2

I'll elaborate on what I commented. So you can try to work on you own solution (even though I'll give the complete code here).

You can have a dictionary defining some rules (api -> rate limit per second):

APIS_RATE_LIMIT_PER_S = {
  "http://api.mathjs.org/v4?precision=5": 1,
  "http://api.mathjs.org/v4?precision=2": 3,
}

Which you can then use to decide which semaphore to pick according to the request URL (in practice you would have to do some parsing to get the endpoints you want to control). Once you have that it's just a matter of using the semaphore to make sure you limit the number of simultaneous process executing your request. The last piece to the puzzle is obviously to add a delay before releasing the semaphore.

I'll get for a different version of what is suggested here, but it's basically the same solution. I just made it so you can modify the session object so each call to session.get will automatically apply rate limit control.

def set_rate_limits(session, apis_rate_limits_per_s):
    semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}

    @asynccontextmanager
    async def limit_rate(url):
        await semaphores[url].acquire() 
        start = time.time()
        try:
            yield semaphores[url]
        finally:
            duration = time.time() - start
            await asyncio.sleep(1 - duration)
            semaphores[url].release()

    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                return await coroutine(url, *args, **kwargs)

        return coroutine_with_rate_limit

    session.get = add_limit_rate(session.get)
    session.post = add_limit_rate(session.post)
    return session

Notice that using add_limit_rate you could add rate limit control to any coroutine that has an API endpoint as first argument. But here we will just modify session.get and session.post.

In the end you could use the set_rate_limits function like so:

async def main():
    apis = APIS_RATE_LIMIT_PER_S.keys()
    params = [
        {"expr" : "2^2"},
        {"expr" : "1/0.999"},
        {"expr" : "1/1.001"},
        {"expr" : "1*1.001"},
    ]
    async with aiohttp.ClientSession() as session:
        session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
        api_requests = [get_text_result(session, url, params=p) for url, p  in product(apis, params)]
        text_responses = await asyncio.gather(*api_requests)
        print(text_responses)


async def get_text_result(session, url, params=None):
    result = await session.get(url, params=params)
    return await result.text()

If you run this code you wont see much of what is happening, you could add some print here and there in set_rate_limits to "make sure" the rate limit is correctly enforced:

import time

# [...] change this part : 
    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                ######### debug 
                global request_count
                request_count += 1
                this_req_id = request_count
                rate_lim = APIS_RATE_LIMIT_PER_S[url]
                print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
                ########
                r = await coroutine(url, *args, **kwargs)

            ######### debug 
            print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
            ######### 
            return r

If you run this example asyncio.run(main()), you should get something like:

request #1 ->        1ms     rate 1/s
request #2 ->        2ms     rate 3/s
request #3 ->        3ms     rate 3/s
request #4 ->        3ms     rate 3/s
request #1 <-     1003ms     rate 1/s
request #2 <-     1004ms     rate 3/s
request #3 <-     1004ms     rate 3/s
request #5 ->     1004ms     rate 1/s
request #6 ->     1005ms     rate 3/s
request #4 <-     1006ms     rate 3/s
request #5 <-     2007ms     rate 1/s
request #6 <-     2007ms     rate 3/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

It seems rate limit is respected here, in particular we can have a look at the API with a rate limit of 1 request per second:

request #1 ->        1ms     rate 1/s
request #1 <-     1003ms     rate 1/s
request #5 ->     1004ms     rate 1/s
request #5 <-     2007ms     rate 1/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

On the other hand, this solution is not very satisfying as we artificially add a 1s ping to all our requests. This is because of this part of the code:

await asyncio.sleep(1 - duration)
semaphores[url].release()

The problem here is that we are waiting for the sleep to finish before giving out control back to the event loop (scheduling another task, another request). That can easily be solved using this piece of code instead:

asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))    

With release_after_delay simply being:

async def release_after_delay(semaphore, delay):
    await asyncio.sleep(delay)
    semaphore.release()

The asyncio.create_task function makes the coroutine "run this in the background". Which means in this code that the semaphore will be released later, but that we don't need to wait for it to give control back to the even loop (which means some other request can be scheduled and also that we can get the result in add_limit_rate). In other words, we don't care about the result of this coroutine, we just want it to run at some point in the future (which is probably why this function used to be call ensure_future).

Using this patch, we have the following for the API with rate limit set to one request per second:

request #1 ->        1ms     rate 1/s
request #1 <-      214ms     rate 1/s
request #2 ->     1002ms     rate 1/s
request #2 <-     1039ms     rate 1/s
request #3 ->     2004ms     rate 1/s
request #3 <-     2050ms     rate 1/s
request #4 ->     3009ms     rate 1/s
request #4 <-     3048ms     rate 1/s

It's definitively closer to what we would expect this code to do. We get each response from our API as soon as we can (in this example the ping is 200ms/37ms/46ms/41ms). And the rate limit is respected too.

This is probably not the most beautiful code, but it can be a start for you to work with. Maybe make a clean package with that once you have it working nicely, I guess that's something other people may like to use.

cglacet
  • 8,873
  • 4
  • 45
  • 60
  • this is brilliant response. I will have a crack at this and let you know... but for now I will accept it as a valid answer – Shery May 12 '20 at 12:08
  • Thanks, hope it helps. On the other hand, now that I think about this again I wonder why one would need this. Wouldn't it be simpler (/better) to keep requesting while you get "a 429" (adding some increasing delay and also a limit on the number of requests)? You know that famous "Simple is better than complex.". I wonder if that doesn't apply here (I really do because I use a lot of services that have that kind of rate limits). If you have this protocol of "trying again while it seems reasonable", it would apply to any service, without any prior knowledge of the rate rule (which might change). – cglacet May 12 '20 at 16:22
  • Also, you never really know how this limit is enforced and you probably rarely have a guarantee that it will be respected. But maybe you are the one currently defining a limit rate on something that has no limit rate, in which case this makes sense. Otherwise the other solution seems way more robust and easy to maintain. But maybe I'm wrong. – cglacet May 12 '20 at 16:23