
I want to make N requests per second.

I have tried the RateLimiter class shown here https://gist.github.com/yijxiang/4042b3d746471d0fc226215daf96eee7, aiolimiter as shown here https://aiolimiter.readthedocs.io/, a Semaphore to limit concurrency, a TCPConnector as described in "aiohttp: set maximum number of requests per second", and both asyncio.sleep and time.sleep, but I still cannot get rid of the 429 error.

This is my code.

        async def download_order(order_id):
            get_order_url = f'https://api.planet.com/compute/ops/orders/v2/{order_id}'

            headers = {
                'Authorization': f'api-key {self.api_key}',
                'Content-Type': 'application/json',
            }
            async with aiohttp.ClientSession() as session:
                status = 'queued'
                # keep on making requests until the status is success
                while status != 'success':
                    async with session.get(get_order_url, headers=headers) as resp:
                        if resp.status != 200:
                            # resp.text is a coroutine method in aiohttp, so it must be awaited
                            print(f"Failed to get order status: {await resp.text()}")
                        else:
                            resp_json = await resp.json()
                            name = resp_json['name']
                            status = resp_json['state']
                            print(f"Order with name {name} has status {status}.")
        
        async def main_wrapper():
            activated_order_ids = await post_order(df)

            tasks = []

            for order_id in activated_order_ids:
                    task = download_order(order_id)
                    tasks.append(task)

            await asyncio.gather(*tasks)

        asyncio.run(main_wrapper())

This is what I tried using a semaphore and a manual delay.

        async def download_order(order_id, semaphore):
            async with semaphore:
                async def download(url: str, filename: str):
                    pass

            get_order_url = f'https://api.planet.com/compute/ops/orders/v2/{order_id}'

            headers = {
                'Authorization': f'api-key {self.api_key}',
                'Content-Type': 'application/json',
            }
            async with aiohttp.ClientSession() as session:
                status = 'queued'
                # keep on making requests until the status is success
                while status != 'success':
                    async with session.get(get_order_url, headers=headers) as resp:
                        if resp.status != 200:
                            # resp.text is a coroutine method in aiohttp, so it must be awaited
                            print(f"Failed to get order status: {await resp.text()}")
                        else:
                            resp_json = await resp.json()
                            name = resp_json['name']
                            status = resp_json['state']
                            print(f"Order Name: {name}\nCurrent Status:{status}\n")
                            if status == 'success':
                                download_url = resp_json['_links']['results'][0]['location']
                                directory_path = "./orders"
                                # Create the directory if it does not exist yet
                                os.makedirs(directory_path, exist_ok=True)
                                await download(download_url, f'./orders/{name}.zip')

                    await asyncio.sleep(10)
        
        async def main_wrapper():
            activated_order_ids = await post_order(df)

            loop = asyncio.get_running_loop()

            semaphore = asyncio.Semaphore(5)
            tasks = [loop.create_task(download_order(order_id, semaphore)) for order_id in activated_order_ids]
            await asyncio.gather(*tasks, return_exceptions=True)

        asyncio.run(main_wrapper())

This is what I want to do. For each order ID in the activated_order_ids list, I want to check the status of the order every 10 seconds until it succeeds. Per Planet's Rate Limiting Policy, I cannot make more than 5 requests per second.

I have solved the first part by sleeping for 10 seconds between status checks, but I am unable to respect the API limit. Because the tasks run concurrently, 15 or more requests go out at the same time, whereas I want at most 5 requests per second.
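A minimal sketch of the intended behaviour, using only the standard library. It assumes that spacing request starts 1/N seconds apart is an acceptable way to stay under N requests per second. `IntervalLimiter`, `poll_order` and `fetch_state` are hypothetical names made up for this illustration; `fetch_state` stands in for the real `session.get(...)` call and is not part of aiohttp or the Planet API.

```python
import asyncio
import time
from typing import Awaitable, Callable


class IntervalLimiter:
    """Space out calls so that at most `rate` of them start per second."""

    def __init__(self, rate: float) -> None:
        self._interval = 1.0 / rate
        self._lock = asyncio.Lock()
        self._next_slot = 0.0  # earliest monotonic time the next call may start

    async def wait(self) -> None:
        # The lock serialises callers, so slots are handed out one at a time.
        async with self._lock:
            delay = self._next_slot - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            self._next_slot = max(time.monotonic(), self._next_slot) + self._interval


async def poll_order(
    order_id: str,
    limiter: IntervalLimiter,
    fetch_state: Callable[[str], Awaitable[str]],
    poll_every: float = 10.0,
) -> str:
    """Poll one order until it reports 'success', honouring the shared limiter."""
    while True:
        await limiter.wait()  # every request, from every task, passes through here
        state = await fetch_state(order_id)
        if state == "success":
            return state
        await asyncio.sleep(poll_every)  # per-order delay between status checks


async def main() -> None:
    # Create the limiter inside the running loop and share it across all tasks.
    limiter = IntervalLimiter(rate=5)

    async def fake_fetch(order_id: str) -> str:
        # Hypothetical stand-in for the real HTTP status request.
        await asyncio.sleep(0)
        return "success"

    await asyncio.gather(*(poll_order(o, limiter, fake_fetch) for o in ["a", "b", "c"]))


if __name__ == "__main__":
    asyncio.run(main())
```

The key point in this sketch is that a single limiter instance is shared by all tasks and is awaited immediately before each request, so the 10-second polling delay and the global request rate are controlled separately.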

Please help me here.

aayush_malik
  • 1
    You mentioned about six different ways of solving this problem. You told us that you tried them all but none of them worked. But in your code you don't show anything at all that would limit the rate. Since other people have used those methods successfully, you must be using them wrong. Instead of asking for a seventh method, why don't you show us exactly how you are using one or two of them so we can try to figure out what's wrong? – Paul Cornelius Jul 09 '23 at 21:45
  • Added one of the solutions @PaulCornelius – aayush_malik Jul 10 '23 at 02:18
  • 1
    What you have wrapped in the Semaphore is the *declaration* of `download`. A def statement does not execute until you *call* the function. You either need to move the Semaphore acquisition statement inside the function, or wrap the Semaphore around the function *call* (`await download(download_url, f'./orders/{name}.zip')`). – Paul Cornelius Jul 10 '23 at 11:14
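To illustrate the point made in the last comment, here is a small sketch in which the semaphore wraps the *call* rather than the definition. The `download` body, the `tracker` dictionary, the `example.invalid` URL and `run_demo` are assumptions made up for this demonstration; the real download logic would go where the `asyncio.sleep` is. Note that a semaphore caps how many calls run at the same time, which is not the same thing as a requests-per-second limit.

```python
import asyncio


async def download(url: str, filename: str, tracker: dict) -> None:
    # Hypothetical stand-in for the real download; it only records concurrency.
    tracker["active"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["active"])
    await asyncio.sleep(0.01)  # simulate network I/O
    tracker["active"] -= 1


async def download_order(order_id: int, semaphore: asyncio.Semaphore, tracker: dict) -> None:
    # The semaphore is held while the call executes, so it actually limits work.
    async with semaphore:
        await download(
            f"https://example.invalid/{order_id}",  # placeholder URL
            f"./orders/{order_id}.zip",
            tracker,
        )


async def run_demo(n_orders: int = 15, limit: int = 5) -> int:
    semaphore = asyncio.Semaphore(limit)
    tracker = {"active": 0, "peak": 0}
    await asyncio.gather(
        *(download_order(i, semaphore, tracker) for i in range(n_orders))
    )
    return tracker["peak"]  # highest number of downloads in flight at once


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

With the `async with semaphore:` block placed around the `def` instead, as in the question, the semaphore is released before any download starts and all 15 tasks run at once.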

0 Answers