
I have a simple task like this:

import multiprocessing
from queue import Empty  # in Python 2 this was: from Queue import Empty

def worker(queue):
    while True:
        try:
            _ = queue.get_nowait()
        except Empty:
            break

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # queue = multiprocessing.Queue()
    queue = manager.Queue()

    for i in range(5):
        queue.put(i)

    processes = []

    for i in range(2):
        proc = multiprocessing.Process(target=worker, args=(queue,))
        processes.append(proc)
        proc.start()

    for proc in processes:
        proc.join()

It seems that multiprocessing.Queue can do all the work I need, but on the other hand I see many examples of Manager().Queue() and can't understand what I really need. It looks like Manager().Queue() uses some sort of proxy objects, but I don't understand their purpose, because multiprocessing.Queue() does the same work without any proxy objects.
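A quick way to see the difference is to inspect the two objects' types (a sketch; the exact proxy class name may vary across Python versions):

```python
import multiprocessing

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    proxy_queue = manager.Queue()          # a proxy that forwards calls to the manager process
    plain_queue = multiprocessing.Queue()  # a pipe-and-lock based queue object

    print(type(proxy_queue).__name__)  # a proxy class, e.g. "AutoProxy[Queue]"
    print(type(plain_queue).__name__)  # "Queue" (multiprocessing.queues.Queue)
    manager.shutdown()
```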

So, my questions are:

1) What is the real difference between multiprocessing.Queue and the object returned by multiprocessing.Manager().Queue()?

2) Which one should I use?

Vadim Kotov
novicef

2 Answers


Though my understanding of this subject is limited, from what I have done I can tell there is one main difference between multiprocessing.Queue() and multiprocessing.Manager().Queue():

  • multiprocessing.Queue() is an object, whereas multiprocessing.Manager().Queue() is an address (proxy) pointing to a shared queue managed by the multiprocessing.Manager() object.
  • Therefore you can't pass a normal multiprocessing.Queue() object to Pool methods, because it can't be pickled.
  • Moreover, the Python docs tell us to pay particular attention when using multiprocessing.Queue() because it can have undesired effects:

Note When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising Queue.Empty. If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.

Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue.

There is a workaround to use multiprocessing.Queue() with Pool by setting the queue as a global variable and setting it for all processes at initialization:

import multiprocessing
from multiprocessing import Pool

queue = multiprocessing.Queue()

def initialize_shared(q):
    global queue
    queue = q

# nb_process is the desired number of worker processes
pool = Pool(nb_process, initializer=initialize_shared, initargs=(queue,))

This will create pool processes with correctly shared queues, but we can argue that multiprocessing.Queue() objects were not created for this use.

On the other hand, manager.Queue() can be shared between pool subprocesses by passing it as a normal argument of a function.

In my opinion, using multiprocessing.Manager().Queue() is fine in every case and less troublesome. There might be some drawbacks to using a manager, but I'm not aware of any.

michael
  • From what I have been able to glean, the downside of managed items (queues, values, etc.) is that they are slower. Regular multiprocessing items are shared, so access is quick, although it needs to be protected by a lock. To the best of my knowledge, the multiprocessing.Manager equivalents are proxies to the actual item handled in its own process. So while the manager protects against race conditions, it also means a lot of inter-process calling, which is expensive. So if there will be a lot of communication with the shared item, unmanaged may be faster (although more dangerous). – Avraham Jun 05 '19 at 21:19
  • With the workaround mentioned, the "queue" will be initialised as a new queue object for each process. To have the same queue for all processes, use `multiprocessing.Manager().Queue()`. `global queue` = global only for the current process – trogne Sep 24 '20 at 16:04
  • 1
    @Avraham from my limited practical experience, it's quite the other way around -- the unmanaged queues were slow to the point of being prohibitive, when I was developing a multiprocess video processing software. Using a managed Queue solved all the issues. Relevant: https://stackoverflow.com/questions/47085458/ – Vinícius M Jan 09 '21 at 19:39
  • @trogne the workaround is probably limited to UNIX systems due to how process forking happens on those systems, where parent memory is shared with children by the kernel. – RomuloPBenedetti Jan 28 '22 at 10:16

I have recently come across a problem with Manager().Queue(), where the SyncManager object - returned by multiprocessing.Manager() - seemingly dies, and the queues it manages block forever (even with *_nowait()).

I am not sure of the reason, or whether the SyncManager really dies. The only clue I have is that I call multiprocessing.Manager() from a class instance which has a __del__() method that logs the process it is called from, and I can see this __del__() being called from the SyncManager process.

This means that my object has a copy in the SyncManager process, and that copy is garbage collected there. This could mean that only my object was deleted and the SyncManager is fine, but I do see that the corresponding queues becoming unresponsive correlates with the __del__() call in the SyncManager process.

I have no idea how my object ends up in the SyncManager process. I usually pump out 50-200 managers - some with overlapping lifetimes, others not - until I see this problem. For objects that still exist when the interpreter exits, __del__() is not called, so I usually do not see the SyncManager objects dying via this log from __del__(), only on occasion. Probably when there is a problem, the SyncManager object first disposes of its objects, and only then does the interpreter exit, which is why I see the __del__() call on occasion.

I did see my queue become unresponsive even in cases where I did not see __del__() being called from the SyncManager.

I have also seen the SyncManager "die" without causing further problems.

By "unresponsive" I mean:

queue.get(timeout=1)
queue.put(timeout=1)

never return.

queue.get_nowait(timeout=1)
queue.put_nowait(timeout=1)

never return.

This became a bit more involved than I originally intended, but I left the details in, just in case they help someone.

I used Manager().Queue() for a long time before without any problems. I suspect that either instantiating a lot of manager objects caused the problem, or that instantiating a lot of managers caused a problem that had always existed to surface.

I use Python 3.6.5.

Zoltan K.
  • 2
    I'm still learning all the intricacies of multiprocessing so I'm not 100% sure, but the [issues related to `__del__` could be similar to those explained here](https://codewithoutrules.com/2017/08/16/concurrency-python/). Supposedly they fixed some of this in `3.7` according to that post. – TrinitronX Aug 02 '20 at 10:44