
I have the following code running in an event loop, where I'm downloading a large number of files using asyncio and restricting the number of files downloaded concurrently using asyncio.Queue:

download_tasks = asyncio.Queue()
for file in files:
    # download_file() is an async function that downloads a file from Microsoft blob storage
    # that is basically await blob.download_blob()
    download_tasks.put_nowait(asyncio.create_task(download_file(file=file)))

async def worker():
    while not download_tasks.empty():
        return await download_tasks.get_nowait() 

worker_limit = 10
# each call to download_file() returns a pandas dataframe
df_list = await asyncio.gather(*[worker() for _ in range(worker_limit)], return_exceptions=True)
df = pd.concat(df_list)  

This code seems to run fine, but I originally had the for loop defined as:

for file in files:
    # download_file() is an async function that downloads a file from Microsoft blob storage
    # that is basically await blob.download_blob()
    download_tasks.put_nowait(download_file(file=file))

With this code, the result is the same but I get the following warning:

RuntimeWarning: coroutine 'download_file' was never awaited

Looking at asyncio examples, sometimes I see create_task() used when creating a list or queue of coroutines to be run in gather and sometimes I don't. Why is it needed in my case and what's the best practice for using it?

Edit: As @user2357112supportsMonica discourteously pointed out, the return statement within worker() doesn't really make sense. The point of this code is to limit concurrency, because I may have to download thousands of files at a time and would like to limit it to 10 concurrent downloads using the queue. So my actual question is: how can I use gather to return all my results with this queue implementation?
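
(For reference, one way to keep the queue approach and still collect every result is sketched below, assuming the same imports, files and download_file() as above; nested and results are illustrative names. The idea is to queue the bare coroutines, let each worker loop until the queue is empty while buffering its own results, and flatten the per-worker lists that gather returns.)

download_tasks = asyncio.Queue()
for file in files:
    # queue the bare coroutine; it is only awaited inside a worker,
    # so at most worker_limit downloads run at any one time
    download_tasks.put_nowait(download_file(file=file))

async def worker():
    results = []
    while not download_tasks.empty():
        # no await between empty() and get_nowait(), so no other worker can grab the same item
        results.append(await download_tasks.get_nowait())
    return results

worker_limit = 10
# gather returns one list per worker; flatten before concatenating
nested = await asyncio.gather(*[worker() for _ in range(worker_limit)])
df = pd.concat([d for sublist in nested for d in sublist])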

Edit 2: I seem to have found an easy solution that works using a semaphore instead of a queue, with the following code adapted from this answer https://stackoverflow.com/a/61478547/4844593:

download_tasks = []
for file in files:
    download_tasks.append(download_file(file=file))

async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)
    async def sem_task(task):
        async with semaphore:
            return await task
    return await asyncio.gather(*(sem_task(task) for task in tasks))
    
df_list = await gather_with_concurrency(10, *download_tasks)
return pd.concat(df_list)  
ddx
  • Are you aware that a function can only `return` once? A function ends when it returns. Returning in a loop doesn't make sense. – user2357112 Oct 13 '21 at 07:18
  • The whole `worker_limit` thing doesn't make sense either. asyncio coroutines do not run in separate worker threads. They share a single thread. – user2357112 Oct 13 '21 at 07:20
  • You're launching every downloader coroutine immediately with `create_task`, then your attempt to limit concurrent downloads with the `worker` thing fails in a bunch of ways and just causes you to lose all but 10 dataframes. – user2357112 Oct 13 '21 at 07:24
  • @user2357112supportsMonica I would expect the goal is to limit the concurrency on the fetching e.g. avoid fetching 1000 files at the same time and possibly explode the remote server or trigger some sort of DOS-protection. Of course this is defeated by creating tasks. – Masklinn Oct 13 '21 at 07:25
  • Thank you @Masklinn that's exactly what the goal is, to limit concurrency. Similar to this answer: https://stackoverflow.com/a/62404509/4844593 – ddx Oct 13 '21 at 07:39
  • I have very similar code for archiving files instead of downloading files where the `return` is omitted because I don't need the results. In this example, I was attempting to limit concurrency and retrieve the results of the downloads in a list. So my question is really, how can I gather the results from asyncio.Queue? – ddx Oct 13 '21 at 07:40
  • @ddx the easiest by far is to have both an input queue and an output queue, with each worker getting a task from the input queue, putting the result on the output queue, and calling `task_done` on the input queue. In a loop. Then the "gatherer" can either get the contents of the output queue as it comes in, or `join()` on the input queue to wait for the input to get entirely processed and then get the entire thing from the output queue at once. – Masklinn Oct 13 '21 at 07:46
  • Alternatively you could have each worker buffer its results, and return everything at once when the input queue is empty. Either way you can't just slap a `yield` in the workers because async iterators are not coroutines, you can't just pass one to `gather` or `wait`. Or if you're fine with third-party packages you could have all your workers be asynchronous generators, and interleave their outputs using [`aiostream.stream.merge`](https://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.merge). – Masklinn Oct 13 '21 at 07:51
  • @Masklinn, I've found an easier solution using a semaphore instead of a queue (see Edit 2 above). The only thing I'm worried about now is that this answer says queues might have advantages over semaphores in certain situations: https://stackoverflow.com/a/48484593/4844593 I'm not sure if this situation will apply to me or not. I'll be working with a few thousand files maximum at a time, meaning my work will never be unbounded and creating a few thousand coroutines in advance might be fine? – ddx Oct 13 '21 at 08:24
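
(For reference, here is a rough sketch of the two-queue pattern Masklinn describes in the comments above, assuming the asyncio/pandas imports and download_file() from the question; download_all, in_q and out_q are illustrative names, and error handling is omitted.)

async def download_all(files, worker_limit=10):
    in_q = asyncio.Queue()
    out_q = asyncio.Queue()
    for file in files:
        in_q.put_nowait(file)

    async def worker():
        while True:
            file = await in_q.get()
            # error handling omitted for brevity
            out_q.put_nowait(await download_file(file=file))
            in_q.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(worker_limit)]
    await in_q.join()   # returns once task_done() has been called for every input item
    for w in workers:
        w.cancel()      # the workers loop forever, so cancel them once the input is drained
    df_list = []
    while not out_q.empty():
        df_list.append(out_q.get_nowait())
    return pd.concat(df_list)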

1 Answer


As "user2357112 supports Monica" notes, the original issue probably comes from the workers having a return so each worker will download one file then quit, meaning any coroutines after the first 10 will be ignored and never awaited (you can probably see that if you log information about download_tasks after the supposed completion of your processing).

create_task makes the warning go away because it immediately schedules all the downloads at the same time (defeating the attempted rate limiting / worker pool), and then the incorrect worker code simply ignores everything after the first 10 items.

Anyway, the difference between coroutines (i.e. what you get from calling a bare async function) and tasks is that tasks are independently scheduled. That is, once you've created a task it lives its life independently, and you don't have to await it if you don't want its result. That is similar to JavaScript's async functions.

Coroutines, however, don't do anything until they are awaited: they only make progress when they are explicitly polled, and that only happens by awaiting them (directly or indirectly, e.g. gather or wait will await/poll the objects they wrap).
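
A minimal illustration of that difference (the coroutine here is made up for the example):

import asyncio

async def say(msg):
    print(msg)

async def main():
    coro = say("bare coroutine")                # nothing runs yet
    task = asyncio.create_task(say("task"))     # scheduled immediately
    await asyncio.sleep(0)                      # yield to the event loop: "task" prints here
    await coro                                  # only now does "bare coroutine" print
    await task                                  # already finished; this just retrieves its result

asyncio.run(main())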

Masklinn