I would suggest using asyncio.Queue in this case. You don't want to create 60k tasks, one for each URL. With a queue, you can spawn a fixed number of workers and limit the queue size. From the asyncio.Queue docs:
If maxsize is less than or equal to zero, the queue size is infinite.
If it is an integer greater than 0, then await put() blocks when the
queue reaches maxsize until an item is removed by get().
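Here is a quick standalone demo of that blocking behavior (not part of the crawler itself, just to show put() stalling at maxsize):

import asyncio

async def demo():
    q = asyncio.Queue(maxsize=2)
    await q.put(1)
    await q.put(2)  # the queue is now full
    try:
        # put() blocks until a consumer frees a slot; time out to prove it
        await asyncio.wait_for(q.put(3), timeout=0.1)
    except asyncio.TimeoutError:
        print("put() blocked: queue is at maxsize")

asyncio.run(demo())

With that in mind, here is a minimal worker-pool example: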
import asyncio
import random

WORKERS = 10


async def worker(q):
    while True:
        url = await q.get()
        t = random.uniform(1, 5)
        print(f"START: {url} ({t:.2f}s)")
        await asyncio.sleep(t)  # simulate the actual fetch
        print(f"END: {url}")
        q.task_done()


async def main():
    q = asyncio.Queue(maxsize=100)
    tasks = []
    for _ in range(WORKERS):
        tasks.append(asyncio.create_task(worker(q)))
    for i in range(10):
        await q.put(f"http://example.com/{i}")
    await q.join()  # wait until every queued URL has been processed
    for task in tasks:
        task.cancel()  # the workers loop forever, so cancel them explicitly
    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())
Test:
$ python test.py
START: http://example.com/0 (1.14s)
START: http://example.com/1 (4.40s)
START: http://example.com/2 (2.48s)
START: http://example.com/3 (4.34s)
START: http://example.com/4 (1.94s)
END: http://example.com/0
START: http://example.com/5 (1.52s)
END: http://example.com/4
START: http://example.com/6 (4.84s)
END: http://example.com/2
START: http://example.com/7 (4.35s)
END: http://example.com/5
START: http://example.com/8 (2.33s)
END: http://example.com/3
START: http://example.com/9 (1.80s)
END: http://example.com/1
END: http://example.com/8
END: http://example.com/9
END: http://example.com/6
END: http://example.com/7
By the way, writing to files will block your main event loop; either call it via loop.run_in_executor (as in the update below) or use aiofiles.
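For the aiofiles route, a minimal sketch (aiofiles is a third-party package; path and html here are just illustrative names):

import aiofiles

async def save_to_disk(path, html):
    # aiofiles runs the blocking file I/O in a thread pool under the hood
    async with aiofiles.open(path, "w") as f:
        await f.write(html)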
Update Sat 3 Apr 13:49:55 UTC 2021:
Example:
import asyncio
import traceback

import aiohttp

WORKERS = 5
URLS = [
    "http://airbnb.com",
    "http://amazon.co.uk",
    "http://amazon.com",
    "http://baidu.com",
    "http://basecamp.com",
    "http://bing.com",
    "http://djangoproject.com",
    "http://envato.com",
    "http://facebook.com",
    "http://github.com",
    "http://gmail.com",
    "http://google.co.uk",
    "http://google.com",
    "http://google.es",
    "http://google.fr",
    "http://heroku.com",
    "http://instagram.com",
    "http://linkedin.com",
    "http://live.com",
    "http://netflix.com",
    "http://rubyonrails.org",
    "http://shopify.com",
    "http://stackoverflow.com",
    "http://trello.com",
    "http://wordpress.com",
    "http://yahoo.com",
    "http://yandex.ru",
    "http://yiiframework.com",
    "http://youtube.com",
]
class Bot:
    async def fetch(self, client, url):
        async with client.get(url) as r:
            return await r.text()

    async def worker(self, q, client):
        loop = asyncio.get_running_loop()
        while True:
            url = await q.get()
            try:
                html = await self.fetch(client, url)
            except Exception:
                traceback.print_exc()
            else:
                # file I/O is blocking, so hand it off to the thread pool
                await loop.run_in_executor(None, self.save_to_disk, url, html)
            finally:
                q.task_done()

    def save_to_disk(self, url, html):
        print(f"{url} ({len(html)})")


async def main():
    q = asyncio.Queue(maxsize=100)
    tasks = []
    async with aiohttp.ClientSession() as client:
        bot = Bot()
        for _ in range(WORKERS):
            tasks.append(asyncio.create_task(bot.worker(q, client)))
        for url in URLS:
            await q.put(url)
        await q.join()
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())
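If you point this at 60k real URLs, you will probably also want per-request timeouts and a cap on open connections; aiohttp supports both via ClientTimeout and TCPConnector. As a sketch, the session line in main() could become (the 30s value is just a placeholder):

async with aiohttp.ClientSession(
    timeout=aiohttp.ClientTimeout(total=30),        # give up on a slow request after 30s
    connector=aiohttp.TCPConnector(limit=WORKERS),  # cap simultaneous connections
) as client: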