
This is my first time trying asyncio and aiohttp. I have the following code that gets URLs from a MySQL database, makes GET requests to them, and pushes the responses back to the MySQL database.

import json

import requests

if __name__ == "__main__":
    database_name = 'db_name'
    company_name = 'company_name'

    my_db = Db(database=database_name) # wrapper class for mysql.connector
    urls_dict = my_db.get_rest_api_urls_for_specific_company(company_name=company_name)
    update_id = my_db.get_updateid()
    my_db.get_connection(dictionary=True)

    for url in urls_dict:
        url_id = url['id']
        url = url['url']
        table_name = my_db.make_sql_table_name_by_url(url)
        insert_query = my_db.get_sql_for_insert(table_name)
        r = requests.get(url=url).json() # make the request
        args = [json.dumps(r), update_id, url_id]
        my_db.db_execute_one(insert_query, args, close_conn=False)

    my_db.close_conn()

This works fine, but to speed it up, how can I run it asynchronously?

I have looked here, here and here but can't seem to get my head around it.

Here is what I have tried based on @Raphael Medaer's answer.

import asyncio

from aiohttp import ClientSession


async def fetch(url):
    async with ClientSession() as session:
        async with session.request(method='GET', url=url) as response:
            json = await response.json()
            return json


async def process(url, update_id):
    table_name = await db.make_sql_table_name_by_url(url)
    result = await fetch(url)
    print(url, result)

if __name__ == "__main__":
    """Get urls from DB"""
    db = Db(database="fuse_src")
    urls = db.get_rest_api_urls()  # This returns list of dictionary
    update_id = db.get_updateid()
    url_list = []
    for url in urls:
        url_list.append(url['url'])
    print(update_id)
    asyncio.get_event_loop().run_until_complete(
        asyncio.gather(*[process(url, update_id) for url in url_list]))

I get an error in the process method:

TypeError: object str can't be used in 'await' expression

I'm not sure what the problem is.

Any code example specific to this would be highly appreciated.

Shery

1 Answer


Making this code asynchronous will not speed it up at all, unless you run parts of your code concurrently ("in parallel"). For instance, you can run multiple SQL or HTTP queries at the "same time". Asynchronous programming does not execute code at the same time; rather, while one task is waiting on a long I/O operation, the event loop runs other parts of your code.
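To illustrate that distinction, here is a small sketch in which `asyncio.sleep()` stands in for a slow HTTP or SQL call (an assumption: no real I/O happens). Awaiting five waits one after another takes about five times as long as a single wait, while `asyncio.gather()` lets the waits overlap so the total is close to one wait.

```python
import asyncio
import time


async def fake_io(delay: float) -> float:
    # Stand-in for an HTTP request or SQL query: it only waits.
    # While this coroutine is suspended at `await`, the event loop can run others.
    await asyncio.sleep(delay)
    return delay


async def sequential(n: int, delay: float) -> float:
    start = time.perf_counter()
    for _ in range(n):
        await fake_io(delay)  # each wait finishes before the next one starts
    return time.perf_counter() - start


async def concurrent(n: int, delay: float) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(delay) for _ in range(n)))  # the waits overlap
    return time.perf_counter() - start


if __name__ == "__main__":
    seq = asyncio.run(sequential(5, 0.2))
    con = asyncio.run(concurrent(5, 0.2))
    print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")
```

Exact timings vary by machine, but the sequential version should take roughly 1 s and the concurrent one roughly 0.2 s. Without the `gather()`, the `async` version is no faster than the original loop.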

First of all, you'll have to use asynchronous libraries (instead of synchronous ones).

  • mysql.connector could be replaced by aiomysql from aio-libs.
  • requests could be replaced by aiohttp

To execute multiple asynchronous tasks in "parallel" (for instance, to replace your `for url in urls_dict:` loop), read carefully about asyncio tasks and the `gather` function.
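A minimal sketch of those two pieces, `asyncio.create_task()` and `asyncio.gather()`. `fetch_length()` is a hypothetical stand-in for a real HTTP request (it sleeps and returns the URL's length). It shows that `gather()` returns results in input order even when the tasks finish in a different order, and that `return_exceptions=True` keeps one failing URL from hiding the other results.

```python
import asyncio


async def fetch_length(url: str) -> int:
    # Hypothetical stand-in for an HTTP request: rejects non-http URLs,
    # waits a little (different delays, so tasks finish out of order),
    # then "returns" the URL length as if it were the response size.
    if not url.startswith("http"):
        raise ValueError(f"cannot fetch {url!r}")
    await asyncio.sleep(0.01 * (len(url) % 3))
    return len(url)


async def main(urls):
    # create_task() schedules each coroutine on the event loop right away.
    tasks = [asyncio.create_task(fetch_length(u)) for u in urls]
    # gather() waits for all of them and returns results in input order,
    # regardless of which task finished first.
    return await asyncio.gather(*tasks)


async def main_tolerant(urls):
    # With return_exceptions=True, an exception is returned in its task's slot
    # instead of propagating out of gather() at the first failure.
    return await asyncio.gather(*(fetch_length(u) for u in urls), return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(main(["http://a", "http://bb", "http://ccc"])))
    print(asyncio.run(main_tolerant(["http://a", "oops"])))
```

The first call prints `[8, 9, 10]` even though `"http://bb"` (no sleep) finishes first. The second returns `8` followed by the `ValueError` object for `"oops"`.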

I will not rewrite your code asynchronously, but here are a few lines of pseudocode that could help you:

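import asyncio


async def process(db, url):
    result = await fetch(url)   # an aiohttp-based fetch(), like the one in the question
    await db.commit(result)     # hypothetical async method of your DB wrapper


async def main():
    db = MyDbConnection()       # hypothetical async DB wrapper (e.g. built on aiomysql)
    urls = await db.fetch_all_urls()  # `await` is only valid inside an async function
    # schedule one process() coroutine per URL and wait for all of them
    await asyncio.gather(*(process(db, url) for url in urls))


if __name__ == "__main__":
    asyncio.run(main())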
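To see this pattern run end to end without a database or network, here is a self-contained sketch in which the DB wrapper and `fetch()` are replaced by hypothetical in-memory stubs (`FakeDb` and a fake `fetch()`). Only the `asyncio` structure is meant to carry over; the stub names and return values are assumptions for illustration.

```python
import asyncio


class FakeDb:
    """Hypothetical in-memory stand-in for an async DB wrapper (e.g. one built on aiomysql)."""

    def __init__(self, urls):
        self._urls = urls
        self.rows = []  # what commit() has "inserted"

    async def fetch_all_urls(self):
        await asyncio.sleep(0)  # simulate an async SELECT
        return list(self._urls)

    async def commit(self, row):
        await asyncio.sleep(0)  # simulate an async INSERT + COMMIT
        self.rows.append(row)


async def fetch(url):
    # Hypothetical stand-in for the aiohttp-based fetch(); returns fake JSON.
    await asyncio.sleep(0.01)
    return {"url": url, "status": "ok"}


async def process(db, url):
    result = await fetch(url)
    await db.commit(result)


async def main(db):
    urls = await db.fetch_all_urls()
    await asyncio.gather(*(process(db, url) for url in urls))


if __name__ == "__main__":
    db = FakeDb(["https://example.com/a", "https://example.com/b"])
    asyncio.run(main(db))
    print(db.rows)
```

With a real driver, a single connection generally should not run overlapping queries from concurrent coroutines; aiomysql offers a connection pool (`aiomysql.create_pool`) so each `process()` can acquire its own connection.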
Raphael Medaer
  • This is super helpful. Will try and report back. Thanks @Raphael Medaer :) – Shery May 01 '20 at 09:13
  • I've had good success getting the URL results, but I'm stumped on the DB side :( – Shery May 01 '20 at 16:47
  • I saw your edit with the error. However, without the full stack trace we cannot help you. Could you also post the code of `db.make_sql_table_name_by_url(url)` and [write a minimal reproducible example](https://stackoverflow.com/help/minimal-reproducible-example)? – Raphael Medaer May 02 '20 at 08:25
  • The above error was because the `db.make_sql_table_name_by_url(url)` method wasn't `async` lol. I fixed it and it works fine. The problem I have now is that if `db.make_sql_table_name_by_url(url)` is in the same file, it works, but if I move it into a wrapper class, it gives `AttributeError: module 'asyncio.streams' has no attribute 'IncompleteReadError'` – Shery May 02 '20 at 10:56
  • OK. I guess we can consider that off topic or a separate question. I hope my answer helped you get a better idea of the big picture. Could you give a full working example once it's done? And optionally accept the answer? Kind regards, – Raphael Medaer May 02 '20 at 12:18
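For reference, the `TypeError` from the question can be reproduced without any database. `await` only works on awaitables (coroutines, tasks, futures); a plain `def` function returns its value directly, here a `str`, so awaiting that value fails. The helper names below are hypothetical simplifications of `make_sql_table_name_by_url()`.

```python
import asyncio


def make_table_name(url: str) -> str:
    # Hypothetical, simplified synchronous helper: last path segment of the URL.
    return url.rstrip("/").split("/")[-1]


async def make_table_name_async(url: str) -> str:
    # The same logic declared with `async def`; calling it returns a coroutine.
    return make_table_name(url)


async def demo(url: str):
    plain = make_table_name(url)                # plain function: call it, no await
    awaited = await make_table_name_async(url)  # coroutine function: await it
    error = None
    try:
        await make_table_name(url)              # the bug: awaiting a str
    except TypeError as exc:
        error = str(exc)
    return plain, awaited, error


if __name__ == "__main__":
    print(asyncio.run(demo("https://example.com/api/orders/")))
```

If the helper does no I/O (it only builds a string), there is no need to make it `async` at all; calling it without `await` is enough.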