
I have written a simple command to loop through all `Result` objects and check their `www` field (representing the URL of a published scientific result, e.g. https://doi.org/10.1109/5.771073).

There are 1M results in our DB, and I want to check the `www` field of each one; if the link is corrupted, I will guess a replacement by appending the actual DOI to https://doi.org/ and save the guess (in the `www_processed` field).
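
For reference, `get_doi_url` (used in the code below but not shown) is assumed to be roughly this one-liner:

def get_doi_url(self):
    # Assumed helper: build the guessed link by appending the DOI to the resolver root.
    return f'https://doi.org/{self.doi}'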

This is my first time working with asyncio, but I think the bare bones of my code are right, and I can't figure out why the code runs synchronously.

Main command:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import asyncio
import time

from django.core.management.base import BaseCommand

from models import Result


def run_statistics(array_of_results, num_of_results):
    num_of_correct_urls = 0
    sum_check_time = 0
    max_check_time = 0
    for res in array_of_results:
        if res[0]:
            num_of_correct_urls += 1
        if res[1] > max_check_time:
            max_check_time = res[1]

        sum_check_time += res[1]

    return f"""ran statistics on {num_of_results} results \n
            ----------------------------------------------------------------------------
            correct/corrupted link ratio: {num_of_correct_urls} / {num_of_results - num_of_correct_urls}\n
            Mean time to check URL: {sum_check_time / num_of_results}\n
            """


class Command(BaseCommand):
    help = 'checks the URL in the www field of a Result; if the link is unresponsive, tries to generate a new ' \
           'hyperlink (using the DOI) and saves it in the www_processed field'

    async def run_check(self, obj):
        """
        Takes care of checking Result www filed.
         `await obj.get_www()` passes function control back to the event loop.
         :returns
         True on unchanged url
         False otherwise
         """
        print('STARTING run_check', file=self.stdout)
        start_time = time.perf_counter()
        final_url = await obj.get_www_coroutine()

        if final_url == obj.www:
            print('STOPPING run_check', file=self.stdout)
            return True, time.perf_counter() - start_time
        else:
            print('STOPPING run_check', file=self.stdout)
            return False, time.perf_counter() - start_time

    async def main(self, objs):
        await asyncio.gather(self.run_check(objs[0]), self.run_check(objs[1]))

    def handle(self, *args, **kwargs):
        start_time = time.perf_counter()
        print('started the process', file=self.stdout)
        objs = Result.objects.all().only('www', 'www_processed', 'www_last_checked').order_by('?')[:2]
        num_of_results = 10 # Result.objects.all().count()
        print('running main', file=self.stdout)

        async def _main_routine():
            array_of_responses = await asyncio.gather(*(self.run_check(_) for _ in objs))

            print(f'retrieved {num_of_results} results, running command', file=self.stdout)

            # print(res_array, file=self.stdout)
            print(run_statistics(array_of_responses, 10) + f'total time: {time.perf_counter() - start_time}\n',
                  file=self.stdout)

        asyncio.run(_main_routine())

The method that checks the `www` field and saves the guessed link, if that needs to be done:

async def get_www_coroutine(self):
    if not self.www_last_checked or datetime.date.today() - self.www_last_checked > datetime.timedelta(days=365):
        if not self.www or not await check_url_returns_200_in_time_coroutine(self.www):  # www is corrupted
            if self.doi:
                self.www_processed = self.get_doi_url()
            else:
                self.www_processed = None
            self.www_last_checked = datetime.date.today()
        else:  # www looks alright
            self.www_processed = self.www
        self.save()

    return self.www_processed or False
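
As an aside, `self.save()` above is a synchronous ORM call inside a coroutine, so it blocks the event loop while it runs. A minimal sketch of offloading it (a hypothetical helper, assuming asgiref is installed):

from asgiref.sync import sync_to_async

async def save_result(obj):
    # Offload the blocking ORM write to a thread so the event loop stays free.
    await sync_to_async(obj.save)()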

The method that checks whether a link returns 200:

async def check_url_returns_200_in_time_coroutine(url, timeout=1):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response.status == 200

    except aiohttp.client_exceptions.InvalidURL:
        return False
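
Note that the `timeout` parameter above is never actually applied, and only `InvalidURL` is caught. A sketch of what I believe an enforced-timeout version would look like, using aiohttp's `ClientTimeout` (my assumption, not the code as it currently runs):

import asyncio

import aiohttp

async def check_url_returns_200_in_time_coroutine(url, timeout=1):
    try:
        # Enforce the timeout instead of ignoring the parameter.
        session_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession(timeout=session_timeout) as session:
            async with session.get(url) as response:
                return response.status == 200
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # InvalidURL is a ClientError; timeouts surface as asyncio.TimeoutError.
        return False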

The actual output:

started the process
running main
STARTING run_check
STOPPING run_check
STARTING run_check
STOPPING run_check
retrieved 10 results, running command

ran statistics on 10 results 

            ----------------------------------------------------------------------------
            correct/corrupted link ratio: 1 / 9

            Mean time to check URL: 0.17720807899999896

            total time: 73.279784077

As you can see, the code executes sequentially and takes too long to complete. I expected to see STARTING run_check for all objects first, followed by STOPPING run_check for each of them.
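
For comparison, a toy `gather` over coroutines that genuinely await something does interleave the way I expect (a standalone sketch using `asyncio.sleep` in place of the real check):

import asyncio

async def run_check(i):
    print(f'STARTING run_check {i}')
    await asyncio.sleep(1)  # genuinely yields to the event loop
    print(f'STOPPING run_check {i}')

async def main():
    await asyncio.gather(*(run_check(i) for i in range(3)))

asyncio.run(main())  # prints all STARTING lines first, then all STOPPING lines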

  • Your code looks correct, at least at first glance. Could you print the URLs that you test? The correct/corrupted ratio heavily favors the corrupted; perhaps the URLs are invalid, in which case it is possible that aiohttp raises an exception before blocking on anything, so the `await` never relinquishes control to the event loop? – user4815162342 Aug 05 '19 at 22:53
  • Also, the code that prints the result count seems incorrect, as it gets the "number of results" from a hard-coded number in the source; in reality it only starts `run_check` twice, only one of which succeeds. I presume that the one that fails is the one that returns first, and that code path doesn't await anything, which is why the two don't intertwine. – user4815162342 Aug 05 '19 at 22:53

1 Answer


I have finally solved the issue!!!

The code does run asynchronously (I had tested only two results, so this wasn't clear from the output).

The bottleneck was actually the DB query: `objs = Result.objects.all().only('www', 'www_processed', 'www_last_checked').order_by('?')[:2]` takes a long time, since there are 1M objects and `order_by('?')` has to do a random sort over all of them first. More here: How to pull a random record using Django's ORM?
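
For anyone hitting the same thing: one common workaround from that thread is to sample random primary keys in Python rather than letting the database random-sort 1M rows. A minimal sketch (assuming default integer PKs; `sample_results` is a hypothetical helper):

import random

from models import Result

def sample_results(n=2):
    # One cheap column scan for the PKs, sample in Python,
    # then fetch only the sampled rows with the fields we need.
    pks = list(Result.objects.values_list('pk', flat=True))
    chosen = random.sample(pks, n)
    return Result.objects.filter(pk__in=chosen).only(
        'www', 'www_processed', 'www_last_checked')

Pulling 1M integers into memory is still far cheaper than a random sort of the whole table.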