
As you can tell from the title, I'm trying to use a PriorityQueue with multiprocessing. More precisely, I want to make a shared PriorityQueue. I wrote some code, and it doesn't run as I expected.

Look at the code:

import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue


def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)

    print "worker", queue.qsize()


pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

Got the following output:

worker 100
main 0

What's happening and how to do what I want the right way? Thank you.

vortexxx192
    `multiprocessing` can only transfer `pickle`-able objects which a `PriorityQueue` is not. See the answer of unutbu and rocksportrocker here: http://stackoverflow.com/questions/8804830/python-multiprocessing-pickling-error – j-i-l Aug 15 '14 at 11:30
  • @jojo On Unix, `PriorityQueue` doesn't get pickled in this case; it's inherited via `fork`. It still won't work, though, because `PriorityQueue` is not meant to be shared between processes. The worker and parent end up with separate copies of the object, so changing it in one process has no effect on the other. – dano Aug 15 '14 at 15:10

1 Answer


The problem isn't that the queue is unpicklable in this case: on a Unix-like platform it can be passed to the child without pickling at all (on Windows, I think you would get a pickling error here, though). The root problem is that you're not using a process-safe queue. The only queues that can be used between processes are the Queue objects that live inside the multiprocessing module, and unfortunately there is no PriorityQueue implementation among them. However, you can easily create one by registering PriorityQueue with a multiprocessing Manager class, like this:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue


class MyManager(SyncManager):
    pass
MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue

def Manager():
    m = MyManager()
    m.start()
    return m

def worker(queue):
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()


m = Manager()
pr_queue = m.PriorityQueue()  # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

Output:

worker 100
main 100

Note that this probably won't perform quite as well as a standard multiprocessing.Queue subclass would. The Manager-based PriorityQueue is implemented by starting a Manager server process that holds an actual, regular PriorityQueue, and handing your main and worker processes Proxy objects that use IPC to read from and write to the queue living in the server process. A regular multiprocessing.Queue just writes/reads its data to/from a Pipe directly. If that's a concern, you could try implementing your own multiprocessing.PriorityQueue by subclassing or delegating from multiprocessing.Queue. It may not be worth the effort, though.
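If you do go the delegation route, one minimal sketch (my own, not part of this answer, and written for Python 3 rather than the Python 2 used above) is to use a plain multiprocessing.Queue purely as transport and re-order arrived items on the consumer side with heapq. The caveat: priority order is only enforced among items that have already reached the consumer, so this approximates, rather than replicates, a true shared PriorityQueue:

```python
import heapq
import multiprocessing
from queue import Empty  # Python 3 name of the old Queue module


class ConsumerOrderedQueue(object):
    """Transport via multiprocessing.Queue; priority order applied consumer-side."""

    def __init__(self):
        self._transport = multiprocessing.Queue()
        self._heap = []  # consumer-local buffer, kept in heap order

    def put(self, item):
        self._transport.put(item)

    def get(self):
        if not self._heap:
            # Block until at least one item arrives.
            heapq.heappush(self._heap, self._transport.get())
        # Drain everything else that has already arrived, then
        # return the smallest (highest-priority) item seen so far.
        while True:
            try:
                heapq.heappush(self._heap, self._transport.get_nowait())
            except Empty:
                break
        return heapq.heappop(self._heap)
```

Because ordering lives in the consumer-local heap, this only behaves sensibly with a single consumer process; multiple consumers would each see their own partial ordering.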

dano
    Thanks a lot. Finally, I implemented my own "priority queue" class with some number of multiprocessing.Queue's inside. Kind of layered priorities. – vortexxx192 Aug 15 '14 at 15:21
    Implementing your own `multiprocessing.PriorityQueue` is not entirely straightforward, see here for some thoughts: https://github.com/python/cpython/issues/82440#issuecomment-1108773675 – dlukes Apr 25 '22 at 16:15
  • It would be great if the performance concern were quantified by empirically comparing the Manager-registered queue with a plain multiprocessing.Queue. Otherwise everybody needs to try it out themselves before knowing whether this nice workaround is an issue for them. – Radio Controlled Dec 08 '22 at 07:37
  • Also, is there some indication as to how many separate queues would be feasible/efficient if one were to try the list-of-queues approach mentioned by @vortexxx192? Tens-of-thousands of queues still reasonable? – Radio Controlled Dec 08 '22 at 07:43
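The "layered priorities" idea from the comments might look something like the sketch below (my own hypothetical names, Python 3, assuming a small fixed set of priority levels). As to the last question: each multiprocessing.Queue owns an OS pipe plus a feeder thread, so tens of thousands of layers is unlikely to be practical; a handful of levels is the scale this pattern is meant for:

```python
import multiprocessing
from queue import Empty  # Python 3 name of the old Queue module


class LayeredPriorityQueue(object):
    """One process-safe queue per priority level; lower number = higher priority."""

    def __init__(self, levels):
        self._queues = [multiprocessing.Queue() for _ in range(levels)]

    def put(self, item, priority=0):
        self._queues[priority].put(item)

    def get_nowait(self):
        # Scan levels from highest priority (0) to lowest.
        for q in self._queues:
            try:
                return q.get_nowait()
            except Empty:
                continue
        raise Empty
```

Unlike the Manager approach, every put and get here goes straight to a Pipe, but priorities are limited to the fixed set of integer levels chosen up front.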