Description
- There are three tasks that depend on each other.
- It's communicate through queues.
Question:
- How can I improve the following code to achieve asynchronous effect?
- If I run the following code directly, will quote "asyncio.queues.QueueEmpty" error , why?
Here's my code:
#!/usr/bin/env python 3.9
# -*- coding: utf-8 -*-
import asyncio
def worker_0(data:list,queue_1:asyncio.Queue):
for dat in data:
k,v = dat['key'],dat['value']
queue_1.put_nowait({"key":k,"value":"q1_"+str(v)})
print(f"work_0 done! Production: 'key':{k}, 'value':{'q1_'+str(v)} \tq1_size: {queue_1.qsize()}")
async def worker_1(queue_1:asyncio.Queue,queue_2:asyncio.Queue):
while True:
data = queue_1.get_nowait()
k,v = data['key'],data['value']
await asyncio.sleep(1.5)
queue_2.put_nowait({"key":k,"value":"q2_"+str(v)})
await queue_2.join()
queue_1.task_done()
print(f"work_1 done! Consumption:'key': {k}, 'value': {v} \tq1_size: {queue_1.qsize()}")
print(f"work_1 done! Production: 'key': {k}, 'value': {'q2_' + str(v)} \tq2_size: {queue_2.qsize()}")
if queue_1.empty():
break
async def worker_2(queue_2:asyncio.Queue,queue_3:asyncio.Queue):
while True:
data = queue_2.get_nowait()
k, v = data['key'], data['value']
await asyncio.sleep(2)
queue_3.put_nowait({"key": k, "value": "q3_" + str(v)})
await queue_3.join()
queue_2.task_done()
print(f"work_2 done! Consumption:'key': {k}, 'value': {v} \tq2_size: {queue_2.qsize()}")
print(f"work_2 done! Production: 'key': {k}, 'value': {'q3_' + str(v)} \tq3_size: {queue_3.qsize()}")
if queue_2.empty():
break
async def worker_3(queue_3:asyncio.Queue):
while True:
data = queue_3.get_nowait()
k, v = data['key'], data['value']
await asyncio.sleep(2.5)
result = str(k) + str(v)
print(f"result:{result}")
print(f"work_3 done! Consumption:'key': {k}, 'value': {v} \tq3_size: {queue_3.qsize()}")
queue_3.task_done()
if queue_3.empty():
break
async def main():
queue_1 = asyncio.Queue()
queue_2 = asyncio.Queue()
queue_3 = asyncio.Queue()
worker_0(data,queue_1)
tasks_1 = [asyncio.create_task(worker_1(queue_1,queue_2)) for _ in range(10)]
tasks_2 = [asyncio.create_task(worker_2(queue_2,queue_3)) for _ in range(10)]
tasks_3 = [asyncio.create_task(worker_3(queue_3)) for _ in range(10)]
await asyncio.gather(*[tasks_1+tasks_2+tasks_3])
if __name__ == '__main__':
data = [{"key":i,"value":v} for i,v in enumerate(range(100)) ]
asyncio.run(main())