18

PriorityBlockingQueue is unbounded, but I need to bound it somehow. What is the best way to achieve that?

For information, the bounded PriorityBlockingQueue will be used in a ThreadPoolExecutor.

NB: By bounded I don't want to throw Exception if that happens, I want to put the object in the queue and then cut it based on its priority value. Is there any good way to do this cut thingie?

nanda
  • 24,458
  • 13
  • 71
  • 90

9 Answers9

12

I actually wouldn't subclass it. While I can't put together example code right now, I'd suggest a version of the decorator pattern.

Create a new class and implement the interfaces implemented by your class of interest: PriorityBlockingQueue. I've found the following interfaces used by this class:

Serializable, Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>

In the constructor for a class, accept a PriorityBlockingQueue as a constructor parameter.

Then implement all the methods required by the interfaces via the instances of the PriorityblockingQueue. Add any code required to make it Bounded. This is a fairly standard implementation of a Decorator pattern.

Community
  • 1
  • 1
Frank V
  • 25,141
  • 34
  • 106
  • 144
  • nice design... I'd be very thankful if you can add the working implementation. I'd tried the same and we can compare the result later. Thanks again. +1 – nanda Mar 05 '10 at 05:52
  • the problem with this solution (and also the other solution) is to get the lock as the lock is private field in PBQ – nanda Mar 05 '10 at 08:37
  • I took down my server. I apologize but I can't redeploy the image. This is a fairly standard implementation of a Decorator pattern. Basically a new class accept the class of interest and then you implement the method that you need to alter the behavior on. Any thing else you'd pass through to the original class -- in OPs post, the PriorityBlockingQueue instance gone in through the constructor. – Frank V Nov 24 '13 at 00:40
  • 1
    Sadly this doesn't answer the question. A `PriorityBlockingQueue` is basically a heap wrapped with some locking to notify sleeping threads and avoid race conditions. Since it is a heap only the smallest value can be popped. According the question spec it also needs to be able to remove the largest items, which is doesn't support. – Ztyx Jul 17 '15 at 18:24
6

There's an implementation of this in the Google Collections/Guava library: MinMaxPriorityQueue.

A min-max priority queue can be configured with a maximum size. If so, each time the size of the queue exceeds that value, the queue automatically removes its greatest element according to its comparator (which might be the element that was just added). This is different from conventional bounded queues, which either block or reject new elements when full.

rescdsk
  • 8,739
  • 4
  • 36
  • 32
  • 1
    The ThreadPoolExecutor accepts the BlockingQueue, therefore it won't be possible to use the MinMaxPriorityQueue "as is". – Timofey Feb 18 '16 at 22:18
2

Of the top of my head, I'd subclass it and overwrite the put method to enforce this. If it goes over throw an exception or do whatever seems appropriate.

Something like:

public class LimitedPBQ extends PriorityBlockingQueue {

    private int maxItems;
    public LimitedPBQ(int maxItems){
        this.maxItems = maxItems;
    }

    @Override
    public boolean offer(Object e) {
        boolean success = super.offer(e);
        if(!success){
            return false;
        } else if (this.size()>maxItems){
            // Need to drop last item in queue
            // The array is not guaranteed to be in order, 
            // so you should sort it to be sure, even though Sun's Java 6 
            // version will return it in order
            this.remove(this.toArray()[this.size()-1]);
        }
        return true;
    }
}

Edit: Both add and put invoke offer, so overriding it should be enough

Edit 2: Should now remove the last element if over maxItems. There may be a more elegant way of doing it though.

Kris
  • 14,426
  • 7
  • 55
  • 65
  • I would do the same. Just override "add" and "offer", too. – Joonas Pulakka Feb 26 '10 at 12:55
  • I don't want throw Exception in this case, I want to put the object in the queue and then cut it based on its priority value. Is there any good way to do this cut thingie? – nanda Feb 26 '10 at 12:57
  • and another thing... will this break the `Blocking` property of the PBQ? – nanda Feb 26 '10 at 13:02
  • I've modified the code example. As for the _Blocking_, no. All access to and changes of the data structure are still being handled as before. I've just put a thin layer on top. – Kris Feb 26 '10 at 14:56
  • According to the javadoc .toArray() is not in any particular order, so you must Arrays.sort() the result using the same Comparator as the queue before removing elements at the end. – karoberts Feb 26 '10 at 15:18
  • You are right, the API offers no guarantee, although the implementation in Java 6 will actually return it in order as it simply copies an internal array that is used to maintain the priority queue. To avoid the code breaking in a future Java release you'd have to sort the array. – Kris Feb 26 '10 at 17:10
  • 1
    @Kris... if it breaks the 'blocking' property than I'd think it's pretty useless than. After all, it's one of the reason I use 'PriorityBlockingQueue'. – nanda Feb 26 '10 at 17:26
  • The code also has a race condition. Multiple threads can concurrently enter the `else` block and will then remove multiple elements. – Ztyx Jul 17 '15 at 18:41
