3

is there a priority-queue in Java which acts like LinkedBlockingQueue?

PriorityBlockingQueue is not blocking because it is unbounded.

barracuda317
  • 608
  • 7
  • 24
  • your .... words dont make much sense - `PriorityBlockignQueue` will **always** block if you use the blocking functions ... if you want it to be bounded simply create your own class which `extends PriorityBlockingQueue` and `@Override` the blocking functions - check for your limit and call the `super` method, block otherwise – specializt Aug 06 '16 at 11:13
  • @specializt: Actually, that doesn't seem to make much sense. It's **unbounded**, so it doesn't block on `put(E)`. From the JavaDoc: "As the queue is unbounded, this method will never block". Furthermore, one does not simply override concurrent collections. – beatngu13 Aug 06 '16 at 11:51
  • the problem is, that `put`is not blocking, because the queue is unbounded. i am wondering whether there is a similar implemention in which `put` is blocking – barracuda317 Aug 06 '16 at 11:52
  • @barracuda317: Checkout [this](http://stackoverflow.com/questions/2341615/bounded-priorityblockingqueue) question (duplicate?). – beatngu13 Aug 06 '16 at 11:56
  • @beatngu13 read my words again. And again. Of course the second "blocking functions" should actually be "blocking and nonblocking functions" – specializt Aug 07 '16 at 10:44

2 Answers2

2

If you don't need a full-blown BlockingQueue interface implementation then you can use Semaphore and something like this (in Kotlin):

interface BlockingBag<E: Any> {
    @Throws(InterruptedException::class)
    fun put(element: E)
    @Throws(InterruptedException::class)
    fun take(): E
}
class BlockingPriorityBag<E: Any>(val capacity: Int) : BlockingBag<E> {
    init {
        require(capacity >= 1) { "$capacity must be 1 or greater" }
    }
    private val queue = PriorityBlockingQueue<E>()
    private val semaphore = Semaphore(capacity)
    override fun take(): E {
        val item = queue.take()
        semaphore.release()
        return item
    }
    override fun put(element: E) {
        semaphore.acquire()
        queue.put(element)
    }
}
Martin Vysny
  • 3,088
  • 28
  • 39
1

You can try MinMaxPriorityQueue from Google Guava and set a maximum size as next:

Queue<User> users = Queues.synchronizedQueue(
    MinMaxPriorityQueue.orderedBy(userComparator)
       .maximumSize(1000)
       .create()
);

NB: As a MinMaxPriorityQueue is not thread safe you need to use the decorator Queues.synchronizedQueue(Queue) allowing to make it thread safe.

As you need a BlockingQueue you will have to implement the decorator by yourself which is not hard to implement.

Here is how it should look like:

public class SynchronizedBlockingQueue implements BlockingQueue {

    private final BlockingQueue queue;

    public SynchronizedBlockingQueue(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public synchronized boolean add(final Object o) {
        return this.queue.add(o);
    }

    @Override
    public synchronized boolean offer(final Object o) {
        return this.offer(o);
    }
    ...
}

Then the code to create your BlockingQueue will be:

BlockingQueue<User> users = new SynchronizedBlockingQueue(
    MinMaxPriorityQueue.orderedBy(userComparator)
       .maximumSize(1000)
       .create()
);
Martijn Pieters
  • 1,048,767
  • 296
  • 4,058
  • 3,343
Nicolas Filotto
  • 43,537
  • 11
  • 94
  • 122
  • 1
    As far as I know, this implementation is not thread-safe. There are many threads interacting with one queue on both sides (put and take). What are the difficulties I am facing, if the implementation is not thread-safe? – barracuda317 Aug 06 '16 at 11:25