6

Anybody familiar with how I can implement a multiprocessing priority queue in python?

  • For a succinct non-distributed implementation (using RPC like Alex Martelli talks about) see https://stackoverflow.com/a/25328987/712173 . – Ron Kaminsky Aug 08 '21 at 20:51

6 Answers

6

Alas, it's nowhere near as simple as changing the queueing discipline of a good old Queue.Queue: the latter is in fact designed to be subclassed according to a template-method pattern, and overriding just the hook methods _put and/or _get can easily change the queueing discipline (in 2.6 explicit LIFO and priority implementations are offered, but they were easy to make even in earlier versions of Python).

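For reference, the single-process (threads-only) version really is just a couple of hook overrides; here is a minimal sketch, essentially what 2.6's queue.PriorityQueue does internally (the class name is mine, and items are assumed to be pushed as (priority, payload) tuples):

import heapq
from queue import Queue                     # Queue.Queue on Python 2

class ThreadPriorityQueue(Queue):
    """Threads-only priority queue: change the queueing discipline by
    overriding the _init/_put/_get hooks that Queue exposes for subclassing."""
    def _init(self, maxsize):
        self.queue = []                      # a heap instead of a deque

    def _put(self, item):
        heapq.heappush(self.queue, item)     # item is e.g. (priority, payload)

    def _get(self):
        return heapq.heappop(self.queue)
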
For multiprocessing, in the general case (multiple readers, multiple writers), I see no solution for how to implement priority queues except to give up on the distributed nature of the queue; designate one special auxiliary process that does nothing but handle queues, send (essentially) RPCs to it to create a queue with a specified discipline, do puts and gets to it, get info about it, &c. So one would get the usual problems about ensuring every process knows about the aux proc's location (host and port, say), etc (easier if the process is always spawned at startup by the main proc). A pretty large problem, especially if one wants to do it with good performance, safeguards against aux proc's crashes (requiring replication of data to slave processes, distributed "master election" among slaves if master crashes, &c), and so forth. Doing it from scratch sounds like a PhD's worth of work. One might start from Johnson's work, or piggyback on some very general approach like ActiveMQ.

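One relatively painless way to get that auxiliary queue-server process is multiprocessing.managers.BaseManager: a dedicated manager process owns an ordinary queue.PriorityQueue, and every other process talks to it through proxies, so each put/get is effectively an RPC. A rough sketch under that assumption (the typeid 'get_pq', the port and the authkey are made up, and the error handling, reconnection and crash recovery are exactly the hard parts being glossed over):

import queue
from multiprocessing import Process
from multiprocessing.managers import BaseManager

_pq = queue.PriorityQueue()        # the real queue; it ends up living in the manager process

def _get_pq():
    return _pq

class QueueManager(BaseManager):
    pass

QueueManager.register('get_pq', callable=_get_pq)

def worker(address, authkey):
    mgr = QueueManager(address=address, authkey=authkey)
    mgr.connect()
    pq = mgr.get_pq()              # a proxy: put()/get() become RPCs to the manager
    pq.put((2, 'low'))
    pq.put((1, 'high'))
    print(pq.get())                # -> (1, 'high'), smallest priority number first

if __name__ == '__main__':
    address, authkey = ('127.0.0.1', 50000), b'not-a-real-secret'
    mgr = QueueManager(address=address, authkey=authkey)
    mgr.start()                    # spawns the auxiliary queue-server process
    p = Process(target=worker, args=(address, authkey))
    p.start()
    p.join()
    mgr.shutdown()
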
Some special cases (e.g. single reader, single writer) might be easier, and turn out to be faster for their limited area of application; but then a very specifically restricted spec should be drawn up for that limited area -- and the results would not constitute a (general purpose) "multiprocessing queue", but one applicable only to the given constrained set of requirements.

Alex Martelli
  • 854,459
  • 170
  • 1,222
  • 1,395
2

There is a bug that prevents true FIFO.
Read here.

An alternate way to build a multiprocessing priority queue would certainly be great!

Kevin Boyd
  • 12,121
  • 28
  • 86
  • 128
  • Yeah, but is there a good way to do a priority queue? Is it possible? (unless I have to specifically write a process to manage it, which I think will be even harder and more error prone) –  Aug 30 '09 at 16:56
  • I haven't found a way yet, will keep you posted though. – Kevin Boyd Aug 30 '09 at 17:35
1

While this isn't an answer, maybe it can help you develop a multiprocessing priority queue.

Here's a simple priority queue class I wrote using a Python list:

class PriorityQueue():
    """A basic priority queue that dequeues items with the smallest priority number."""
    def __init__(self):
        """Initializes the queue with no items in it."""
        self.array = []
        self.count = 0

    def enqueue(self, item, priority):
        """Adds an item to the queue."""
        self.array.append([item, priority])
        self.count += 1

    def dequeue(self):
        """Removes the highest priority item (smallest priority number) from the queue."""
        if self.count == 0:
            return None
        self.count -= 1

        # Find the index of the entry with the smallest priority number.
        # Entries whose priority is None are only returned when nothing
        # else is left, in which case the oldest entry is popped.
        dq = None
        for i in range(len(self.array)):
            prio = self.array[i][1]
            if prio is not None and (dq is None or prio < self.array[dq][1]):
                dq = i

        if dq is None:
            return self.array.pop(0)
        return self.array.pop(dq)

    def requeue(self, item, newPrio):
        """Changes specified item's priority."""
        for i in range(len(self.array)):
            if self.array[i][0] == item:
                self.array[i][1] = newPrio
                break

    def returnArray(self):
        """Returns array representation of the queue."""
        return self.array

    def __len__(self):
        """Returnes the length of the queue."""
        return self.count
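
For example (dequeue returns the stored [item, priority] pair, and the smallest priority number wins):

pq = PriorityQueue()
pq.enqueue("write report", 2)
pq.enqueue("fix outage", 1)
pq.requeue("write report", 0)    # bump its priority
print(pq.dequeue())              # ['write report', 0]
print(pq.dequeue())              # ['fix outage', 1]
print(len(pq))                   # 0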
Isaac
  • 15,783
  • 9
  • 53
  • 76
1

I had the same use case. But with a finite number of priorities.

What I ended up doing was creating one Queue per priority; my Process workers try to get items from those queues, starting with the most important queue and moving down to the less important ones (a worker moves on to the next queue only when the current one is empty).

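For illustration, here is a rough sketch of the worker side of this scheme (the 0.1 s back-off and the None shutdown sentinel are my own choices, not part of the original answer); it also exhibits the race speedplane points out in the comments below, since an urgent item can arrive while a worker is already committed to a lower queue:

import time
from queue import Empty                     # Queue.Empty on Python 2
from multiprocessing import Process, Queue

def worker(queues):
    """Scan the queues from most important (index 0) to least important and
    take the first item found; back off briefly when every queue is empty."""
    while True:
        for q in queues:
            try:
                item = q.get_nowait()
                break
            except Empty:
                continue
        else:
            time.sleep(0.1)                 # everything was empty; retry after a short nap
            continue
        if item is None:                    # None acts as a shutdown sentinel
            return
        print("processing", item)           # stand-in for real work

if __name__ == '__main__':
    queues = [Queue() for _ in range(3)]    # 3 priority levels, 0 = most important
    p = Process(target=worker, args=(queues,))
    p.start()
    queues[2].put("low priority job")
    queues[0].put("urgent job")             # usually, but not always, handled first
    queues[2].put(None)                     # lowest-priority sentinel: stop once drained
    p.join()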
user211505
  • 76
  • 2
  • This seems like the most reasonably feasible answer on the page. – speedplane Mar 18 '14 at 04:32
  • 3
    Hmm... on second thought, this still isn't easy to do perfectly right. For example, when the highest priority queue is empty and you go on to the next one, how can you prevent a race condition where the highest priority queue gets filled while you're checking the next highest queue? – speedplane Mar 18 '14 at 04:48
0

Depending on your requirements you could use the operating system and the file system in a number of ways. How large will the queue grow and how fast does it have to be? If the queue may be big but you are willing to open a couple files for every queue access you could use a BTree implementation to store the queue and file locking to enforce exclusive access. Slowish but robust.

If the queue will remain relatively small and you need it to be fast you might be able to use shared memory on some operating systems...

If the queue will be small (1000s of entries) and you don't need it to be really fast you could use something as simple as a directory with files containing the data with file locking. This would be my preference if small and slow is okay.

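To make the "directory of files plus file locking" option concrete, here is a rough, Unix-flavoured sketch under a few assumptions of my own: every process can see the same directory, priorities are non-negative integers, and a single lock file (with no crash recovery) is enough to serialize access. It is small and slow, exactly as described:

import os
import time
import uuid
import pickle

class FilePriorityQueue(object):
    """Sketch: each queued item is one file named <priority>_<timestamp>_<uuid>,
    so a plain lexicographic sort of the directory listing yields priority order.
    A single lock file serializes access between processes."""

    def __init__(self, path):
        self.path = path
        self.lockfile = os.path.join(path, ".lock")
        os.makedirs(path, exist_ok=True)

    def _acquire(self):
        # Spin until we can create the lock file exclusively (no crash recovery).
        while True:
            try:
                return os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            except OSError:
                time.sleep(0.05)

    def _release(self, fd):
        os.close(fd)
        os.remove(self.lockfile)

    def put(self, item, priority):
        fd = self._acquire()
        try:
            name = "%08d_%017.6f_%s" % (priority, time.time(), uuid.uuid4().hex)
            with open(os.path.join(self.path, name), "wb") as f:
                pickle.dump(item, f)
        finally:
            self._release(fd)

    def get(self):
        """Return the item with the smallest priority number, or None if empty."""
        fd = self._acquire()
        try:
            entries = sorted(n for n in os.listdir(self.path) if n != ".lock")
            if not entries:
                return None
            target = os.path.join(self.path, entries[0])
            with open(target, "rb") as f:
                item = pickle.load(f)
            os.remove(target)
            return item
        finally:
            self._release(fd)
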
If the queue can be large and you want it to be fast on average, then you probably should use a dedicated server process like Alex suggests. This is a pain in the neck however.

What are your performance and size requirements?

Aaron Watters
  • 2,784
  • 3
  • 23
  • 37
0

Inspired by @user211505's suggestion, I put together something quick and dirty.

Note that this is not a complete solution to the difficult problem of priority queues in multiprocessing production environments. However, if you're just messing around or need something for a short project, this will likely fit the bill:

from time import sleep
from datetime import datetime
from queue import Empty                 # on Python 2: from Queue import Empty
from multiprocessing import Queue as ProcessQueue

class SimplePriorityQueue(object):
    '''
    Simple priority queue that works with multiprocessing. Only a finite number
    of priorities are allowed. Adding many priorities slows things down.

    Also: no guarantee that this will pull the highest priority item
    out of the queue if many items are being added and removed. Race conditions
    exist where you may not get the highest priority queue item. However, if
    you tend to keep your queues non-empty, this will be relatively rare.
    '''
    def __init__(self, num_priorities=1, default_sleep=.2):
        self.queues = []
        self.default_sleep = default_sleep
        for i in range(num_priorities):
            self.queues.append(ProcessQueue())

    def __repr__(self):
        return "<Queue with %d priorities, sizes: %s>" % (
            len(self.queues),
            ", ".join("%d:%d" % (i, q.qsize())
                      for i, q in enumerate(self.queues)))

    def qsize(self):
        return sum(q.qsize() for q in self.queues)

    def get(self, block=True, timeout=None):
        start = datetime.utcnow()
        while True:
            # Poll from the highest priority queue (index 0) downwards.
            for q in self.queues:
                try:
                    return q.get(block=False)
                except Empty:
                    pass
            if not block:
                raise Empty
            elapsed = (datetime.utcnow() - start).total_seconds()
            if timeout is not None and elapsed > timeout:
                raise Empty

            if timeout is not None:
                # Sleep for a fraction of the time left before the deadline.
                sleep((timeout - elapsed) / 4)
            else:
                sleep(self.default_sleep)

    def get_nowait(self):
        return self.get(block=False)

    def put(self, priority, obj, block=False, timeout=None):
        if priority < 0 or priority >= len(self.queues):
            raise ValueError("Priority %d out of range." % priority)
        # Block and timeout don't mean much here because we never set maxsize
        return self.queues[priority].put(obj, block=block, timeout=timeout)
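
For example, with two priority levels (priority 0 is checked first):

pq = SimplePriorityQueue(num_priorities=2)
pq.put(1, "low priority job")
pq.put(0, "high priority job")
print(pq.get())    # "high priority job" (almost always; see the race note in the docstring)
print(pq.get())    # "low priority job"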
speedplane
  • 15,673
  • 16
  • 86
  • 138