
I have a list of objects, and for each object I need to do some async work. I am not sure whether I have structured it correctly:

import asyncio
import socket

import aiohttp


def run(tasks):
    async def async_wrapper():
        async def update_ocr_task(task):
            updated_task = await task_manager.async_get_task_status(session, task)
            # do some work with updated_task

        workers = []
        resolver = aiohttp.AsyncResolver()
        connector = aiohttp.TCPConnector(resolver=resolver, family=socket.AF_INET)
        async with aiohttp.ClientSession(connector=connector) as session:
            for local_task in tasks:  # tasks is a list of objects
                await update_ocr_task(local_task)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_wrapper())

I think the for loop is synchronous and will completely block progress. Am I right? If so, how should I structure it?

Sraw

1 Answer

def run(tasks):
    # ...

    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_wrapper())

This is not how async programs are usually written: the event loop should be global and started at the main entry point of the whole script. Running the event loop inside a single function, as run does here, prevents the calling code from running other coroutines in the same event loop.
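To make the "single entry point" idea concrete, here is a minimal sketch. It assumes Python 3.7+ (for asyncio.run), and fetch_status and heartbeat are hypothetical stand-ins, not part of the original code. The point is only that two unrelated coroutines can share one loop when the loop is started once at the top level.

```python
import asyncio


async def fetch_status(task_id):
    # Hypothetical stand-in for real async work (e.g. an HTTP request).
    await asyncio.sleep(0.01)
    return f"task {task_id}: done"


async def heartbeat(ticks):
    # A second, unrelated coroutine that shares the same event loop.
    for _ in range(ticks):
        await asyncio.sleep(0.005)
    return "heartbeat finished"


async def main():
    # All asynchronous work is started from here, so any coroutine can
    # run alongside any other on the one loop.
    return await asyncio.gather(fetch_status(1), fetch_status(2), heartbeat(3))


if __name__ == "__main__":
    # Single entry point: the loop is created and closed once per script.
    print(asyncio.run(main()))
```

If run() owned the loop instead, heartbeat could not be scheduled next to the task updates without changing run() itself.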

If you understand this and all you want is a blocking run function that cannot be used alongside other async code, read on.


The problem with your async_wrapper is that it awaits the next update_ocr_task() coroutine only after the previous one has finished. The for loop is not "blocking" in the usual sense (the event loop can still run other coroutines during each await), but it is not concurrent: it does not take advantage of the asynchronous paradigm.

To benefit from asyncio, you should run multiple coroutines concurrently. A common way to do this is asyncio.gather():

async def async_wrapper():

    async def process_single_task(task):
        resolver = aiohttp.AsyncResolver()
        connector = aiohttp.TCPConnector(resolver=resolver, family=socket.AF_INET)
        async with aiohttp.ClientSession(connector=connector) as session:
            await session.get(...) # do all job for *single* local task here.

    # Run multiple task processing coroutines concurrently:
    await asyncio.gather(
        *[process_single_task(t) for t in tasks]
    )
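The difference between awaiting in a loop and using asyncio.gather() can be checked with a small timing sketch. It assumes Python 3.7+, and the process_single_task here is a hypothetical stand-in that uses asyncio.sleep instead of a real request, so the timings are only illustrative.

```python
import asyncio
import time


async def process_single_task(task):
    # Hypothetical stand-in for the real per-task I/O.
    await asyncio.sleep(0.1)
    return task * 2


async def run_sequentially(tasks):
    # Each await finishes before the next coroutine is even created.
    return [await process_single_task(t) for t in tasks]


async def run_concurrently(tasks):
    # All coroutines are scheduled together; results keep the input order.
    return await asyncio.gather(*(process_single_task(t) for t in tasks))


def timed(coro):
    # Run a coroutine to completion and report (result, elapsed seconds).
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    tasks = [1, 2, 3, 4, 5]
    print("sequential:", timed(run_sequentially(tasks)))
    print("concurrent:", timed(run_concurrently(tasks)))
```

The sequential version takes roughly the sum of the individual waits, while the gather version takes roughly as long as the slowest single task.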

If you want you can also read this little answer about asyncio in general.

Mikhail Gerasimov
  • Oh, I think I see what was wrong. `await update_ocr_task()` is concurrent with other coroutines such as `await task_manager.async_get_task_status(session, task)`, but not with itself. `gather` makes all the `update_ocr_task` calls concurrent. That is the point. BTW, the reason I wrap the whole loop in a function is that I want to run the async event loop in a separate thread/process, since my main thread does synchronous work. – Sraw Dec 08 '17 at 14:26
  • A small additional question: could I just replace `for` with `async for` to get the same behavior? – Sraw Dec 08 '17 at 17:07
  • @Sraw no. `async for` is used to iterate over an asynchronous generator. It's a completely different thing. – Mikhail Gerasimov Dec 08 '17 at 18:09
  • Thanks for the answer, I understand coroutines much better now. – Sraw Dec 09 '17 at 04:44
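
To illustrate the reply about `async for` in the comments above, here is a minimal sketch with a hypothetical task_statuses generator (Python 3.7+ assumed for asyncio.run). It shows that `async for` consumes an asynchronous generator one item at a time, so it does not make the iterations concurrent the way asyncio.gather() does.

```python
import asyncio


async def task_statuses(tasks):
    # Asynchronous generator: each item is produced after an awaited step.
    for task in tasks:
        await asyncio.sleep(0.01)  # hypothetical stand-in for real I/O
        yield f"{task}: ok"


async def main():
    collected = []
    # `async for` awaits one item at a time; iterations do not overlap.
    async for status in task_statuses(["a", "b", "c"]):
        collected.append(status)
    return collected


if __name__ == "__main__":
    print(asyncio.run(main()))
```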