If you only have a single "higher priority" level rather than arbitrarily many as supported by queue.PriorityQueue
, you can efficiently use a collections.deque
for this by inserting normal jobs at the left .appendleft()
, and inserting your higher-priority entries at the right .append()
Both queue and deque instances have threadsafe push/pop methods
Misc advantages to Deques
- allows peeking arbitrary elements (indexable and iterable without popping, while queue instances can only be popped)
- significantly faster than
queue.PriorityQueue
(see sketchy testing below)
Cautions about length limitations
- setting a length will let it push elements out of either end, not just off the left, unlike queue instances, which block or raise
queue.Full
- any unbounded collection will eventually run your system out of memory if input rate exceeds consumption
import threading
from collections import deque as Deque
Q = Deque() # don't set a maximum length
def worker_queue_creator(q):
sleepE = threading.Event() # use wait method for sleeping thread
sleepE.wait(timeout=1)
for index in range(3): # start with a few jobs
Q.appendleft("low job {}".format(index))
Q.append("high job 1") # add an important job
for index in range(3, 3+3): # add a few more jobs
Q.appendleft("low job {}".format(index))
# one more important job before ending worker
sleepE.wait(timeout=2)
Q.append("high job 2")
# wait while the consumer worker processes these before exiting
sleepE.wait(timeout=5)
def worker_queue_consumer(q):
""" daemon thread which consumes queue forever """
sleepE = threading.Event() # use wait method for sleeping thread
sleepE.wait(timeout=1) # wait a moment to mock startup
while True:
try:
pre_q_str = str(q) # see what the Deque looks like before before pop
job = q.pop()
except IndexError: # Deque is empty
pass # keep trying forever
else: # successfully popped job
print("{}: {}".format(job, pre_q_str))
sleepE.wait(timeout=0.4) # quickly consume jobs
# create threads to consume and display the queue
T = [
threading.Thread(target=worker_queue_creator, args=(Q,)),
threading.Thread(target=worker_queue_consumer, args=(Q,), daemon=True),
]
for t in T:
t.start()
T[0].join() # wait on sleep in worker_queue_creator to quit
% python3 deque_as_priorityqueue.py
high job 1: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1', 'low job 0', 'high job 1'])
low job 0: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1', 'low job 0'])
low job 1: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1'])
low job 2: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2'])
low job 3: deque(['low job 5', 'low job 4', 'low job 3'])
high job 2: deque(['low job 5', 'low job 4', 'high job 2'])
low job 4: deque(['low job 5', 'low job 4'])
low job 5: deque(['low job 5'])
Comparison
import timeit
NUMBER = 1000
values_builder = """
low_priority_values = [(1, "low-{}".format(index)) for index in range(5000)]
high_priority_values = [(0, "high-{}".format(index)) for index in range(1000)]
"""
deque_setup = """
from collections import deque as Deque
Q = Deque()
"""
deque_logic_input = """
for item in low_priority_values:
Q.appendleft(item[1]) # index into tuples to remove priority
for item in high_priority_values:
Q.append(item[1])
"""
deque_logic_output = """
while True:
try:
v = Q.pop()
except IndexError:
break
"""
queue_setup = """
from queue import PriorityQueue
from queue import Empty
Q = PriorityQueue()
"""
queue_logic_input = """
for item in low_priority_values:
Q.put(item)
for item in high_priority_values:
Q.put(item)
"""
queue_logic_output = """
while True:
try:
v = Q.get_nowait()
except Empty:
break
"""
# abuse string catenation to build the setup blocks
results_dict = {
"deque input": timeit.timeit(deque_logic_input, setup=deque_setup+values_builder, number=NUMBER),
"queue input": timeit.timeit(queue_logic_input, setup=queue_setup+values_builder, number=NUMBER),
"deque output": timeit.timeit(deque_logic_output, setup=deque_setup+values_builder+deque_logic_input, number=NUMBER),
"queue output": timeit.timeit(queue_logic_output, setup=queue_setup+values_builder+queue_logic_input, number=NUMBER),
}
for k, v in results_dict.items():
print("{}: {}".format(k, v))
Results (6000 elements pushed and popped, timeit number=1000
)
% python3 deque_priorityqueue_compare.py
deque input: 0.853059
queue input: 24.504084000000002
deque output: 0.0013576999999997952
queue output: 0.02025689999999969
While this is a fabricated example to show off deque's performance, PriorityQueue
's insert time is some significant function of its length and O(log n)
or worse, while a Deque is O(1)
, so it should be fairly representative of a real use case