I am currently building a project that requires making multiple requests to various endpoints. I am wrapping these requests in `aiohttp` to allow for async.
The problem:
I have three queues: `queue1`, `queue2`, and `queue3`. Additionally, I have three worker functions (`worker1`, `worker2`, `worker3`), each associated with its respective queue. The first queue is populated immediately with a list of IDs that is known prior to running. When a request is finished and the data is committed to a database, the worker passes the ID to `queue2`. A `worker2` will take this ID and request more data, and from that data it will begin to generate a list of IDs (different from the IDs in `queue1`/`queue2`). `worker2` will put these IDs in `queue3`. Finally, `worker3` will grab an ID from `queue3` and request more data before committing it to a database.
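For concreteness, the middle stage looks roughly like this (a simplified sketch; `fetch_data` and `extract_ids` are placeholders for my actual aiohttp request and parsing logic):

```python
async def worker2(session, queue2, queue3):
    while True:
        id_ = await queue2.get()
        data = await fetch_data(session, id_)  # placeholder: the aiohttp request
        for new_id in extract_ids(data):       # placeholder: derive the new set of IDs
            await queue3.put(new_id)
        queue2.task_done()
```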
The issue arises from the fact that `queue.join()` is a blocking call. Each worker is tied to a separate queue, so the `join()` for `queue1` will block until that queue is finished. This is fine, but it also defeats the purpose of using async. Without using `join()`, the program is unable to detect when the queues are completely empty. The other issue is that there may be silent errors when one of the queues is empty but there is still data that hasn't been added to it yet.
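Concretely, the only `join()`-based shutdown I can see is a strictly sequential drain like this (a sketch; it assumes every worker calls `task_done()` after finishing an item):

```python
await queue1.join()   # blocks until every initial ID has been processed
await queue2.join()   # only meaningful once worker1 has stopped producing
await queue3.join()
for task in tasks:
    task.cancel()     # the workers loop forever, so they must be cancelled
```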
The basic code outline is as follows:
```python
import asyncio
import aiohttp

async def main():
    queue1 = asyncio.Queue()
    queue2 = asyncio.Queue()
    queue3 = asyncio.Queue()
    tasks = []

    async with aiohttp.ClientSession() as session:
        for _ in range(3):
            tasks.append(asyncio.create_task(worker1(session, queue1, queue2)))
        for _ in range(3):
            tasks.append(asyncio.create_task(worker2(session, queue2, queue3)))
        for _ in range(10):
            tasks.append(asyncio.create_task(worker3(session, queue3)))

        # IDs is the list known prior to running
        for i in IDs:
            queue1.put_nowait(i)

        await asyncio.gather(*tasks)

asyncio.run(main())
```
The worker functions sit in an infinite loop waiting for items to enter their queue, so once all the data has been processed nothing causes them to exit and the program hangs.
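Every worker has roughly this shape (a sketch; `commit_to_db` is a placeholder for my real request-and-commit logic):

```python
async def worker3(session, queue3):
    while True:                           # no exit condition
        id_ = await queue3.get()          # waits forever once the pipeline is drained
        await commit_to_db(session, id_)  # placeholder: request + database commit
        queue3.task_done()
```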
Is there a way to effectively manage the workers so that the program ends properly?