I want to make N requests per second.
I have tried the RateLimiter class shown here https://gist.github.com/yijxiang/4042b3d746471d0fc226215daf96eee7, incorporated aiolimiter as documented here https://aiolimiter.readthedocs.io/, used a Semaphore to limit concurrency, used a TCPConnector as suggested in "aiohttp: set maximum number of requests per second", and added asyncio.sleep and time.sleep delays, but I am still unable to resolve the 429 error.
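For reference, the aiolimiter pattern I was trying to follow is roughly the one from its documentation (a simplified sketch, not my exact code; fetch_status and the bare URL argument are just placeholders):

import aiohttp
from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(5, 1)  # allow at most 5 acquisitions per 1-second window

async def fetch_status(session: aiohttp.ClientSession, url: str) -> int:
    # every request has to pass through the shared limiter before it is sent
    async with limiter:
        async with session.get(url) as resp:
            return resp.status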
This is my code.
import asyncio
import aiohttp

async def download_order(order_id):
    get_order_url = f'https://api.planet.com/compute/ops/orders/v2/{order_id}'
    headers = {
        'Authorization': f'api-key {self.api_key}',
        'Content-Type': 'application/json',
    }
    async with aiohttp.ClientSession() as session:
        status = 'queued'
        # keep on making requests until the status is success
        while status != 'success':
            async with session.get(get_order_url, headers=headers) as resp:
                if resp.status != 200:
                    print(f"Failed to get order status: {await resp.text()}")
                else:
                    resp_json = await resp.json()
                    name = resp_json['name']
                    status = resp_json['state']
                    print(f"Order with name {name} has status {status}.")

async def main_wrapper():
    activated_order_ids = await post_order(df)
    tasks = []
    for order_id in activated_order_ids:
        task = download_order(order_id)
        tasks.append(task)
    await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(main_wrapper())
This is what I did using a semaphore and a manual delay.
async def download_order(order_id, semaphore):
    async with semaphore:
        async def download(url: str, filename: str):
            # placeholder for the actual file download
            pass

        get_order_url = f'https://api.planet.com/compute/ops/orders/v2/{order_id}'
        headers = {
            'Authorization': f'api-key {self.api_key}',
            'Content-Type': 'application/json',
        }
        async with aiohttp.ClientSession() as session:
            status = 'queued'
            # keep on making requests until the status is success
            while status != 'success':
                async with session.get(get_order_url, headers=headers) as resp:
                    if resp.status != 200:
                        print(f"Failed to get order status: {await resp.text()}")
                    else:
                        resp_json = await resp.json()
                        name = resp_json['name']
                        status = resp_json['state']
                        print(f"Order Name: {name}\nCurrent Status: {status}\n")
                        if status == 'success':
                            download_url = resp_json['_links']['results'][0]['location']
                            directory_path = "./orders"
                            # create the output directory if it does not exist
                            if not os.path.exists(directory_path):
                                os.makedirs(directory_path)
                            await download(download_url, f'./orders/{name}.zip')
                # wait 10 seconds before checking the status again
                await asyncio.sleep(10)

async def main_wrapper():
    activated_order_ids = await post_order(df)
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(5)
    tasks = [loop.create_task(download_order(order_id, semaphore)) for order_id in activated_order_ids]
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main_wrapper())
This is what I want to do: using the order IDs in the activated_order_ids list, I want to keep checking the status of each order every 10 seconds. I cannot make more than 5 requests per second as per Planet's Rate Limiting Policy.
I have solved the first part with a 10-second sleep between status checks, but I am unable to respect the API limit. With the concurrent tasks, I am making 15 or more requests at a time; the maximum I want to make is 5 per second.
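To make the target behavior concrete, this is roughly what I am aiming for (a sketch only; the poll_order helper, the API_KEY placeholder, and the 5-per-second limiter value are illustrative, not working code):

import asyncio
import aiohttp
from aiolimiter import AsyncLimiter

API_KEY = 'my-api-key'            # placeholder
RATE_LIMIT = AsyncLimiter(5, 1)   # global cap shared by all tasks: 5 requests per second

async def poll_order(session: aiohttp.ClientSession, order_id: str) -> None:
    url = f'https://api.planet.com/compute/ops/orders/v2/{order_id}'
    headers = {'Authorization': f'api-key {API_KEY}'}
    while True:
        # the shared limiter keeps all concurrent pollers together under 5 req/s
        async with RATE_LIMIT:
            async with session.get(url, headers=headers) as resp:
                resp_json = await resp.json()
        if resp_json['state'] == 'success':
            break
        # each individual order is re-checked only every 10 seconds
        await asyncio.sleep(10)

async def main(order_ids):
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(poll_order(session, oid) for oid in order_ids))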
Please help me here.