
Problem: I'd like to use aiohttp producers to download a large number of files, with aiofiles consumers pulling files from the pool.

Here's my draft structure, any thoughts please?

async def producer(session, url, queue):  # downloader
    async with session.get(url) as r:
        # r.read() is a coroutine, so it must be awaited before queueing the bytes
        data = await r.read()
    await queue.put((url, data))

async def consumer(queue):  # writer
    while True:
        url, data = await queue.get()
        # write `data` to disk with aiofiles here
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # start the consumers first so they can drain the queue while downloads run
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(number_of_consumers)]

    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(producer(session, url, queue))
                 for url in urls]
        # keep the session open until every producer has finished
        await asyncio.gather(*tasks, return_exceptions=True)
        # wait until every downloaded item has been written out
        await queue.join()

    # the consumers loop forever, so cancel them once the queue is drained
    for c in consumers:
        c.cancel()
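As a side note on limiting the pool size (the "can producer wait if pool is full?" question): `asyncio.Queue(maxsize=...)` makes `queue.put()` suspend whenever the queue is full, which gives producers natural backpressure. A minimal self-contained sketch with plain asyncio, no network I/O — the payload strings and worker counts are made up for illustration:

```python
import asyncio

async def produce(queue, n_items):
    for i in range(n_items):
        # put() suspends here whenever the queue already holds `maxsize` items,
        # so a fast producer automatically waits for slow consumers
        await queue.put(f"payload-{i}")

async def consume(queue, results):
    while True:
        item = await queue.get()
        await asyncio.sleep(0)        # stand-in for the aiofiles write
        results.append(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=3)  # at most 3 undelivered items at a time
    results = []
    consumers = [asyncio.create_task(consume(queue, results)) for _ in range(2)]
    await produce(queue, 10)
    await queue.join()                # every item fetched *and* task_done()'d
    for c in consumers:
        c.cancel()                    # consumers loop forever; stop them explicitly
    return results

print(len(asyncio.run(main())))       # prints 10
```

With `maxsize=3` the producer never gets more than three writes ahead of the consumers, regardless of how many URLs there are.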

  • Could you clarify your problem? The point of the example is that it puts items into a queue, which the consumer later removes in another task. If you use aiohttp to put items in a queue instead of a for loop, isn't that what you want? – Paul Cornelius Feb 18 '23 at 20:13
  • Hi Paul, thanks for the reply. Yes, but then where should I put aiofiles as the consumer, especially when the number of consumers may be much smaller than the number of producers? If I put async file writers right after session.get I could save the response directly, and then I wouldn't need a pool, right? But I couldn't limit how many consumers there are that way. – Round Feb 19 '23 at 00:33
  • Sorry, I'm just not understanding the problem. Where a consumer should put the files seems to have nothing to do with how many consumers there are. You already show code that limits the number of consumers, so I don't know why you think this is a problem. I don't know what "pool" you are talking about, since it doesn't seem like you're referring to either type of Pool in the standard Python library (ProcessPool or ThreadPool). – Paul Cornelius Feb 19 '23 at 02:11
  • Hi @PaulCornelius, I really appreciate your following up. Something came to mind, so I removed the previous samples and quickly outlined my own concept in the post. I'm going to give it a go. The other thing I'm still not sure about is how to control the pool size — can a producer wait if the pool is full? But I can sort that out later if the main structure works. – Round Feb 19 '23 at 10:03
  • You can control the size of a Queue when you construct it; see the docs for asyncio.Queue. – Paul Cornelius Feb 19 '23 at 21:41
  • @PaulCornelius Hi, I find my await queue.get() gets stuck if the queue becomes full. I can't find much info about this topic — do you know anything? – Round Feb 19 '23 at 22:57
  • That doesn't sound right at all. You shouldn't get stuck when you *get* an item from a queue that's full. You would get stuck, of course, when you try to *put* an item onto a queue that's full. Unless you show us the actual code that gets stuck, I don't think there is any chance you will get any help. Perhaps you should ask another question about it. – Paul Cornelius Feb 20 '23 at 13:05
  • @PaulCornelius Yes, I think I messed up the order in which the producer and consumer coroutines were started. I finally got it working as expected, although I still can't see why the consumers need to be cancelled at the end, and why only the producers actually need to be explicitly started. Thank you. – Round Feb 24 '23 at 11:40
  • I'm glad you got it working. But once again your comment is confusing. Surely you're not saying that the consumers start all by themselves. In your code you explicitly start both producers and consumers, and you don't cancel anything. – Paul Cornelius Feb 25 '23 at 01:14
  • @PaulCornelius Sorry, I just didn't update the code in the post. I gradually got the idea: it's like in game programming, where everything takes place (without blocking) inside one big loop, while (!end_of_game) {}. – Round Mar 03 '23 at 02:03
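On the start-order confusion raised in the comments: `queue.join()` only completes once a consumer has called `task_done()` for every item, so joining before any consumer is running deadlocks. A small self-contained demonstration (the function name and item count are made up; the timeout stands in for "waits forever"):

```python
import asyncio

async def fill_then_join(start_consumer_first):
    queue = asyncio.Queue()

    async def consumer():
        while True:
            await queue.get()
            queue.task_done()

    consumer_task = None
    if start_consumer_first:
        consumer_task = asyncio.create_task(consumer())

    for i in range(5):
        await queue.put(i)

    try:
        # join() only returns once task_done() has been called for every item
        await asyncio.wait_for(queue.join(), timeout=0.1)
        result = "drained"
    except asyncio.TimeoutError:
        result = "stuck"   # no consumer ever calls task_done()

    if consumer_task:
        consumer_task.cancel()
    return result

print(asyncio.run(fill_then_join(True)))   # prints drained
print(asyncio.run(fill_then_join(False)))  # prints stuck
```

This is also why the consumers must be cancelled explicitly at the end: their `while True` loop never exits on its own, so after `queue.join()` confirms all work is done, cancelling is the normal way to shut them down.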

0 Answers