0

Description

  • There are three tasks that depend on each other.
  • It's communicate through queues.

Question:

  • How can I improve the following code to achieve asynchronous effect?
  • If I run the following code directly, will quote "asyncio.queues.QueueEmpty" error , why?

Here's my code:

#!/usr/bin/env python 3.9
# -*- coding: utf-8 -*-

import asyncio

def worker_0(data:list,queue_1:asyncio.Queue):
    for dat in data:
        k,v = dat['key'],dat['value']
        queue_1.put_nowait({"key":k,"value":"q1_"+str(v)})
        print(f"work_0 done!  Production:  'key':{k}, 'value':{'q1_'+str(v)}     \tq1_size: {queue_1.qsize()}")

async  def worker_1(queue_1:asyncio.Queue,queue_2:asyncio.Queue):
    while True:
        data = queue_1.get_nowait()
        k,v = data['key'],data['value']
        await asyncio.sleep(1.5)
        queue_2.put_nowait({"key":k,"value":"q2_"+str(v)})
        await queue_2.join()
        queue_1.task_done()
        print(f"work_1 done!  Consumption:'key': {k}, 'value': {v}               \tq1_size: {queue_1.qsize()}")
        print(f"work_1 done!  Production: 'key': {k}, 'value': {'q2_' + str(v)}  \tq2_size: {queue_2.qsize()}")
        if queue_1.empty():
            break

async  def worker_2(queue_2:asyncio.Queue,queue_3:asyncio.Queue):
    while True:
        data = queue_2.get_nowait()
        k, v = data['key'], data['value']
        await asyncio.sleep(2)
        queue_3.put_nowait({"key": k, "value": "q3_" + str(v)})
        await queue_3.join()
        queue_2.task_done()

        print(f"work_2 done!  Consumption:'key': {k}, 'value':  {v}              \tq2_size: {queue_2.qsize()}")
        print(f"work_2 done!  Production: 'key': {k}, 'value':  {'q3_' + str(v)} \tq3_size: {queue_3.qsize()}")
        if queue_2.empty():
            break

async def worker_3(queue_3:asyncio.Queue):
    while True:
        data = queue_3.get_nowait()
        k, v = data['key'], data['value']
        await  asyncio.sleep(2.5)
        result = str(k) + str(v)
        print(f"result:{result}")
        print(f"work_3 done!  Consumption:'key': {k}, 'value': {v}               \tq3_size: {queue_3.qsize()}")
        queue_3.task_done()
        if queue_3.empty():
            break

async  def main():
    queue_1 = asyncio.Queue()
    queue_2 = asyncio.Queue()
    queue_3 = asyncio.Queue()
    worker_0(data,queue_1)
    tasks_1 = [asyncio.create_task(worker_1(queue_1,queue_2)) for _ in range(10)]
    tasks_2 = [asyncio.create_task(worker_2(queue_2,queue_3)) for _ in range(10)]
    tasks_3 = [asyncio.create_task(worker_3(queue_3)) for _ in range(10)]

    await asyncio.gather(*[tasks_1+tasks_2+tasks_3])


if __name__ == '__main__':
    data = [{"key":i,"value":v} for i,v in enumerate(range(100)) ]
    asyncio.run(main())
CG Zhang
  • 186
  • 1
  • 6
  • You are using ```get_nowait``` and, if fast enough (likely) the workers will check the queues before there is anything there to get. Better to await a get. – jwal Oct 12 '21 at 16:41
  • Thanks a lot for your suggestion, I used the 'await queue.get()' to make some improvements to solve the performance of the program, but I am not sure if this is thread safe,I've posted the code below. – CG Zhang Oct 22 '21 at 08:20
  • Good job. asyncio queues are not thread safe so your use of them won't alter this. asyncio is concurrency not parallelism see https://stackoverflow.com/a/1050257/6242321 (applies to threading in python as well however that is a different thing). – jwal Oct 22 '21 at 16:37

1 Answers1

0

I used the 'await queue.get()' to make some improvements to solve the performance of the program.

import asyncio
import datetime
import random

def worker_0(data:list,queue_1:asyncio.Queue):
    for dat in data:
        k,v = dat['key'],dat['value']
        queue_1.put_nowait({"key":k,"value":"q1_"+str(v)})
        print(f"q1_size: {queue_1.qsize()}\t\twork_0 produced:'key':{k}, 'value':{'q1_'+str(v)}")


async  def worker_1(queue_1:asyncio.Queue,queue_2:asyncio.Queue,queue_3:asyncio.Queue):
    while True:
        data = await queue_1.get()
        # data = queue_1.get_nowait()
        k,v = data['key'],data['value']
        await asyncio.sleep(round(random.uniform(1,2),1))
        # do something
        queue_2.put_nowait({"key":k,"value":"q2_"+str(v)})
        print(f"q2_size: {queue_2.qsize()}\t\twork_1 produced:'key':{k},'value':{'q2_' + str(v)}")
        if queue_1.empty():
            break
        queue_1.task_done()


async  def worker_2(queue_2:asyncio.Queue,queue_3:asyncio.Queue):
    while True:
        # data = queue_2.get_nowait()
        data = await queue_2.get()
        k, v = data['key'], data['value']
        await asyncio.sleep(round(random.uniform(1,2),1))
        # do something
        queue_3.put_nowait({"key": k, "value": "q3_" + str(v)})
        print(f"q3_size: {queue_3.qsize()}\t\twork_2 produced:'key':{k},'value':{'q3_' + str(v)}")
        if queue_2.empty():
            break
        queue_2.task_done()


async def worker_3(queue_3:asyncio.Queue):
    while True:
        data = await queue_3.get()
        # data = queue_3.get_nowait()
        k, v = data['key'], data['value']
        await  asyncio.sleep(round(random.uniform(1,2),1))
        # do something
        # print(f"{datetime.datetime.now()} result:{str(k)}_{str(v)}")
        print(f"q3_size: {queue_3.qsize()}\t\twork_3 consumed:'key':{k},'value':{v}")
        if queue_3.empty():
            break
        queue_3.task_done()


async  def main():
    queue_1 = asyncio.Queue()
    queue_2 = asyncio.Queue()
    queue_3 = asyncio.Queue()
    worker_0(data,queue_1)
    tasks_1 = [asyncio.create_task(worker_1(queue_1,queue_2,queue_3)) for _ in range(10)]
    tasks_2 = [asyncio.create_task(worker_2(queue_2, queue_3)) for _ in range(10)]
    tasks_3 = [asyncio.create_task(worker_3(queue_3)) for _ in range(10)]
    await asyncio.gather(*tasks_1)
    await asyncio.gather(*tasks_2)
    await asyncio.gather(*tasks_3)


if __name__ == '__main__':
    data = [{"key":i,"value":v} for i,v in enumerate(range(100)) ]
    s = datetime.datetime.now()
    asyncio.run(main())
    print(f"Total spend time: {datetime.datetime.now()-s}")
CG Zhang
  • 186
  • 1
  • 6