My use case is performance testing. I want an app that runs 1 task 4 times and computes the average time for that task, then runs 2 tasks asynchronously and computes the average, then 4 tasks, then 8, and so on.

However, I can't get it to run this way. When I try, it seems that all the tasks have already been executed earlier, and I get wrong timings.

After some trial and error, the code below now fails on the line loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks))) in the run_tasks function with TypeError: An asyncio.Future, a coroutine or an awaitable is required, followed by sys:1: RuntimeWarning: coroutine 'go' was never awaited.

Below is my code:

import asyncio
import time
from datetime import datetime, timedelta

import aiopg


async def go(date):
    pool = await aiopg.create_pool("**db connection**")
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:

            await cur.execute(""" some query """)
            time.sleep(1)

            ret = []
            async for row in cur:
                ret.append(row)


def date_range(date1, date2):
    for n in range(int((date2 - date1).days)+1):
        yield date1 + timedelta(n)


def run_tasks():

    start_dt = datetime(2017, 8, 9)
    end_dt = datetime(2017, 8, 10)

    tasks = []
    some_tasks = []

    avg_time_run = []

    for dt in date_range(start_dt, end_dt):
        #tasks.append(asyncio.ensure_future(go(dt.strftime("%Y-%m-%d %H:%M:%S"))))
        tasks.append(go(dt.strftime("%Y-%m-%d %H:%M:%S")))

    i = 1
    prev = 0
    while i < 2: # i < 128

        # Get i number of tasks from task list
        for k in range(prev, i):
            some_tasks.append(tasks[k])

        prev = len(some_tasks)
        time_run = []
        for j in range(0, 4):  # repeat task 4 times
            start = time.time()
            loop = asyncio.get_event_loop()

            loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
            # loop.close()

            end = time.time()
            diff = end - start
            time_run.append(diff)
            print("ith SomeTask: {}, {}".format(i, some_tasks))
            print("Total time: {}".format(diff))

        # get average of each task run 4 times
        avg_time_run.append(sum(time_run) / float(len(time_run)))
        i *= 2

    return avg_time_run


print(run_tasks())    

Any hints would be appreciated. Where should I put await, given that asyncio.wait is already there?

Atihska

2 Answers

asyncio.ensure_future(some_tasks)

You're passing a list of coroutines to asyncio.ensure_future. As the documentation shows, that is not how this function works: it takes a single coroutine (or other awaitable) and wraps it in an asyncio.Task. That is why you get the TypeError. The RuntimeWarning follows because the go coroutines you created were consequently never awaited.
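To make the difference concrete, here is a minimal sketch. The `work` coroutine is a hypothetical stand-in for `go`, not part of the question's code. It shows that asyncio.ensure_future rejects a list but accepts each coroutine individually:

```python
import asyncio


async def work(n):
    # Hypothetical stand-in for the question's go() coroutine.
    await asyncio.sleep(0)
    return n


async def main():
    coros = [work(1), work(2)]
    try:
        asyncio.ensure_future(coros)  # a list is not an awaitable
    except TypeError as exc:
        error = str(exc)
    # Wrap each coroutine on its own instead.
    tasks = [asyncio.ensure_future(c) for c in coros]
    results = await asyncio.gather(*tasks)
    return error, results


error, results = asyncio.run(main())
print(error)
print(results)
```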

You don't need to create an asyncio.Task yourself in this case at all; just pass the list of coroutines to asyncio.wait:

loop.run_until_complete(asyncio.wait(some_tasks))
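One caveat for newer Python versions: passing bare coroutine objects to asyncio.wait was deprecated in Python 3.8 and is rejected from 3.11 onward. There you would either wrap each coroutine in a task first or use asyncio.gather. A minimal sketch of the gather variant, assuming a hypothetical `work` coroutine in place of `go`:

```python
import asyncio


async def work(n):
    # Hypothetical stand-in for go(); sleeps briefly instead of querying a database.
    await asyncio.sleep(0.01)
    return n


async def run_batch(count):
    # Build fresh coroutines for this batch and wait for all of them.
    return await asyncio.gather(*(work(i) for i in range(count)))


batch = asyncio.run(run_batch(3))
print(batch)  # [0, 1, 2]
```

asyncio.gather returns the results in the order the awaitables were passed in, which is convenient when each result has to be matched back to its input.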

One more important thing:

time.sleep(1)

You should never do this inside a coroutine: it freezes your event loop (and every coroutine running on it). Please read this answer to learn how asyncio works in general.

If you want to sleep for some time inside a coroutine, use asyncio.sleep:

await asyncio.sleep(1)
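The effect is easy to measure. The sketch below uses hypothetical helper names (`blocking`, `cooperative`, `timed`) and runs three coroutines concurrently with each kind of sleep. With time.sleep they run one after another; with asyncio.sleep their waits overlap:

```python
import asyncio
import time


async def blocking():
    time.sleep(0.1)  # blocks the whole event loop


async def cooperative():
    await asyncio.sleep(0.1)  # yields to the event loop while waiting


async def timed(factory, count=3):
    # Run `count` coroutines concurrently and return the elapsed wall time.
    start = time.perf_counter()
    await asyncio.gather(*(factory() for _ in range(count)))
    return time.perf_counter() - start


blocking_time = asyncio.run(timed(blocking))
cooperative_time = asyncio.run(timed(cooperative))
print("time.sleep:    {:.2f}s".format(blocking_time))
print("asyncio.sleep: {:.2f}s".format(cooperative_time))
```

The first figure should come out at roughly three times the second, since the blocking sleeps cannot overlap.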
Mikhail Gerasimov
  • Thanks @Mikhail. I was trying out some things as well and found the mistake. I also made a couple of changes. Please see my answer and let me know about any improvements. Another quick question: where should I put loop.close()? Wherever I put it, I'm told that tasks are still running. – Atihska Jan 09 '18 at 22:10
  • @Atihska When you write an asyncio script, you should make sure all tasks are finished and all created coroutines are awaited by the end of execution. Anything else usually means there is a mistake in the code. `loop.close()` can help you spot that by raising warnings. That's why it's usually placed at the end of your script. If I'm not mistaken, putting it after `print(run_tasks())` in your updated code shouldn't give warnings now. – Mikhail Gerasimov Jan 10 '18 at 05:07
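As a rough illustration of closing the loop only once everything has finished, the sketch below (with hypothetical `work` and `main` coroutines) checks for unfinished tasks right before closing:

```python
import asyncio


async def work(n):
    # Hypothetical stand-in for a real task.
    await asyncio.sleep(0.01)
    return n


async def main():
    return await asyncio.gather(work(1), work(2))


loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
    # Nothing should still be pending when the script is about to exit.
    pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
finally:
    loop.close()

print(results, pending, loop.is_closed())
```

If `pending` were not empty here, that would point to a task that was created but never awaited.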

ANSWER CODE:

import asyncio
import time
from datetime import datetime, timedelta

import asyncpg


async def run(date):  # adapted from the go() function above
    conn = await asyncpg.connect("db connections")
    values = await conn.fetch("""some query """)
    await asyncio.sleep(1)
    await conn.close()


def date_range(date1, date2):
    for n in range(int((date2 - date1).days)+1):
        yield date1 + timedelta(n)


def run_tasks():

    start_dt = datetime(2017, 8, 9)
    end_dt = datetime(2017, 8, 10)

    tasks = []

    avg_time_run = []

    i = 1

    while i < 9:  # num of tasks incremented
        time_run = []

        start = time.time()
        loop = asyncio.get_event_loop()

        for dt in date_range(start_dt, end_dt):
            if len(tasks) < i:
                print(dt)
                tasks.append(asyncio.ensure_future(run(dt.strftime("%Y-%m-%d %H:%M:%S"))))

                if len(tasks) == i:

                    for j in range(0, 4):  # repeat task 4 times
                        print("J counter: {}".format(j))

                        loop.run_until_complete(asyncio.wait(tasks))

                        end = time.time()
                        diff = end - start
                        time_run.append(diff)
                        print("Num of Tasks executing: {}, {}".format(i, tasks))
                        print("Task len: {}".format(len(tasks)))
                        print("Total time: {}".format(diff))

        # get average of each task run 4 times
        avg_time_run.append(sum(time_run) / float(len(time_run)))
        start_dt = end_dt + timedelta(days=1)
        end_dt = end_dt + timedelta(days=(i * 2 - i))
        i *= 2

        print(start_dt)
        print(end_dt)
        #loop.close()

    return avg_time_run


print(run_tasks())
Atihska
  • If it works, I guess it's OK. The only thing I would change is adding a `try`/`finally` block around creating and closing the connection inside `run`. An example can be found in [this snippet](https://github.com/MagicStack/asyncpg/blob/master/docs/usage.rst#custom-type-conversions). – Mikhail Gerasimov Jan 10 '18 at 04:59
  • @MikhailGerasimov OK. Should I use a global event loop so that I can close it at the end? In the current scope, when I close it, I get an operational error saying there are pending tasks. – Atihska Jan 10 '18 at 05:02
  • Actually, your event loop is already global: `asyncio.get_event_loop()` returns the same object. That's how asyncio works: multiple coroutines are managed by a single event loop, which allows them to run concurrently. You can just add `loop = asyncio.get_event_loop(); loop.close()` at the end of your script. Alternatively, you can use a different event loop for each loop iteration with `loop = asyncio.new_event_loop()` and `asyncio.set_event_loop(loop)` (closing this new event loop at the end of each iteration). – Mikhail Gerasimov Jan 10 '18 at 05:14
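Putting the suggestions from these comments together, the following is only a sketch under stated assumptions, not the answer's actual code. `FakeConnection`, `run_batch` and `benchmark` are hypothetical names, and the fake connection merely imitates query latency rather than using asyncpg. It closes the connection in a `finally` block, uses a fresh event loop per batch size, and builds new coroutines and restarts the clock for every repetition, because a task that has already finished is not executed again when it is waited on a second time:

```python
import asyncio
import time


class FakeConnection:
    """Hypothetical stand-in for a database connection (not asyncpg's API)."""

    def __init__(self):
        self.closed = False

    async def fetch(self, query):
        await asyncio.sleep(0.01)  # simulate query latency
        return [query]

    async def close(self):
        self.closed = True


async def run(date):
    conn = FakeConnection()
    try:
        return await conn.fetch(date)
    finally:
        # Runs even if fetch() raises, so the connection is always released.
        await conn.close()


async def run_batch(count):
    # New coroutine objects every time: a finished task cannot be re-run.
    await asyncio.gather(*(run("day-{}".format(k)) for k in range(count)))


def benchmark(max_tasks=8, repeats=4):
    averages = {}
    count = 1
    while count <= max_tasks:
        loop = asyncio.new_event_loop()  # fresh loop for this batch size
        try:
            timings = []
            for _ in range(repeats):
                start = time.perf_counter()  # restart the clock per repetition
                loop.run_until_complete(run_batch(count))
                timings.append(time.perf_counter() - start)
        finally:
            loop.close()
        averages[count] = sum(timings) / len(timings)
        count *= 2
    return averages


averages = benchmark()
print(averages)
```

The result maps each batch size (1, 2, 4, 8) to its average wall time over four repetitions. Swapping `FakeConnection` for a real connection would be the next step, with the same `try`/`finally` shape around it.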