
I have a program that makes async queries to a GraphQL system and then writes the results to files. I'm calling the main function with loop.run_until_complete:

    try:
        loop.run_until_complete(run_suite(gql_client))
        log.info('Done.')
    except asyncio.CancelledError:
        pass  # ignore cancellation

Later in run_suite, the results of the various queries are written to file (in run_query). The function that writes the files, write_query_result, is blocking, so I call it with run_in_executor.

import asyncio
import os
from typing import Any, Dict, List

import aiohttp

def write_query_result(folder: str, filename: str, result: Any) -> Any:
    # Blocking: writes one result file to disk.
    path = os.path.join(folder, filename)
    log.info('Writing [%s]', path)
    writer.write_data(path, result)
    return result

async def run_query(query: Dict[str, Any]) -> List[Any]:
    loop = asyncio.get_running_loop()
    try:
        result = await asyncio.create_task(query['func']())
        writers = []
        for fname in result.keys():
            # Each write is awaited before the next one is started.
            written = await loop.run_in_executor(
                None, write_query_result, query['folder'],
                fname + query['file_extension'], result)
            writers.append(written)
        return writers
    except Exception as e:
        log.error('Error getting results. Query [%s]. Error [%s]',
                  query['name'], e)
        raise

async def run_queries(session: aiohttp.ClientSession, client_id: str, host: str = '127.0.0.1') -> List[Any]:
    ...
    tasks = []
    for query_set in queries:
        for query in query_set:
            tasks.append(asyncio.create_task(run_query(query)))

    # Wait for every run_query task to finish.
    return await asyncio.gather(*tasks)

With this code my queries run asynchronously. The files are written asynchronously in the executor, and the loop completes only once all the files have been written: I see the 'Done.' message and the program exits.

However, I noticed that if I modify the code that writes the results as shown below (gathering the results of all the run_in_executor calls), the loop completes before all the files are written. I see the 'Done.' message, and then it still logs that it's writing files.

async def run_query(query: Dict[str, Any]) -> asyncio.Future:
    loop = asyncio.get_running_loop()
    try:
        result = await asyncio.create_task(query['func']())
        writers = []
        for fname in result.keys():
            # Schedule all writes in the executor at once.
            writers.append(loop.run_in_executor(
                None, write_query_result, query['folder'],
                fname + query['file_extension'], result))
        # Returns the gathered future (it is not awaited here).
        return asyncio.gather(*writers)
    except Exception as e:
        log.error('Error getting results. Query [%s]. Error [%s]',
                  query['name'], e)
        raise

I'm fairly new to asyncio and am not 100% sure what might be going on. I'd appreciate any insight on why this might be happening.

isaacgr
  • Probably just a forgotten `await` in `asyncio.gather(*writers)`. – VPfB Sep 25 '21 at 15:02
  • @VPfB You were right. Thank you, I completely missed that... Why specifically would that have caused this? I guess because it wasn't returning a future, so it assumed that the function was completely done? – isaacgr Sep 25 '21 at 15:16
  • A simple call of an async func creates and returns a coroutine without running it. The program immediately continues with the next instruction. – VPfB Sep 25 '21 at 16:38
  • Please also note that this `result = await asyncio.create_task(afunc())` creates an unnecessary task. Do this instead: `result = await afunc()` . A task is needed only if you want a coroutine to be run "independently" from the rest of the program, see the introduction in this answer https://stackoverflow.com/a/62529343/5378816 – VPfB Sep 25 '21 at 17:15
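The missing-`await` effect can be reproduced without GraphQL or real files. This is a minimal sketch under stated assumptions: `blocking_write` is a hypothetical stand-in for `write_query_result` (it only sleeps and records a name), and `write_all_buggy` / `write_all_fixed` are hypothetical, simplified versions of the second `run_query`. Strictly speaking, `asyncio.gather()` is an ordinary function that returns a future right away, and `run_in_executor()` has already handed the jobs to the thread pool. So without `await`, the coroutine returns while the threads are still working, and `run_until_complete()` stops at that point.

```python
import asyncio
import threading
import time
from typing import List, Tuple

# Hypothetical stand-in for write_query_result: it "writes" by sleeping
# and then recording the file name.
written: List[str] = []
written_lock = threading.Lock()


def blocking_write(name: str) -> str:
    time.sleep(0.5)  # simulate slow, blocking file I/O
    with written_lock:
        written.append(name)
    return name


async def write_all_buggy(names: List[str]) -> asyncio.Future:
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, blocking_write, n) for n in names]
    # Bug: gather() hands back a future immediately; without await this
    # coroutine returns while the worker threads are still sleeping.
    return asyncio.gather(*futures)


async def write_all_fixed(names: List[str]) -> List[str]:
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, blocking_write, n) for n in names]
    # Fix: suspend here until every executor job has returned.
    return await asyncio.gather(*futures)


def demo() -> Tuple[int, int, List[str]]:
    names = ['a.json', 'b.json', 'c.json']
    written.clear()
    loop = asyncio.new_event_loop()
    try:
        # run_until_complete() stops as soon as the coroutine returns,
        # handing back the still-pending gather future.
        pending = loop.run_until_complete(write_all_buggy(names))
        after_buggy = len(written)
        # Let the leftover writes finish before reusing/closing the loop.
        loop.run_until_complete(pending)

        written.clear()
        results = loop.run_until_complete(write_all_fixed(names))
        after_fixed = len(written)
    finally:
        loop.close()
    return after_buggy, after_fixed, results


if __name__ == '__main__':
    print(demo())
```

With `write_all_buggy`, the first `run_until_complete()` normally returns long before any of the half-second sleeps finishes, so `after_buggy` is 0 in practice; with `write_all_fixed`, all three names have been recorded when it returns. In the original program, the writes are most likely still completed after 'Done.' because Python waits for the default executor's worker threads when the interpreter exits.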
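VPfB's last two comments can also be checked with a small sketch. Here `afunc` is a hypothetical stand-in for `query['func']`. Calling it only builds a coroutine object; its body runs once the coroutine is awaited. Wrapping it in `asyncio.create_task()` and awaiting the task immediately gives the same result with an extra Task object, so `await afunc()` is enough unless the coroutine should run independently of the caller.

```python
import asyncio
from typing import List, Tuple

calls: List[str] = []


async def afunc() -> int:
    # Hypothetical stand-in for query['func']; records that its body ran.
    calls.append('ran')
    await asyncio.sleep(0)
    return 42


async def main() -> Tuple[int, int, int, int]:
    calls.clear()
    coro = afunc()            # only creates a coroutine object
    not_started = len(calls)  # the body of afunc has not executed yet
    direct = await coro       # awaiting it runs the body to completion
    # Same result, but wrapped in an extra Task that is awaited right away.
    via_task = await asyncio.create_task(afunc())
    return not_started, direct, via_task, len(calls)


if __name__ == '__main__':
    print(asyncio.run(main()))  # prints (0, 42, 42, 2)
```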
