34

I just want to know how to clear a multiprocessing.Queue like a queue.Queue in Python:

>>> import queue
>>> queue.Queue().clear()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'Queue' object has no attribute 'clear'
>>> queue.Queue().queue.clear()
>>> import multiprocessing
>>> multiprocessing.Queue().clear()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'Queue' object has no attribute 'clear'
>>> multiprocessing.Queue().queue.clear()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'Queue' object has no attribute 'queue'
FelipeG
  • Yes, in the case of a normal queue you can clear its contents with `normal_q.queue.clear()`. However, I need to use the multiprocessing one. Thanks for the reply – FelipeG May 09 '13 at 12:31
  • This isn't an answer to your underlying question, but I feel compelled to point out that the `import` statements in your question overwrite one another. If you use the code you've written above, both `multi_q` and `normal_q` will be regular `Queue.Queue` instances. To make it work you either need to import just the modules and use fully-qualified names for the classes (e.g. `multi_q = multiprocessing.Queue()`) or use the `as` keyword to import them under different names (e.g. `from Queue import Queue as qQueue`). – Blckknght May 09 '13 at 12:55

5 Answers

70

So, I took a look at the Queue class, and you may want to try this code:

while not some_queue.empty():
    some_queue.get()  # as docs say: Remove and return an item from the queue.
Pavel Stárek
  • `while not some_queue.empty():` is more pythonic – Jonathan Dec 21 '13 at 21:42
  • This answer should've been picked because it actually provides a solution instead of just saying "there is no direct way of doing that". – exfizik Jan 19 '15 at 18:20
  • Depending on your implementation, you could have a race condition between the queue not being empty when `.empty()` is called and then it actually being empty when `.get()` is called. If this is the case, since `.get()` is blocking, your program may hang. Use `.get_nowait()` instead to prevent this race condition. – Manuel J. Diaz Feb 20 '17 at 00:45
  • A try/except will also be useful to handle the Empty exception. – Filippo Mazza Oct 16 '17 at 13:20
  • Warning: this will BLOCK if the queue has one last item in it when `not some_queue.empty()` is called and that item is removed before `some_queue.get()` is called. This is ENTIRELY possible in a multithreaded environment. My `ClearableQueue` answer avoids that situation. – Dan H May 22 '18 at 17:19
  • @DanH doesn't `get_nowait()` handle the block you mentioned? – PirateApp Feb 27 '19 at 09:55
  • @PirateApp: yes, `get_nowait()` would avoid that problem... but you'd have to catch the `Empty` exception... and, at that point, you've basically implemented my solution, given in https://stackoverflow.com/a/36018632/701435 -- why bother to check `.empty()` if you can just `get_nowait()` until `Empty`? – Dan H Jun 12 '19 at 12:33
  • I actually ran into synchronization problems with this approach and I'm using the `try` statement instead. – Daniel Möller Dec 18 '19 at 17:44
20

Ask for forgiveness rather than permission; just try to empty the queue until you get the Empty exception, then ignore that exception:

from Queue import Empty  # Python 2 name; in Python 3: from queue import Empty

def clear(q):
    try:
        while True:
            q.get_nowait()
    except Empty:
        pass

Better yet: is a built-in class missing the method you want? Subclass the built-in class, and add the method you think should be there!

from Queue import Queue, Empty  # Python 2 names; in Python 3: from queue import Queue, Empty

class ClearableQueue(Queue):

    def clear(self):
        try:
            while True:
                self.get_nowait()
        except Empty:
            pass

Your ClearableQueue class inherits all the goodness (and behavior) of the built-in Queue class, and has the method you now want.

Simply use q = ClearableQueue() in all places where you used q = Queue(), and call q.clear() when you'd like.
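
As the comments below note, in Python 3 the `Queue` module was renamed to `queue`, and `multiprocessing.Queue` is a factory function rather than a class, so a multiprocessing version must subclass `multiprocessing.queues.Queue` and supply a context. A minimal Python 3 sketch (the class name `ClearableMPQueue` is hypothetical, not part of any library):

import multiprocessing
import multiprocessing.queues
from queue import Empty

class ClearableMPQueue(multiprocessing.queues.Queue):
    """A multiprocessing queue with a clear() method."""

    def __init__(self, maxsize=0, *, ctx=None):
        # multiprocessing.queues.Queue requires an explicit context
        super().__init__(maxsize, ctx=ctx or multiprocessing.get_context())

    def clear(self):
        # Drain without blocking; stop once the queue reports Empty
        try:
            while True:
                self.get_nowait()
        except Empty:
            pass

As with the threading version, draining with get_nowait() is still racy if producers are concurrently putting items.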

Dan H
  • This code has a bug, because in Python the `get_nowait` function can throw `Empty` even if the queue is full (yes, really). – Geoffrey Irving Aug 15 '17 at 23:35
  • @GeoffreyIrving: I guess I'd consider that a bug in Python... not my example code! Seriously: are you mentioning a *documented* feature? Or are you just pointing out the race condition that *by the time this method returns, someone may have pushed something else on the queue*? In which case... well, yes, that can happen. Multithreaded processing is like that. But that still isn't a bug in my proposed implementation of `clear()`. – Dan H Aug 17 '17 at 00:08
  • I found out in practice that I need this approach rather than checking, because multithreading can change things between the `if` and the `get`. – Daniel Möller Dec 18 '19 at 17:46
  • Well, according to the docs (https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues), "multiprocessing uses the usual queue.Empty and queue.Full exceptions to signal a timeout", so I would say that assuming the `queue.Empty` exception means the queue is empty is wrong. – Stan Sep 11 '20 at 14:07
  • @Stan: I really don't expect `get_nowait()` to time out... as, by definition, there is no waiting. – Dan H Sep 26 '20 at 00:51
  • Note that the `Queue` module was renamed to `queue` in Python 3. – Irv May 05 '21 at 18:49
  • Also, subclassing `multiprocessing.Queue` won't work, because `multiprocessing.Queue` is a function, not a class, so you need to subclass `multiprocessing.queues.Queue`. – Irv May 05 '21 at 18:57
  • In our application, we put everything in the queue and then start processing. We called `JoinableQueue.close()` and then needed to do the `clear()`. The solution was to do `clear()` NOT in the process that called `close()`! – Irv May 07 '21 at 20:23
  • @Irv Good catch on the subclassing thing. I'll update my answer for Python 3 someday soon. – Dan H May 07 '21 at 21:01
  • Unfortunately I could not get it to work with subclassing multiprocessing.queues.Queue because you need to provide the ctx too, which is what the multiprocessing.Queue function does. – n4321d Feb 14 '22 at 21:10
  • An alternative solution that guarantees that *all* items have been put to the queue and consumed: https://stackoverflow.com/a/72164506/2326961 – Géry Ogam May 08 '22 at 21:55
3

There is no direct way of clearing a multiprocessing.Queue.

I believe the closest you have is close(), but that simply states that no more data will be pushed to that queue, and will close it when all data has been flushed to the pipe.
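
A minimal sketch of what close() means in practice, assuming Python 3.8+ (where put() and get() on a closed queue raise ValueError; earlier versions raised a different error):

import multiprocessing

if __name__ == '__main__':
    q = multiprocessing.Queue()
    q.put(1)
    q.close()  # no more data may be put on this queue by this process
    try:
        q.put(2)
    except ValueError:
        # Python 3.8+ raises ValueError on a closed queue
        print('queue is closed')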

pcalcao
  • Thanks for the reply. I have tried to use close(), but the problem is that I have a while loop like `while not someQueue.empty(): do something`, and I want to clear the queue from another process so that the while loop ends; but if I close the queue it raises an error. – FelipeG May 09 '13 at 12:35
0

The pipe(7) Linux manual page specifies that a pipe has a limited capacity (65,536 bytes by default) and that writing to a full pipe blocks until enough data has been read from the pipe to allow the write to complete:

I/O on pipes and FIFOs

[…]

If a process attempts to read from an empty pipe, then read(2) will block until data is available. If a process attempts to write to a full pipe (see below), then write(2) blocks until sufficient data has been read from the pipe to allow the write to complete. Nonblocking I/O is possible by using the fcntl(2) F_SETFL operation to enable the O_NONBLOCK open file status flag.

[…]

Pipe capacity

A pipe has a limited capacity. If the pipe is full, then a write(2) will block or fail, depending on whether the O_NONBLOCK flag is set (see below). Different implementations have different limits for the pipe capacity. Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.

In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, the pipe capacity is 16 pages (i.e., 65,536 bytes in a system with a page size of 4096 bytes). Since Linux 2.6.35, the default pipe capacity is 16 pages, but the capacity can be queried and set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations. See fcntl(2) for more information.
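
On Linux, this capacity can be checked directly from Python. A minimal sketch, assuming Python 3.10+ (where the fcntl module exposes F_GETPIPE_SZ) on a Linux system:

import fcntl
import os

r, w = os.pipe()
# Query the pipe capacity (typically 65536 bytes on modern Linux)
print(fcntl.fcntl(w, fcntl.F_GETPIPE_SZ))
os.close(r)
os.close(w)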

That is why the multiprocessing Python library documentation recommends making a consumer process empty each Queue object with Queue.get calls before the feeder threads are joined in the producer processes (implicitly by garbage collection or explicitly by Queue.join_thread calls):

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

An example which will deadlock is the following:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

A fix here would be to swap the last two lines (or simply remove the p.join() line).
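
Spelled out, the fixed version of the documentation's example reads:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()  # drain the queue first...
    p.join()           # ...then join: the feeder thread can flush, no deadlock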

In some applications, a consumer process may not know how many items have been added to a queue by producer processes. In this situation, a reliable way to empty the queue is to make each producer process add a sentinel item when it is done and make the consumer process remove items (regular and sentinel items) until it has removed as many sentinel items as there are producer processes:

import multiprocessing

def f(q, e):
    while True:
        q.put('X' * 1000000)  # block the feeder thread (size > pipe capacity)
        if e.is_set():
            break
    q.put(None)  # add a sentinel item

if __name__ == '__main__':
    start_count = 5
    stop_count = 0
    q = multiprocessing.Queue()
    e = multiprocessing.Event()
    for _ in range(start_count):
        multiprocessing.Process(target=f, args=(q, e)).start()
    e.set()  # stop producer processes
    while stop_count < start_count:
        if q.get() is None:  # empty the queue
            stop_count += 1  # count the sentinel items removed

This solution uses blocking Queue.get calls to empty the queue. This guarantees that all items have been added to the queue and removed.

@DanH’s solution uses non-blocking Queue.get_nowait calls to empty the queue. The problem with that solution is that producer processes can still add items to the queue after the consumer process has emptied the queue, which will create a deadlock (the consumer process will wait for the producer processes to terminate, each producer process will wait for its feeder thread to terminate, the feeder thread of each producer process will wait for the consumer process to remove the items added to the queue):

import multiprocessing.queues

def f(q):
    q.put('X' * 1000000)  # block the feeder thread (size > pipe capacity)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=f, args=(q,))
    p.start()
    try:
        while True:
            q.get_nowait()
    except multiprocessing.queues.Empty:
        pass  # reached before the producer process adds the item to the queue
    p.join()  # deadlock

Or newly created producer processes can fail to deserialise the Process object of the consumer process if the queue's synchronisation resources (which come with it as an attribute) have already been garbage collected, raising a FileNotFoundError:

import multiprocessing.queues

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    multiprocessing.Process(target=f, args=(q,)).start()
    try:
        while True:
            q.get_nowait()
    except multiprocessing.queues.Empty:
        pass  # reached before the producer process deserialises the Process

Standard error:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
Géry Ogam
-1

I am a newbie, so don't be angry at me, but:

Why not just rebind the variable to a new Queue()?

import multiprocessing as mp

q = mp.Queue()
chunk = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

for i in chunk:
    q.put(i)
print(q.empty())

q = mp.Queue()
print(q.empty())

My output:

False
True

I'm just self-educating right now, so if I'm wrong, feel free to point it out

  • Here's the reason that won't work: 1) If you are multiprocessing, that implies that one (or more) processes are adding stuff to the queue and one (or more) processes are removing stuff from the queue. 2) Your code DID NOT clear the first instance of Queue. Rather, in the process that executed your code, you simply stopped "paying attention" to the first Queue, created a new one, and assigned it to the same variable. 3) Thus, any process which had a pointer to the first Queue still sees it as non-empty. – Dan H May 06 '21 at 20:40
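
A minimal sketch illustrating that comment (the names producer and old_q are illustrative): rebinding q in the parent process has no effect on the queue object the child process holds.

import multiprocessing
import time

def producer(q):
    # The child keeps a reference to the ORIGINAL queue object
    for i in range(3):
        q.put(i)
        time.sleep(0.1)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()
    old_q = q
    q = multiprocessing.Queue()  # rebinding: the parent now ignores old_q
    p.join()
    time.sleep(0.1)              # give the feeder/pipe a moment to settle
    print(q.empty())             # True: nothing was ever put on the new queue
    print(old_q.empty())         # False: the child filled the original queue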