
I'm trying to get some data from thousands of urls by using asyncio. Here is a brief overview of the design:

  1. Fill up a Queue in one go with a bunch of urls using a single Producer
  2. Spawn a bunch of Consumers
  3. Each Consumer keeps asynchronously extracting urls from the Queue and sending GET requests
  4. Do some postprocessing on the result
  5. Combine all processed results and return

Problems: asyncio almost never shows when anything is wrong; it just hangs silently with no errors. I put print statements everywhere to detect problems myself, but that didn't help much.

Depending on the number of input urls, the number of consumers, and the connection limits, I might get these errors:

  1. Task was destroyed but it is pending!
  2. task exception was never retrieved future: <Task finished coro=<consumer()
  3. aiohttp.client_exceptions.ServerDisconnectedError
  4. aiohttp.client_exceptions.ClientOSError: [WinError 10053] An established connection was aborted by the software in your host machine

Questions: How do I detect and handle exceptions in asyncio? How do I retry without disrupting the Queue?

Below is my code, which I put together from various examples of async code. Currently there's an intentional error at the end of the get_video_title function. When run, nothing shows up.

import asyncio
import aiohttp
import json
import re
import nest_asyncio
nest_asyncio.apply() # jupyter notebook throws errors without this


user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"

def get_video_title(data):
    match = re.search(r'window\[["\']ytInitialPlayerResponse["\']\]\s*=\s*(.*)', data)
    string = match[1].strip()[:-1]
    result = json.loads(string)
    return result['videoDetails']['TEST_ERROR'] # <---- should be 'title'

async def fetch(session, url, c):
    async with session.get(url, headers={"user-agent": user_agent}, raise_for_status=True, timeout=60) as r:
        print('---------Fetching', c)
        if r.status != 200:
            r.raise_for_status()
        return await r.text()

async def consumer(queue, session, responses):
    while True:
        try:
            i, url = await queue.get()
            print("Fetching from a queue", i)
            html_page = await fetch(session, url, i)

            print('+++Processing', i)
            result = get_video_title(html_page) # should raise an error here!
            responses.append(result)
            queue.task_done()

            print('+++Task Done', i)

        except (aiohttp.http_exceptions.HttpProcessingError, asyncio.TimeoutError) as e:
            print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Error', i, type(e))
            await asyncio.sleep(1)
            queue.task_done()

async def produce(queue, urls):
    for i, url in enumerate(urls):
        print('Putting in a queue', i)
        await queue.put((i, url))

async def run(session, urls, consumer_num):
    queue, responses = asyncio.Queue(maxsize=2000), []

    print('[Making Consumers]')
    consumers = [asyncio.ensure_future(
        consumer(queue, session, responses)) 
                 for _ in range(consumer_num)]

    print('[Making Producer]')
    producer = await produce(queue=queue, urls=urls)

    print('[Joining queue]')
    await queue.join()

    print('[Cancelling]')
    for consumer_future in consumers:
        consumer_future.cancel()

    print('[Returning results]')
    return responses

async def main(loop, urls):
    print('Starting a Session')
    async with aiohttp.ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=300)) as session:
        print('Calling main function')
        posts = await run(session, urls, 100)
        print('Done')
        return posts


if __name__ == '__main__':
    urls = ['https://www.youtube.com/watch?v=dNQs_Bef_V8'] * 100
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(loop, urls))
Superbman

1 Answer


The problem is that your consumer catches only two very specific exceptions, and in their case marks the task as done. If any other exception happens, such as a network-related exception, it will terminate the consumer. However, this is not detected by run, which is awaiting queue.join() with the consumer (effectively) running in the background. This is why your program hangs - queued items are never accounted for, and the queue is never fully processed.
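
To make this concrete, here is a stripped-down illustration (not from the original code; names are made up): a consumer that dies on an unexpected exception never calls task_done(), so the queue's unfinished-task counter stays above zero and queue.join() never returns.

import asyncio

async def broken_consumer(queue):
    while True:
        item = await queue.get()
        raise KeyError(item)  # unexpected error: task_done() is never called

async def demo():
    queue = asyncio.Queue()
    await queue.put("only item")
    worker = asyncio.ensure_future(broken_consumer(queue))
    await queue.join()  # hangs forever: the item is never marked as done
    worker.cancel()

# loop = asyncio.get_event_loop()
# loop.run_until_complete(demo())  # running this hangs, which is the point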

There are two ways to fix this, depending on what you want your program to do when it encounters an unanticipated exception. If you want it to keep running, you can add a catch-all except clause to the consumer, e.g.:

        except Exception as e:
            print('other error', e)
            queue.task_done()

The alternative is for an unhandled consumer exception to propagate to run. This must be arranged explicitly, but has the advantage of never allowing exceptions to pass silently. (See this article for a detailed treatment of the subject.) One way to achieve it is to wait for queue.join() and the consumers at the same time; since consumers are in an infinite loop, they will complete only in case of an exception.

    print('[Joining queue]')
    # wait for either `queue.join()` to complete or a consumer to raise
    done, _ = await asyncio.wait([queue.join(), *consumers],
                                 return_when=asyncio.FIRST_COMPLETED)
    consumers_raised = set(done) & set(consumers)
    if consumers_raised:
        await consumers_raised.pop()  # propagate the exception

Questions: How do I detect and handle exceptions in asyncio?

Exceptions are propagated through await and normally detected and handled like in any other code. The special handling is only needed to catch exceptions that leak from a "background" task like the consumer.
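
For example (a minimal sketch, not part of the original code): an exception raised inside an awaited coroutine surfaces at the await expression and can be caught with an ordinary try/except.

import asyncio

async def may_fail():
    raise ValueError("boom")

async def caller():
    try:
        await may_fail()  # the exception surfaces here
    except ValueError as e:
        print("caught:", e)  # handled like in ordinary synchronous code

loop = asyncio.get_event_loop()
loop.run_until_complete(caller())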

How do I retry without disrupting the Queue?

You can call await queue.put((i, url)) in the except block. The item will be added to the back of the queue, to be picked up by a consumer. In that case you only need the first snippet, and don't need to bother with propagating the exception from consumer to run.
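
For example, the consumer's error handling could be adjusted roughly like this (a sketch, not the original code; aiohttp.ClientError is used here as a catch-all for network errors):

async def consumer(queue, session, responses):
    while True:
        i, url = await queue.get()
        try:
            html_page = await fetch(session, url, i)
            responses.append(get_video_title(html_page))
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print('error, re-queueing', i, type(e))
            await queue.put((i, url))  # the extra put() keeps join()'s accounting balanced
        queue.task_done()  # balance the get() for this attempt

Note that a URL that keeps failing would be retried forever with this approach; if that is a concern, carry a retry count in the queue item (e.g. (i, url, tries)) and give up after a few attempts.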

user4815162342
  • Thank you for such an articulate and thorough answer. Before reading your answer I did the following to fix the issues I mentioned: 1) For retries I added a `for` loop inside a `while` loop. 2) I also added a `KeyError` exception handler inside the `get_video_title` function. 3) I lowered the max number of consumers and limited the max number of TCP connections. 4) I set `raise_for_status` to `False` in the `GET` request. All in all, it solved all the problems and I'm able to process 1000 urls in slightly more than a minute. Task Manager shows that the bottleneck is my Wi-Fi speed – Superbman Jan 08 '20 at 04:54
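
For reference, the "for loop inside a while loop" retry pattern described in the comment might look roughly like this (a sketch based on the comment, not the commenter's actual code; max_tries and the 1-second pause are assumptions):

async def consumer(queue, session, responses, max_tries=3):
    while True:
        i, url = await queue.get()
        for attempt in range(max_tries):
            try:
                html_page = await fetch(session, url, i)
                responses.append(get_video_title(html_page))
                break  # success, stop retrying
            except (aiohttp.ClientError, asyncio.TimeoutError, KeyError) as e:
                print('attempt', attempt + 1, 'failed for', i, type(e))
                await asyncio.sleep(1)  # brief pause before the next attempt
        queue.task_done()  # mark the item done whether or not it ultimately succeeded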