
I am new to Python and have been trying to set a rate limit for a program that uses asyncio for concurrent requests. After trying almost everything I found on Stack Overflow and other programming sites, I don't know whether I am missing something or simply doing it wrong, so I hope the community can help me.

My implementation is pretty straightforward for an asyncio program:

import asyncio
import aiohttp

async def inside_api_call_1(data, session):
  r = await do_request()
  return r

async def inside_api_call_2(data, session):
  r = await do_another_request()
  return r

async def full_process(data, session):
  res1 = await inside_api_call_1(data, session)
  if res1:
    res2 = await inside_api_call_2(data, session)  # second call depends on the first result

async def main_func():
  tasks = []
  async with aiohttp.ClientSession() as session:
    for data in some_json_file:
      tasks.append(full_process(data, session))
    await asyncio.gather(*tasks, return_exceptions=True)

loop = asyncio.get_event_loop()
loop.run_until_complete(main_func())

What I am trying to do is set a rate limit for these inside_api_call functions of 16 req/sec GLOBALLY: the sum of all requests (in this case two requests made one after another, since the second needs the first one's result to proceed) must not exceed 16 req/sec.

My first thought was to add a sleep(1/16) to both functions, like this:

async def inside_api_call_1(data, session):
  r = await do_request()
  await asyncio.sleep(1/16)  # <--- Wait 1/16 sec
  return r

async def inside_api_call_2(data, session):
  r = await do_another_request()
  await asyncio.sleep(1/16)  # <--- Wait 1/16 sec
  return r

But it didn't work; I kept getting 429 Too Many Requests errors. (I suspect this is because all the tasks sleep concurrently, so a per-task sleep doesn't actually space out the requests globally.)

I also tried semaphores, but that approach seems to control the number of simultaneously active connections rather than the number of requests per second:

aiohttp: set maximum number of requests per second
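
As far as I understand it, the semaphore idea could be turned into a per-second limit if each slot were handed back one second after it is acquired, instead of when the request finishes. This is only a sketch of that idea (PerSecondLimiter is just my own name for it, not a class from any library):

import asyncio

class PerSecondLimiter:
    # Allow at most `rate` acquisitions per second (rough sketch).
    def __init__(self, rate):
        self._sem = asyncio.Semaphore(rate)

    async def __aenter__(self):
        await self._sem.acquire()
        # Return the slot one second after it was taken, so at most
        # `rate` acquisitions can start in any one-second window.
        asyncio.get_running_loop().call_later(1.0, self._sem.release)

    async def __aexit__(self, exc_type, exc, tb):
        pass  # the release is already scheduled in __aenter__

Every task would share one instance and wrap each request in `async with limiter:`, which is conceptually similar to what asyncio-throttle does.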

Then I attempted to use the Python package https://github.com/hallazzang/asyncio-throttle and set up my code like this:

from asyncio_throttle import Throttler

async def main_func():
  tasks = []
  throttler = Throttler(rate_limit=16)  # <-- setting throttler to 16 req / sec
  async with throttler:
    async with aiohttp.ClientSession() as session:
      for data in some_json_file:
        tasks.append(full_process(data, session))
      await asyncio.gather(*tasks, return_exceptions=True)
  await asyncio.sleep(1/16)  # <-- this line as in the documentation's example

loop = asyncio.get_event_loop()
loop.run_until_complete(main_func())

From there I have been moving both the sleep calls and the asyncio-throttle block from main_func into the inside_api_call functions:

async def inside_api_call_1(data, session):
  async with throttler:
    r = await do_request()
    await asyncio.sleep(1/16)
  return r

async def inside_api_call_2(data, session):
  async with throttler:
    r = await do_another_request()
    await asyncio.sleep(1/16)
  return r

Without success.

I am also new to async in Python, so any help is appreciated.

Henry Peregrino
  • How does your `inside_api_call_1` function know what the value of the variable `throttler` is? It's not a global and it's not an argument. Throttler seems like the perfect tool for this problem, but you haven't given us enough of your program to see why it's not working. Also, I don't see the point of the line `async with throttler` in main_func, since that's not where you want the throttling effect to operate. Did you try throttling down to something like 8 commands per second, just to make sure you're not on the edge? – Paul Cornelius May 27 '22 at 03:06
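
For reference, a sketch of the arrangement the comment above describes: one asyncio-throttle Throttler created once in main_func and shared by every task (the comment suggests either a module-level global or an argument; this sketch passes it as an argument), with the async with throttler: block wrapped directly around each request. The do_request, do_another_request, and some_json_file names are the same placeholders used in the question.

import asyncio
import aiohttp
from asyncio_throttle import Throttler

async def inside_api_call_1(data, session, throttler):
    async with throttler:  # waits for a free slot in the 16 req/sec budget
        return await do_request()

async def inside_api_call_2(data, session, throttler):
    async with throttler:
        return await do_another_request()

async def full_process(data, session, throttler):
    res1 = await inside_api_call_1(data, session, throttler)
    if res1:
        await inside_api_call_2(data, session, throttler)

async def main_func():
    throttler = Throttler(rate_limit=16)  # one shared limiter for all requests
    tasks = []
    async with aiohttp.ClientSession() as session:
        for data in some_json_file:
            tasks.append(full_process(data, session, throttler))
        await asyncio.gather(*tasks, return_exceptions=True)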

1 Answer


Try https://pypi.org/project/limiter/, or batch the requests yourself:

    async def main_func():
        tasks = []
        async with aiohttp.ClientSession() as session:
            for idx, data in enumerate(some_json_file):
                tasks.append(full_process(data, session))
                if (idx + 1) % 16 == 0:
                    # run the current batch of 16, then pause for a second
                    await asyncio.gather(*tasks, return_exceptions=True)
                    tasks = []
                    await asyncio.sleep(1)
            if tasks:
                # run whatever is left over
                await asyncio.gather(*tasks, return_exceptions=True)

stahh