
I have a list of URLs which I fetch and save to HTML files with the following code:

    tasksURL = []
    async with aiohttp.ClientSession() as session:
        for url in listOfURLs:
            tasksURL.append(self.fetch(session, url))
        allHTMLs = await asyncio.gather(*tasksURL)
    for i, html in enumerate(allHTMLs):
        with open(f"myPath_{i}.html", mode='w', encoding='UTF-8', errors='strict', buffering=1) as f:
            f.write(html)

Since the URL list can be quite large (up to 60,000 URLs) I need to chunk these tasks.

I tried the following approach. I defined a function that yields the list in smaller chunks:

    def chunkList(self, listOfURLs, n):
        """Yield successive n-sized chunks from listOfURLs."""
        for i in range(0, len(listOfURLs), n):
            yield listOfURLs[i:i + n]

And then use this function to process each chunk of listOfURLs like this:

    tasksURL = []
    chunkedListOfURLs = self.chunkList(listOfURLs, 5)
    for URLList in chunkedListOfURLs:
        async with aiohttp.ClientSession() as session:
            for url in URLList:
                tasksURL.append(self.fetch(session, url))
            allHTMLs = await asyncio.gather(*tasksURL)
        for html in allHTMLs:
            with open("myPath.html", mode='w', encoding='UTF-8', errors='strict', buffering=1) as f:
                f.write(html)

I'm getting this error:

RuntimeError: cannot reuse already awaited coroutine

I understand the problem but haven't found a way around it.

Hrvoje
  • Not sure why you are getting the reuse error, perhaps you never reset tasksURL when a task group is done. However, all your reads will finish at the speed of the slowest read in each group of 5. Try a Semaphore as per [Mikhail's](https://stackoverflow.com/a/48486557/6242321) answer – jwal Apr 02 '21 at 20:38

2 Answers

3

I would suggest using asyncio.Queue in this case. You don't want to create 60k tasks, one for each URL. With a queue, you can spawn a set number of workers and limit the queue size:

If maxsize is less than or equal to zero, the queue size is infinite. If it is an integer greater than 0, then await put() blocks when the queue reaches maxsize until an item is removed by get().

import asyncio
import random

WORKERS = 10


async def worker(q):
    while True:
        url = await q.get()
        t = random.uniform(1, 5)

        print(f"START: {url} ({t:.2f}s)")
        await asyncio.sleep(t)
        print(f"END: {url}")

        q.task_done()


async def main():
    q = asyncio.Queue(maxsize=100)

    tasks = []

    for _ in range(WORKERS):
        tasks.append(asyncio.create_task(worker(q)))

    for i in range(10):
        await q.put(f"http://example.com/{i}")

    await q.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())

Test:

$ python test.py
START: http://example.com/0 (1.14s)
START: http://example.com/1 (4.40s)
START: http://example.com/2 (2.48s)
START: http://example.com/3 (4.34s)
START: http://example.com/4 (1.94s)
END: http://example.com/0
START: http://example.com/5 (1.52s)
END: http://example.com/4
START: http://example.com/6 (4.84s)
END: http://example.com/2
START: http://example.com/7 (4.35s)
END: http://example.com/5
START: http://example.com/8 (2.33s)
END: http://example.com/3
START: http://example.com/9 (1.80s)
END: http://example.com/1
END: http://example.com/8
END: http://example.com/9
END: http://example.com/6
END: http://example.com/7

By the way, writing to files will block your main event loop; either run it via run_in_executor or use aiofiles.
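
For illustration, a minimal sketch of the aiofiles variant (assuming aiofiles is installed; the name save_html and the path argument are just illustrative, not part of the question's code):

    import aiofiles

    async def save_html(path, html):
        # aiofiles runs the blocking file I/O in a thread pool,
        # so the event loop stays free while the file is written
        async with aiofiles.open(path, mode='w', encoding='UTF-8') as f:
            await f.write(html)

The worker coroutine would then simply `await save_html(...)`; the update below shows the `run_in_executor` route instead.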

Update Sat 3 Apr 13:49:55 UTC 2021:

Example:

import asyncio
import traceback

import aiohttp

WORKERS = 5
URLS = [
    "http://airbnb.com",
    "http://amazon.co.uk",
    "http://amazon.com",
    "http://baidu.com",
    "http://basecamp.com",
    "http://bing.com",
    "http://djangoproject.com",
    "http://envato.com",
    "http://facebook.com",
    "http://github.com",
    "http://gmail.com",
    "http://google.co.uk",
    "http://google.com",
    "http://google.es",
    "http://google.fr",
    "http://heroku.com",
    "http://instagram.com",
    "http://linkedin.com",
    "http://live.com",
    "http://netflix.com",
    "http://rubyonrails.org",
    "http://shopify.com",
    "http://stackoverflow.com",
    "http://trello.com",
    "http://wordpress.com",
    "http://yahoo.com",
    "http://yandex.ru",
    "http://yiiframework.com",
    "http://youtube.com",
]


class Bot:
    async def fetch(self, client, url):
        async with client.get(url) as r:
            return await r.text()

    async def worker(self, q, client):
        loop = asyncio.get_running_loop()

        while True:
            url = await q.get()

            try:
                html = await self.fetch(client, url)
            except Exception:
                traceback.print_exc()
            else:
                await loop.run_in_executor(None, self.save_to_disk, url, html)
            finally:
                q.task_done()

    def save_to_disk(self, url, html):
        print(f"{url} ({len(html)})")


async def main():
    q = asyncio.Queue(maxsize=100)
    tasks = []

    async with aiohttp.ClientSession() as client:
        bot = Bot()

        for _ in range(WORKERS):
            tasks.append(asyncio.create_task(bot.worker(q, client)))

        for url in URLS:
            await q.put(url)

        await q.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())
HTF
  • It seems unnecessary to implement all of that. @Matt Fowler mentioned in his answer that aiohttp's client session allows 100 concurrent connections. Also, if I apply the 2nd answer's solution it limits the number of tasks to the n defined for each chunk, so I don't need to implement asyncio.Queue since the task list will never be larger than n, which corresponds to the URLs in urlList. – Hrvoje Apr 03 '21 at 09:35
  • Regarding file writes being blocking operations: you are right, but in order not to hog the machine's resources I might keep it like that. – Hrvoje Apr 03 '21 at 09:38
  • @Hrvoje I get your point, but your solution is not efficient: imagine that one of the requests takes 10s, the whole `for` loop for that chunk will have to wait. If you use a queue instead, other URLs will be processed simultaneously. – HTF Apr 03 '21 at 09:58
  • So I just iterate over my list here: `for i in range(10): await q.put(f"http://example.com/{i}")`? I guess I'm dropping this part: `for task in tasks: task.cancel()`, right? – Hrvoje Apr 03 '21 at 10:15
  • Yes, you feed your URLs into the queue. The `worker` function uses `while True`, so you have to cancel the workers at the end. You can try `while not q.empty()` instead so you can skip the cancellation step. – HTF Apr 03 '21 at 10:45
  • I don't get where I put my main part: `async with aiohttp.ClientSession() as session: for url in URLList: tasksURL.append(fetch(session, url))`? – Hrvoje Apr 03 '21 at 11:40
  • Check my update but I think you should actually accept @Matt's answer as that's the answer to your original question. – HTF Apr 03 '21 at 14:01
  • Yes, but you wrote a complete solution that works the way it's supposed to. Thank you for your effort! – Hrvoje Apr 03 '21 at 14:25
2

In your example, your tasksURL list holds already-awaited coroutines after every chunk you successfully process. You then append new coroutines to that list on subsequent iterations, so when you call gather you're trying to await completed coroutines as well as new, unawaited ones. Simply creating a new tasksURL list for each chunk will solve your problem:

for URLList in chunkedListOfURLs:
    tasksURL = []
    async with aiohttp.ClientSession() as session:
        for url in URLList:
            tasksURL.append(fetch(session, url))
        allHTMLs = await asyncio.gather(*tasksURL)

Note that by default, aiohttp's client session allows 100 concurrent connections (see https://docs.aiohttp.org/en/stable/client_advanced.html#limiting-connection-pool-size for details), so you get some concurrency limiting out of the box without chunking. Semaphores and queues are other options for limiting concurrency depending on your requirements, as mentioned in the other answer and comments.
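
For example, a minimal sketch of both options; the helper names (`fetch_page`, `fetch_all`, `bounded_fetch`) and the `limit` value are illustrative, not taken from the question:

    import asyncio
    import aiohttp

    async def fetch_page(session, url):
        async with session.get(url) as r:
            return await r.text()

    async def fetch_all(urls, limit=10):
        # Option 1: cap the connection pool at the session level
        connector = aiohttp.TCPConnector(limit=limit)
        # Option 2: cap the number of in-flight requests with a semaphore
        sem = asyncio.Semaphore(limit)

        async def bounded_fetch(session, url):
            async with sem:
                return await fetch_page(session, url)

        async with aiohttp.ClientSession(connector=connector) as session:
            return await asyncio.gather(
                *(bounded_fetch(session, url) for url in urls),
                return_exceptions=True,
            )

Either mechanism alone is enough; using both here just shows where each cap is applied.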

Matt Fowler