I have the following code running in an event loop where I'm downloading a large number of files using asyncio
and restricting the number of files downloaded using asyncio.queue
:
download_tasks = asyncio.Queue()
for file in files:
# download_file() is an async function that downloads a file from Microsoft blob storage
# that is basically await blob.download_blob()
download_tasks.put_nowait(asyncio.create_task(download_file(file=file))
async def worker():
while not download_tasks.empty():
return await download_tasks.get_nowait()
worker_limit = 10
# each call to download_file() returns a pandas dataframe
df_list = await asyncio.gather(*[worker() for _ in range(worker_limit)], return_exceptions=True)
df = pd.concat(df_list)
This code seems to run fine, but I originally had the for loop defined as:
for file in files:
# download_file() is an async function that downloads a file from Microsoft blob storage
# that is basically await blob.download_blob()
download_tasks.put_nowait(download_file(file=file)
With this code, the result is the same but I get the following warning:
RuntimeWarning: coroutine 'download_file' was never awaited
Looking at asyncio examples, sometimes I see create_task()
used when creating a list or queue of coroutines to be run in gather and sometimes I don't. Why is it needed in my case and what's the best practice for using it?
Edit: As @user2357112supportsMonica discourteously pointed out, the return
statement within worker()
doesn't really make sense. The point of this code is to limit concurrency because I may have to download thousands at a time and would like to limit it to 10 at a time using the queue. So my actual question is, how can I use gather to return all my results using this queue implementation?
Edit 2: I seemed to have found an easy solution that works using a semaphore instead of a queue with the following code adapted from this answer https://stackoverflow.com/a/61478547/4844593:
download_tasks = []
for file in files:
download_tasks.append(download_file(file=file))
async def gather_with_concurrency(n, *tasks):
semaphore = asyncio.Semaphore(n)
async def sem_task(task):
async with semaphore:
return await task
return await asyncio.gather(*(sem_task(task) for task in tasks))
df_list = await gather_with_concurrency(10, *download_tasks)
return pd.concat(df_list)