
I am making several hundred HTTP requests using aiohttp. I am relatively new to the async world but have managed to get the basic code working.

First I generate a token, then I make the aiohttp calls using this token. The token is valid for 30 minutes, so I assume that if my calls run for more than 30 minutes, they will start failing.

How do I update my code to plug in a new token after 30 minutes and then resume the remaining calls? This is my first time implementing async calls, so I am relatively clueless about how to handle this.

import aiohttp
import asyncio


async def a_get_all_user_details(urls):
    results = []

    connector = aiohttp.TCPConnector(limit=70)
    timeout = aiohttp.ClientTimeout(total=None, connect=300, sock_connect=300, sock_read=None)

    auth_token = get_token()  # token expires in 30 mins
    headers = {
        'accept': 'application/json',
        'Authorization': 'Bearer ' + auth_token
    }

    async with aiohttp.ClientSession(trust_env=True, headers=headers, connector=connector, timeout=timeout) as session:
        for url in urls:
            result = asyncio.ensure_future(a_get_user_details(url, session))
            results.append(result)

        responses = await asyncio.gather(*results)
        return responses


def main():
    loop = asyncio.get_event_loop()
    # search_urls and a_get_user_details are defined elsewhere in my code
    future = asyncio.ensure_future(a_get_all_user_details(search_urls))
    user_details = loop.run_until_complete(future)
    return user_details

1 Answer


Maybe there's a simpler way to do it, but here's my take:

The problem is that there are many connections in flight when you want to refresh the session. If you close the session and create a new one, the active connections that are still waiting for data will raise an exception.

In my example I keep a list of all sessions, and when the time arrives I simply create a new session (with a new token) and append it to the list. New connections will use the last (freshest) session in the list.

At the end of the script I close all the sessions.

import aiohttp
import asyncio

sessions = []


async def get_token():
    return "XYZ"


async def refresh_session():
    # periodically create a new session (with a fresh token) every X seconds
    connector = aiohttp.TCPConnector(limit=3)
    timeout = aiohttp.ClientTimeout(
        total=None, connect=300, sock_connect=300, sock_read=None
    )

    while True:
        headers = {
            "accept": "application/json",
            "Authorization": "Bearer " + await get_token(),
        }

        sessions.append(
            aiohttp.ClientSession(
                trust_env=True,
                headers=headers,
                connector=connector,
                timeout=timeout,
            )
        )
        print("New session created")
        await asyncio.sleep(5)  # every 5 seconds refresh session


async def get_user_detail(url):
    # wait for session to show up:
    while not sessions:
        await asyncio.sleep(1)

    # use last (freshest) session:
    async with sessions[-1].get(url) as resp:
        assert resp.status == 200
        html = await resp.text()
        return f"some result for {url} length of data {len(html)}"


async def get_user_details(urls):
    results = []
    for url in urls:
        results.append(asyncio.ensure_future(get_user_detail(url)))

    responses = await asyncio.gather(*results)
    return responses


async def main():
    # some urls to gather:
    urls = [
        "https://www.google.com",
        "https://www.microsoft.com",
        "https://www.yahoo.com",
    ] * 30

    t1 = asyncio.create_task(refresh_session())
    t2 = asyncio.create_task(get_user_details(urls))

    # finish when the first task ends (in this case get_user_details())
    done, _ = await asyncio.wait([t1, t2], return_when=asyncio.FIRST_COMPLETED)

    # stop the refresher so it doesn't create new sessions during cleanup:
    t1.cancel()

    # close all opened sessions:
    for s in sessions:
        await s.close()

    # print the result
    print("Domains gathered ", len(done.pop().result()))


if __name__ == "__main__":
    asyncio.run(main())

This prints:

New session created
New session created
Domains gathered  90
Andrej Kesely
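
If you'd rather not juggle a list of sessions, an alternative is to keep a single ClientSession for the whole run and attach a fresh Authorization header to each request; in aiohttp, per-request headers take precedence over the session's default headers. Below is a minimal sketch of that idea. TokenHolder is a hypothetical helper, and the sketch assumes a synchronous get_token() and the 30-minute validity from the question, refreshing one minute before expiry:

import asyncio
import time

import aiohttp


def get_token():
    # stand-in for the real token call from the question
    return "XYZ"


class TokenHolder:
    # hypothetical helper: caches the token and refreshes it shortly before expiry
    def __init__(self, ttl_seconds=30 * 60):
        self._ttl = ttl_seconds
        self._token = None
        self._expires_at = 0.0  # forces a refresh on first use

    def auth_headers(self):
        # refresh one minute early so a request never carries an expired token
        if time.monotonic() >= self._expires_at - 60:
            self._token = get_token()
            self._expires_at = time.monotonic() + self._ttl
        return {"Authorization": "Bearer " + self._token}


async def get_user_detail(url, session, tokens):
    # per-request headers override the session defaults,
    # so every call carries a current token
    async with session.get(url, headers=tokens.auth_headers()) as resp:
        resp.raise_for_status()
        html = await resp.text()
        return f"some result for {url} length of data {len(html)}"


async def main():
    urls = ["https://www.google.com", "https://www.microsoft.com"] * 3
    tokens = TokenHolder()
    connector = aiohttp.TCPConnector(limit=3)
    async with aiohttp.ClientSession(
        trust_env=True,
        headers={"accept": "application/json"},
        connector=connector,
    ) as session:
        tasks = [
            asyncio.create_task(get_user_detail(url, session, tokens))
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        print("Domains gathered ", len(results))


if __name__ == "__main__":
    asyncio.run(main())

With this design there is only one connection pool to manage and nothing extra to close when the run finishes; the token check is plain synchronous code with no await inside, so concurrent coroutines on a single event loop can't race on it.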