I have a program which makes async queries to a graphql system, then writes those results to file. I'm calling the main function with loop.run_until_complete
.
try:
loop.run_until_complete(run_suite(gql_client))
log.info('Done.')
except asyncio.CancelledError:
done
Later in run_suite
, I write the results of the various queries to file (in run_query
). The function that writes the files is blocking (write_query_result
), so I call it with run_in_executor
.
def write_query_result(folder: str, filename: str, result: Any):
dirname = '%s/%s' % (folder, filename)
log.info('Writing [%s]' % dirname)
writer.write_data(dirname, result)
return result
async def run_query(query: List[Dict[str, Any]]) -> Union[asyncio.Future, List[asyncio.Future], Exception]:
_loop = asyncio.get_event_loop()
try:
result = await asyncio.create_task(query['func']())
writers = []
for fname in result.keys():
result = await _loop.run_in_executor(None, write_query_result, query['folder'], fname+query['file_extension'], result)
writers.append(result)
return writers
except Exception as e:
log.error('Error getting results. Query [%s]. Error [%s]' % (
query['name'], e))
raise e
async def run_queries(session: aiohttp.ClientSession, client_id: str, host: str = '127.0.0.1') -> asyncio.Future:
...
tasks = []
for query_set in queries:
for query in query_set:
tasks.append(asyncio.create_task(run_query(query)))
return await asyncio.gather(*tasks)
With this code my queries run asyncronously. The files get written asyncronously in the executor, and then when all the files have been written the loop completes. I see it log the 'Done' message and the program exits.
However, I noticed that if I modify the code for writing the results as shown below (gathering the results of all calls to run_in_executor
), then my loop actually completes before all the files are written. I see the 'Done' message, and then I still see it printing that its writing to file.
async def run_query(query: List[Dict[str, Any]]) -> Union[asyncio.Future, List[asyncio.Future], Exception]:
_loop = asyncio.get_event_loop()
try:
result = await asyncio.create_task(query['func']())
writers = []
for fname in result.keys():
writers.append(_loop.run_in_executor(None, write_query_result, query['folder'], fname+query['file_extension'], result))
return asyncio.gather(*writers)
except Exception as e:
log.error('Error getting results. Query [%s]. Error [%s]' % (
query['name'], e))
raise e
I'm fairly new to asyncio and am not 100% sure what might be going on. I'd appreciate any insight on why this might be happening.