12

Errors in my producer/consumer coroutines may leave items and unfinished tasks in the asyncio queue. If an error is encountered, I want to simply stop the loop, cancel the pending tasks and clear the queue. While I can finish the first two things, I cannot find an easy way to clear the queue. After reading this answer, I come up with three methods:

import asyncio

q=asyncio.Queue()
for i in range(5):
    q.put_nowait(i)
q.get_nowait()

loop=asyncio.get_event_loop()

#this will raise an error if q cannot join
loop.run_until_complete(asyncio.wait_for(q.join(),1))

#method 1
q._queue.clear()
q._finished.set()
q._unfinished_tasks = 0

#method 2
for _ in range(q.qsize()):
    q.get_nowait()
for _ in range(q._unfinished_tasks):
    q.task_done()

#method 3
del q
q=asyncio.Queue()

So which one is better?

lovetl2002
  • 910
  • 9
  • 23

2 Answers2

6

Avoid using the "private" methods. From the documentation of Queue.task_done:

For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.


def empty_queue(q: asyncio.Queue):
  while not q.empty():
    # Depending on your program, you may want to
    # catch QueueEmpty
    q.get_nowait()
    q.task_done()

coxley
  • 339
  • 2
  • 12
-3

I did this in various DBussy example scripts:

for task in asyncio.Task.all_tasks(loop) :
    task.cancel()
    try :
        loop.run_until_complete(task)
    except asyncio.CancelledError :
        pass
    #end try
#end for
Lawrence D'Oliveiro
  • 2,768
  • 1
  • 15
  • 13