Can I configure Queue.Queue so that it always accepts new items, and simply drops the oldest item if the queue is full?

If not, is there another queue class in the standard library that can do this?

(I cannot use deque because I have a producer/consumer setup where I need synchronization.)

Alex Flint
  • You can use a [`collections.deque`](https://docs.python.org/2/library/collections.html#collections.deque), passing the `maxlen` parameter – Moses Koledoye Oct 24 '16 at 18:39
  • Ah but I need it to be synchronized. And it looks like it wouldn't be as simple as just protecting with a mutex since I want to block on the consumer thread until an item is available (rather than poll continuously). – Alex Flint Oct 24 '16 at 18:42
  • I figured that too. I've reopened – Moses Koledoye Oct 24 '16 at 18:42
  • That's how it is usually done, why wouldn't it be as simple? By the way, just to be on the safe side, you do know python's GIL restrictions on threading? – spectras Oct 24 '16 at 18:45
  • @spectras I'm currently using `queue.get(block=True)` on a consumer loop. Under the hood I assume that this waits on a condition variable. To use a mutex here instead I would have to busy-wait on `queue.get(block=False)` guarded by a mutex, which would chew up a lot more CPU. – Alex Flint Oct 24 '16 at 20:52
  • I don't know what you call a mutex in Python. Python has the following threading tools: locks, conditions, semaphores, events, barriers. I was suggesting using a `deque` and guarding its access with a condition. No polling involved. – spectras Oct 25 '16 at 12:06
  • @spectras So using the `deque` idea, how do I create a thread that waits until an item is available, then does something with it, then waits until another item is available, and so on? – Alex Flint Oct 25 '16 at 15:37

3 Answers

Here is an example of protecting access to a shared resource with a condition, as I suggested in the comments.

import collections
import threading
import time

queue = collections.deque()
condition = threading.Condition()

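def consumer():
    condition.acquire()
    while True:
        # Drain everything currently queued; the lock is held only while touching the deque.
        while queue:
            item = queue.popleft()
            condition.release()
            # do something with item (the lock is released, so producers are not blocked)
            print(item)
            condition.acquire()
        # The queue is empty: wait() releases the lock and sleeps until notify() is called.
        condition.wait()

def push_item(item):
    with condition:
        queue.append(item)
        condition.notify()  # wake the consumer if it is waiting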

# Everything below is demonstration code showing how to use the functions above.

def example_producer_thread(*args):
    for arg in args:
        push_item(arg)

consumer_thread = threading.Thread(target=consumer, name='queue consumer')
consumer_thread.daemon = True  # so it does not prevent python from exiting
consumer_thread.start()

for example in [range(0, 10), range(10, 20), range(20, 30)]:
    threading.Thread(target=example_producer_thread, args=example).start()

time.sleep(1)  # give the consumer thread some time before the script exits

The core parts are:

  • consumer() is the consumer thread. It remains idle (no polling) until some other thread puts items in the queue. When woken, it locks the queue, takes an item, unlocks the queue, and processes the item, repeating until there are no more items in the queue. It then calls condition.wait(), which releases the lock and puts the thread back to sleep.
  • push_item() pushes a single item into the queue and notifies the consumer thread that it should wake up.

The rest is just there to make it a working example. example_producer_thread simply pushes its arguments into the queue, and we start three of those threads, each operating on a range of numbers, so we can see the results.

To get the drop-oldest behaviour, simply pass a maxlen to the deque and you're good to go: a full deque discards its oldest item on append. Perhaps encapsulate the functionality in a small class while you're at it.
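
A minimal sketch of that encapsulation, assuming Python 3. The class name `DropOldestDeque` and the `timeout` parameter are hypothetical, chosen here for illustration; the sketch uses `Condition.wait_for()` instead of the manual acquire/release loop above, which is a swapped-in convenience and not part of the original answer.

```python
import collections
import threading


class DropOldestDeque:
    """Bounded FIFO that discards its oldest item when full (hypothetical name)."""

    def __init__(self, maxlen):
        # A deque with maxlen drops items from the left when appending to a full deque.
        self._items = collections.deque(maxlen=maxlen)
        self._condition = threading.Condition()

    def put(self, item):
        """Always accepts the item; never blocks waiting for free space."""
        with self._condition:
            self._items.append(item)
            self._condition.notify()  # wake one waiting consumer

    def get(self, timeout=None):
        """Block (without polling) until an item is available, then return it."""
        with self._condition:
            available = self._condition.wait_for(
                lambda: len(self._items) > 0, timeout=timeout
            )
            if not available:
                raise TimeoutError("no item available")
            return self._items.popleft()


q = DropOldestDeque(maxlen=3)
for i in range(5):
    q.put(i)  # 0 and 1 are dropped once the deque is full

remaining = [q.get() for _ in range(3)]
print(remaining)  # [2, 3, 4]
```

Both `put()` and `get()` hold the same condition's lock while touching the deque, so the drop and the append happen as one step from the consumer's point of view.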

spectras

UPDATE: please do NOT use this. As @spectras points out, it doesn't actually synchronize properly.

This is not especially elegant, but it seems to work for me with multiple writers.

import queue

class QueueLatest(queue.Queue):
    def put(self, item):
        while True:
            try:
                super().put(item, block=False)
                break
            except queue.Full:
                # Not thread-safe: this touches the underlying deque without holding the queue's lock.
                self.queue.popleft()
Eric Smith
  • You are accessing the underlying `self.queue` object in a non-synchronized way. This *will* crash eventually depending on how exactly `queue.Queue` wraps it. – spectras Apr 02 '20 at 00:52
  • I'm not saying that you're wrong, but looking at the Python 3.7 implementation of Queue it's not apparent to me where get() and _get() are doing any more synchronization than popleft(), and of course _get() is implemented as a call to popleft(). I would welcome further explanation of the details of the synchronization. – Eric Smith Apr 02 '20 at 21:47
  • `_get` is an internal method and must not be used from outside (precisely because it assumes any synchronization is already handled by its caller). `get` does the synchronization properly, using `threading.Condition`. – spectras Apr 02 '20 at 22:58
  • Ah, I didn't previously look at the definitions of the not_empty and not_full attributes. Thanks! It's a shame that I'm not allowed to downvote my own answer. – Eric Smith Apr 03 '20 at 05:40
  • Hehe with the added comment it actually provides value. Paths not taken with detail of why not to take them are as valuable information as paths taken. :) – spectras Apr 03 '20 at 10:26
  • @spectras It seems that replacing the `self.queue.popleft()` with a `super().get()` would solve the problem. Any reason not to do that instead? – NichtJens Sep 08 '21 at 13:11
  • @NichtJens what would happen if the queue becomes empty after `put` raises but before the call to `get`? – spectras Sep 08 '21 at 14:19
  • Ah, of course ... certainly correct. There *is* the option to acquire `Queue.mutex`, which I think would be sufficient(?). [Example](https://stackoverflow.com/a/31892187/655404) – NichtJens Sep 08 '21 at 14:29
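
One way to read the `Queue.mutex` suggestion in the last comment is sketched below. It is only a sketch under loud assumptions: it relies on CPython implementation details of `queue.Queue` (`mutex`, `not_empty`, `unfinished_tasks`, and the `_qsize()`, `_get()`, `_put()` hooks), which are not documented API and may change. The class name `MutexDropOldestQueue` is hypothetical, the `block` and `timeout` arguments are accepted but ignored, and the shutdown feature of newer Python versions is not handled.

```python
import queue


class MutexDropOldestQueue(queue.Queue):
    """queue.Queue variant whose put() never blocks (hypothetical name).

    Assumes CPython's Queue internals; treat as an illustration only.
    """

    def put(self, item, block=True, timeout=None):
        # block and timeout are ignored: this put() always succeeds immediately.
        with self.mutex:  # the same lock that get() takes via not_empty
            if 0 < self.maxsize <= self._qsize():
                # Full: discard the oldest item while still holding the lock.
                # The dropped item will never see task_done(), so the new item
                # takes over its slot in the unfinished_tasks count.
                self._get()
            else:
                self.unfinished_tasks += 1
            self._put(item)
            self.not_empty.notify()  # wake a consumer blocked in get()


q = MutexDropOldestQueue(maxsize=2)
for i in (1, 2, 3):
    q.put(i)  # 1 is dropped when 3 arrives

size_after_puts = q.qsize()
first, second = q.get(), q.get()
print(size_after_puts, first, second)  # 2 2 3
```

Because the check, the drop, and the append all happen under one lock acquisition, the queue cannot become empty between the "full" test and the removal, which is the gap pointed out in the comments.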
0
from queue import Queue, Full

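class QueueLatest(Queue):
    """Queue whose put() discards the oldest item when the queue is full."""
    def put(self, *args, **kwargs):
        try:
            # Full is only raised for non-blocking or timed puts; a plain put() blocks instead.
            super().put(*args, **kwargs)
        except Full:
            # Not thread-safe: the underlying deque is modified without holding the queue's lock.
            self.queue.popleft()
            super().put(*args, **kwargs)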

This is based on @Eric Smith's answer, and it seems to work. I used it as q = QueueLatest(1), and it seemed to work all right. I am not sure how robust it is, or whether there might be a race condition.

mikey
  • Same comment as for Eric Smith's answer. This uses the underlying `self.queue` object with no synchronization. – spectras Apr 02 '20 at 00:53
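
To make the objection concrete, the sketch below replays one problematic interleaving by hand in a single thread. It is an illustration of the race, not a reproduction of an actual multi-threaded crash: the "consumer" step is simply written between the two producer steps, where a real consumer thread could be scheduled.

```python
import queue

q = queue.Queue(maxsize=1)
q.put("a")  # the queue is now full

# Producer: the non-blocking put fails because the queue is full.
try:
    q.put("b", block=False)
    saw_full = False
except queue.Full:
    saw_full = True

# A consumer thread could run right here and drain the queue.
drained = q.get()

# Producer resumes and tries to drop the "oldest" item, which no longer exists.
try:
    q.queue.popleft()
    error = None
except IndexError as exc:
    error = exc

print(saw_full, drained, type(error).__name__)  # True a IndexError
```

The window between catching `Full` and calling `popleft()` is not protected by the queue's lock, so nothing prevents another thread from changing the queue in between.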