
I am planning an asyncio Queue based producer-consumer implementation for processing real-time data, where sending out data in the correct time order is vital. Here is the code snippet:

async def produce(Q, n_jobs):
    for i in range(n_jobs):
        
        print(f"Producing :{i}")
        await Q.put(i)


async def consume(Q):
    while True:
        n = await Q.get()
        
        print(f"Consumed :{n}")
       
        x = do_sometask_and_return_the_result(n)
        print(f"Finished :{n} and Result: {x}")


async def main(loop):
    Q = asyncio.Queue(loop=loop, maxsize=3)
    await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
    print("Done")

Here the producer produces data and puts it into the asyncio Queue, and multiple consumers consume and process it. Looking at the output, the order is maintained when printing "Consumed :{n}" (1, 2, 3, 4, ... and so on), which is completely fine. But since do_sometask_and_return_the_result(n) takes a variable amount of time to return its result, the order is not maintained in the next print, "Finished :{n}" (e.g. 2, 1, 4, 3, 5, ...).

Is there any way to synchronize this so that the results are printed in order? I want to see sequential prints of n (1, 2, 3, 4, ...) even after do_sometask_and_return_the_result(n).
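The behaviour can be reproduced with a minimal, self-contained sketch; `asyncio.sleep` with a random delay stands in for the variable-time task, and the `None` sentinels (not in the snippet above) are just a way to let the workers exit cleanly:

```python
import asyncio
import random

completion_order = []

async def worker(q):
    while True:
        n = await q.get()
        if n is None:  # sentinel: no more work
            return
        await asyncio.sleep(random.random() * 0.01)  # variable-time task
        completion_order.append(n)

async def main():
    q = asyncio.Queue(maxsize=3)

    async def produce():
        for i in range(10):
            await q.put(i)
        for _ in range(3):       # one sentinel per worker
            await q.put(None)

    await asyncio.gather(produce(), worker(q), worker(q), worker(q))

asyncio.run(main())
print(completion_order)  # often not [0, 1, ..., 9]
```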

Madhu Soodhan
1 Answer


You could use a priority queue (via the Python heapq module) to reorder your jobs after they complete. Something like this:

# add these imports and variables at module/global scope
from heapq import heappush, heappop

priority_queue = []   # min-heap of job ids whose results are ready
current_job_id = 0    # ids start at 0, matching range(n_jobs) in produce()
job_id_dict = {}      # job id -> result

async def produce(Q, n_jobs):
    # same as above

async def consume(Q):
    while True:
        n = await Q.get()

        print(f"Consumed :{n}")

        x = do_sometask_and_return_the_result(n)
        await process_result(n, x)


async def process_result(n, x):
    global current_job_id  # reassigned below
    heappush(priority_queue, n)
    job_id_dict[n] = x
    # print every result whose turn has come; checking that the heap is
    # non-empty avoids an IndexError once the last job has been popped
    while priority_queue and current_job_id == priority_queue[0]:
        job_id = heappop(priority_queue)
        print(f"Finished :{job_id} and Result: {job_id_dict.pop(job_id)}")
        current_job_id += 1


async def main(loop):
    # note: the loop argument to asyncio.Queue was removed in Python 3.10
    Q = asyncio.Queue(loop=loop, maxsize=3)
    await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
    print("Done")

For more information on the heapq module: https://docs.python.org/3/library/heapq.html
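Putting the pieces together, here is a runnable sketch of this approach. The simulated work function, the random delays, and the `None` sentinels used to shut the consumers down are assumptions added for the demo, not part of the original code:

```python
import asyncio
import heapq
import random

priority_queue = []   # min-heap of finished-but-not-yet-printed job ids
results = {}          # job id -> result
next_job_id = 0       # the next id allowed to be printed
finished_order = []   # collected output order, for demonstration

def do_sometask_and_return_the_result(n):
    return n * n  # stand-in for the real work

async def produce(q, n_jobs, n_consumers):
    for i in range(n_jobs):
        await q.put(i)
    for _ in range(n_consumers):  # one sentinel per consumer
        await q.put(None)

async def consume(q):
    global next_job_id
    while True:
        n = await q.get()
        if n is None:
            return
        await asyncio.sleep(random.random() * 0.01)  # variable task time
        x = do_sometask_and_return_the_result(n)
        heapq.heappush(priority_queue, n)
        results[n] = x
        # flush every job whose turn has come; no await in this loop, so
        # the single-threaded event loop keeps it race-free
        while priority_queue and priority_queue[0] == next_job_id:
            job_id = heapq.heappop(priority_queue)
            finished_order.append(job_id)
            print(f"Finished :{job_id} and Result: {results.pop(job_id)}")
            next_job_id += 1

async def main():
    q = asyncio.Queue(maxsize=3)
    await asyncio.gather(produce(q, 10, 3), consume(q), consume(q), consume(q))

asyncio.run(main())
print(finished_order)  # ids come out in order 0..9
```

The key point is that the flush loop contains no `await`, so while one consumer is draining the heap no other consumer can interleave with it.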

Andrew-Harelson
  • Thank you! great suggestion. Needed some tweaks, but it worked :) – Madhu Soodhan Oct 05 '21 at 04:50
  • Hi, interested in the solution, could the 'process_result' be the writing of data to the same file for all consumers? would the consumers wait that current consumer has finished writing its chunk before another (in correct order) writes its own chunk? – pierre_j Oct 05 '21 at 13:50
  • Likely more detailed, I presented my own questions on a similar need here: https://stackoverflow.com/questions/69450718/asyncio-can-a-task-only-start-when-previous-task-reach-a-pre-defined-stage – pierre_j Oct 05 '21 at 13:50