2

After implementing a BoundedPriorityBlockingQueue according to what Frank V suggested I realized it didn't do quite what I wanted. The main problem is that the item which I have to insert into the queue may be a higher priority than everything already in the queue. Thus what I really want is a 'pivot' method, if I put an object into the queue, when the queue is full, I want to get back the lowest priority object, rather than blocking.

To flesh out Frank V's suggestions I used the following fragments...

public class BoundedPriorityBlockingQueue<E> 
   implements 
     Serializable, 
     Iterable<E>, 
     Collection<E>, 
     BlockingQueue<E>, 
     Queue<E>, 
     InstrumentedQueue 
{

... private final ReentrantLock lock; // = new ReentrantLock(); private final Condition notFull;

final private int capacity;
final private PriorityBlockingQueue<E> queue;

public BoundedPriorityBlockingQueue(int capacity) 
  throws IllegalArgumentException, 
         NoSuchFieldException, 
         IllegalAccessException 
{
   if (capacity < 1) throw 
       new IllegalArgumentException("capacity must be greater than zero");      
   this.capacity = capacity;
   this.queue = new PriorityBlockingQueue<E>();

   // gaining access to private field
   Field reqField;
   try {
    reqField = PriorityBlockingQueue.class.getDeclaredField("lock");
    reqField.setAccessible(true);
    this.lock = (ReentrantLock)reqField.get(ReentrantLock.class);
    this.notFull = this.lock.newCondition();

   } catch (SecurityException ex) {
    ex.printStackTrace();
    throw ex;
   } catch (NoSuchFieldException ex) {
    ex.printStackTrace();
    throw ex;
   } catch (IllegalAccessException ex) {
    ex.printStackTrace();
    throw ex;
   }

...

@Override
public boolean offer(E e) {
    this.lock.lock();
    try {
        while (this.size() == this.capacity)
            notFull.await();
        boolean success = this.queue.offer(e);
        return success;
    } catch (InterruptedException ie) {
        notFull.signal(); // propagate to a non-interrupted thread
        return false;

    } finally {
        this.lock.unlock();
    }
}
...

This also has some instrumentation so I can check the effectiveness of the queue. I am still working on 'PivotPriorityBlockingQueue', if anyone is interested I can post it.

phreed
  • 1,759
  • 1
  • 15
  • 30
1

There is an implementation in the ConcurrencyUtils repo.

bryant1410
  • 5,540
  • 4
  • 39
  • 40
0

If the order of the Runnables you want to execute is not strict (as is: it may occur that some lower priority tasks are executed even though higher priority tasks exist), then I would suggest the following, which boils down to periodically cutting the PriorityQueue down in size:

if (queue.size() > MIN_RETAIN * 2){
    ArrayList<T> toRetain = new ArrayList<T>(MIN_RETAIN);
    queue.drainTo(toRetain, MIN_RETAIN);
    queue.clear();
    for (T t : toRetain){
      queue.offer(t);
    }
}

This will obviously fail if the order needs to be strict, as draining will lead to a moment, wenn low priority task will retrieved from the queue using concurrent access.

The advantages are, that this is thread-safe and likely to get as fast as you can do with the priority queue design.

Christopher Oezbek
  • 23,994
  • 6
  • 61
  • 85
0

Not a single answer so far has all of the following properties:

  • Implements the BlockingQueue interface.
  • Supports removal of the absolute largest value.
  • No race conditions.

Unfortunately, there is no BlockingQueue implementation in the standard Java library. You will either need to find an implementation or implement something yourself. Implementing a BlockingQueue will require some knowledge on proper locking.

Here's what I suggest: Have a look at https://gist.github.com/JensRantil/30f812dd237039257a3d and use it as a template to implement your own wrapper around a SortedSet. Basically, all the locking is there and there are multiple unit tests (that will need some tweaking).

Ztyx
  • 14,100
  • 15
  • 78
  • 114
-1

There is another implementation here

It seems to do what you are asking for:

A BoundedPriorityQueue implements a priority queue with an upper bound on the number of elements. If the queue is not full, added elements are always added. If the queue is full and the added element is greater than the smallest element in the queue, the smallest element is removed and the new element is added. If the queue is full and the added element is not greater than the smallest element in the queue, the new element is not added.

Subir Kumar Sao
  • 8,171
  • 3
  • 26
  • 47
AbuZubair
  • 1,236
  • 14
  • 20
-1

Have a look at the ForwardingQueue from the Google Collections API. For blocking semantics you could use a Semaphore.

yawn
  • 8,014
  • 7
  • 29
  • 34