46

Related questions:

I have a very large data set (more than 5 millions items) and I need to get N largest items from it. The most natural way to do it is to use heap/priority queue storing only top N items. There are several good implementations of priority queue for JVM (Scala/Java), namely:

First 2 are nice, but they store all the items, which in my case gives critical memory overhead. Third (Lucene implementation) doesn't have such a drawback, but as I can see from documentation it also doesn't support custom comparator, which makes it useless for me.

So, my question is: Is there a PriorityQueue implementation with fixed capacity and custom comparator?

UPD. Finally I've created my own implementation, based on Peter's answer:

public class FixedSizePriorityQueue<E> extends TreeSet<E> {

    private int elementsLeft;

    public FixedSizePriorityQueue(int maxSize) {
        super(new NaturalComparator());
        this.elementsLeft = maxSize;
    }

    public FixedSizePriorityQueue(int maxSize, Comparator<E> comparator) {
        super(comparator);
        this.elementsLeft = maxSize;
    }


    /**
     * @return true if element was added, false otherwise
     * */
    @Override
    public boolean add(E e) {
        if (elementsLeft == 0 && size() == 0) {
            // max size was initiated to zero => just return false
            return false;
        } else if (elementsLeft > 0) {
            // queue isn't full => add element and decrement elementsLeft
            boolean added = super.add(e);
            if (added) {
                elementsLeft--;
            }
            return added;
        } else {
            // there is already 1 or more elements => compare to the least
            int compared = super.comparator().compare(e, this.first());
            if (compared == 1) {
                // new element is larger than the least in queue => pull the least and add new one to queue
                pollFirst();
                super.add(e);
                return true;
            } else {
                // new element is less than the least in queue => return false
                return false;
            }
        }
    }
}

(where NaturalComparator is taken from this question)

Community
  • 1
  • 1
ffriend
  • 27,562
  • 13
  • 91
  • 132
  • 3
    My humble opinions on your implementation: 1) Do you really need to extend TreeSet? "FixedSizePriorityQueue is a TreeSet" doesn't sound well, I would make the set a member instead. 2) You don't really need to add state to your class by making elementsLeft a non-final variable. 3) Are you sure the add method always returns the correct value? 4) It'd be good practice to guard against null or illegal arguments. – Murat Derya Özen Sep 27 '13 at 20:08
  • @Murat: Thanks for suggestions. Feel free to post your improved implementation as an answer here. – ffriend Sep 27 '13 at 21:17
  • You're welcome and thank you:) posted it as an answer. – Murat Derya Özen Sep 27 '13 at 21:57
  • If the queue is full and you add an element which is already present in it (and is not the last element that would get polled), your solution will falsely leave `elementsLeft == 0`, while in this case it must become 1. – Inego Dec 22 '21 at 11:18
  • In fact, when the queue is full, you should first add the element and check the result. If it is false, you don't have to poll. – Inego Dec 22 '21 at 11:22

10 Answers10

18

How can you say Lucene's doesn't support a custom comparator?

Its abstract and you must implement the abstract method lessThan(T a, T b)

om-nom-nom
  • 62,329
  • 13
  • 183
  • 228
Robert Muir
  • 3,185
  • 21
  • 17
  • 1
    Huh, I didn't noticed, thanks! Upvote, but since it requires additional library, I believe another implementation based on standard API is more preferable (see my update). – ffriend Oct 25 '11 at 22:59
15

You could use a SortedSet e.g. TreeSet with a custom comparator and remove the smallest when the size reachs N.

Peter Lawrey
  • 525,659
  • 79
  • 751
  • 1,130
  • 1
    TreeSet would be less performant than PriorityQueue in this use case. http://stackoverflow.com/questions/3524862/when-should-i-use-a-treemap-over-a-priorityqueue-and-vice-versa – Tomasz Elendt Jun 21 '16 at 13:51
  • Or trivially, just do the same thing with a priority queue? Edit: I added an answer below to show what I mean. – Ayush Jan 16 '21 at 11:43
14

Though an old question but it may be helpful to somebody else. You can use minMaxPriorityQueue of Google's Java library guava.

Terminal
  • 1,969
  • 5
  • 21
  • 37
  • still marked as @Beta but has been there since 8.0 so I suspect is pretty solid. – Dave Moten Oct 28 '14 at 02:47
  • This is not actually how `MinMaxPriorityQueue` is intended to be used, and it will perform poorly for that use case. – Louis Wasserman Jun 22 '15 at 20:01
  • @LouisWasserman why is that? Wouldn't it be just `O(n*log(k))` (where n is dataset size, and k max size of the queue"? – Kranach Aug 21 '15 at 17:27
  • 3
    @Kranach the constant factors will be significantly worse than a normal `PriorityQueue`. Using a normal `PriorityQueue` will do much better, or better yet, `Ordering.greatestOf` uses an O(n) time, O(k) memory algorithm. (We are giving some consideration to deprecating `MinMaxPriorityQueue`, just because it tends to get misused in this way.) – Louis Wasserman Aug 21 '15 at 17:33
4

Below is the implementation I used before. Complies with Peter's suggestion.

 public @interface NonThreadSafe {
 }

/**
 * A priority queue implementation with a fixed size based on a {@link TreeMap}.
 * The number of elements in the queue will be at most {@code maxSize}.
 * Once the number of elements in the queue reaches {@code maxSize}, trying to add a new element
 * will remove the greatest element in the queue if the new element is less than or equal to
 * the current greatest element. The queue will not be modified otherwise.
 */
@NonThreadSafe
public static class FixedSizePriorityQueue<E> {
    private final TreeSet<E> treeSet; /* backing data structure */
    private final Comparator<? super E> comparator;
    private final int maxSize;

    /**
     * Constructs a {@link FixedSizePriorityQueue} with the specified {@code maxSize}
     * and {@code comparator}.
     *
     * @param maxSize    - The maximum size the queue can reach, must be a positive integer.
     * @param comparator - The comparator to be used to compare the elements in the queue, must be non-null.
     */
    public FixedSizePriorityQueue(final int maxSize, final Comparator<? super E> comparator) {
        super();
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize = " + maxSize + "; expected a positive integer.");
        }
        if (comparator == null) {
            throw new NullPointerException("Comparator is null.");
        }
        this.treeSet = new TreeSet<E>(comparator);
        this.comparator = treeSet.comparator();
        this.maxSize = maxSize;
    }

    /**
     * Adds an element to the queue. If the queue contains {@code maxSize} elements, {@code e} will
     * be compared to the greatest element in the queue using {@code comparator}.
     * If {@code e} is less than or equal to the greatest element, that element will be removed and
     * {@code e} will be added instead. Otherwise, the queue will not be modified
     * and {@code e} will not be added.
     *
     * @param e - Element to be added, must be non-null.
     */
    public void add(final E e) {
        if (e == null) {
            throw new NullPointerException("e is null.");
        }
        if (maxSize <= treeSet.size()) {
            final E firstElm = treeSet.first();
            if (comparator.compare(e, firstElm) < 1) {
                return;
            } else {
                treeSet.pollFirst();
            }
        }
        treeSet.add(e);
    }

    /**
     * @return Returns a sorted view of the queue as a {@link Collections#unmodifiableList(java.util.List)}
     *         unmodifiableList.
     */
    public List<E> asList() {
        return Collections.unmodifiableList(new ArrayList<E>(treeSet));
    }
}

I would appreciate any feedback btw.

EDIT: It seems like using a TreeSet is not very efficient after all because the calls to first() seem to take sublinear time. I changed the TreeSet to a PriorityQueue. The modified add() method looks like this:

   /**
     * Adds an element to the queue. If the queue contains {@code maxSize} elements, {@code e} will
     * be compared to the lowest element in the queue using {@code comparator}.
     * If {@code e} is greater than or equal to the lowest element, that element will be removed and
     * {@code e} will be added instead. Otherwise, the queue will not be modified
     * and {@code e} will not be added.
     *
     * @param e - Element to be added, must be non-null.
     */
    public void add(final E e) {
        if (e == null) {
            throw new NullPointerException("e is null.");
        }
        if (maxSize <= priorityQueue.size()) {
            final E firstElm = priorityQueue.peek();
            if (comparator.compare(e, firstElm) < 1) {
                return;
            } else {
                priorityQueue.poll();
            }
        }
        priorityQueue.add(e);
    }
Murat Derya Özen
  • 2,154
  • 8
  • 31
  • 44
  • 1
    Thanks! ... IMHO, for the `PriorityQueue`-based implementation, the `asList()` method should be something like: `List mutableList = new ArrayList(priorityQueue); Collections.sort(mutableList, comparator); return Collections.unmodifiableList( mutableList );` – Abdull Apr 17 '14 at 03:00
  • 2
    @Abdull is right. Your javadoc says it returns a sorted view but the iterator of a PriorityQueue does not guarantee elements are returned in order. – Dave Moten Feb 17 '16 at 20:29
4

I can't think of a ready-to-use one, but you can check my implementation of this collection with similar requirements.

The difference is the comparator, but if you extend from PriorityQueue you'll have it. And on each addition check if you haven't reached the limit, and if you have - drop the last item.

Community
  • 1
  • 1
Bozho
  • 588,226
  • 146
  • 1,060
  • 1,140
  • Unfortunately, standard `PriorityQueue` doesn't provide easy (and fast) way to remove least element (which with respect to heap structure is quite understandable). So I decided to implement fixed size priority queue on top of TreeSet. Thanks anyway. – ffriend Oct 25 '11 at 23:03
  • @Robert Muir: `poll()` removes head of the queue, i.e. greatest element, not the least. – ffriend Oct 25 '11 at 23:21
  • @Robert Muir: huh, you are right again! I imagined another heap implementation (tree-like) and haven't even thought it is easy to get least element, thus I was sure `head` is top element and missed this point in javadocs. Now I see it. Thanks again! – ffriend Oct 25 '11 at 23:56
  • the idiom for your topN is something like 1. if pq.size == N && item < pq.peek(), return (not competitive) 2. pq.offer(item). 3. if (pq.size > N), pq.poll(). The lucene one gives 2 advantages here: 1. if N is small you populate with sentinels to avoid the size checks. 2. if item is mutable, instead of offer + poll, you just change the head and call updateTop(). – Robert Muir Oct 26 '11 at 00:01
2

Exactly what I was looking for. However, the implementation contains a bug:

Namely: if elementsLeft > 0 and e is already contained in the TreeSet. In this case, elementsLeft is decreased, but the number of elements in the TreeSet stays the same.

I would suggest to replace the corresponding lines in the add() method by

        } else if (elementsLeft > 0) {
        // queue isn't full => add element and decrement elementsLeft
        boolean added = super.add(e);
        if (added) {
            elementsLeft--;
        }
        return added;
1

Try this code:

public class BoundedPQueue<E extends Comparable<E>> {
/**
 * Lock used for all public operations
 */
private final ReentrantLock lock;

PriorityBlockingQueue<E> queue ;
int size = 0;

public BoundedPQueue(int capacity){
    queue = new PriorityBlockingQueue<E>(capacity, new CustomComparator<E>());
    size = capacity;
    this.lock = new ReentrantLock();

}

public boolean offer(E e) {


    final ReentrantLock lock = this.lock;
    lock.lock();
    E vl = null;
    if(queue.size()>= size)  {
        vl= queue.poll();
        if(vl.compareTo(e)<0)
            e=vl;
    }

    try {
        return queue.offer(e);
    } finally {
        lock.unlock();
    }


}

public E poll()  {

    return queue.poll();
}

public static class CustomComparator<E extends Comparable<E>> implements Comparator<E> {


    @Override
    public int compare(E o1, E o2) {
        //give me a max heap
         return o1.compareTo(o2) *-1;

    }
}

}
Baby Groot
  • 4,637
  • 39
  • 52
  • 71
foo_bar
  • 41
  • 2
  • 4
1

Here is one I put together if you have guava. I think it is is pretty complete. Let me know if I missed something.

You can use the gauva ForwardingBlockingQueue so you don't have to map all the other methods.

import com.google.common.util.concurrent.ForwardingBlockingQueue;

public class PriorityBlockingQueueDecorator<E> extends
        ForwardingBlockingQueue<E> {

    public static final class QueueFullException extends IllegalStateException {

        private static final long serialVersionUID = -9218216017510478441L;

    }

    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    private int maxSize;

    private PriorityBlockingQueue<E> delegate;

    public PriorityBlockingQueueDecorator(PriorityBlockingQueue<E> delegate) {
        this(MAX_ARRAY_SIZE, delegate);
    }

    public PriorityBlockingQueueDecorator(int maxSize,
            PriorityBlockingQueue<E> delegate) {
        this.maxSize = maxSize;
        this.delegate = delegate;
    }

    @Override
    protected BlockingQueue<E> delegate() {
        return delegate;
    }

    @Override
    public boolean add(E element) {
        return offer(element);
    }

    @Override
    public boolean addAll(Collection<? extends E> collection) {
        boolean modified = false;
        for (E e : collection)
            if (add(e))
                modified = true;
        return modified;
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        return offer(e);
    }

    @Override
    public boolean offer(E o) {
        if (maxSize > size()) {
            throw new QueueFullException();
        }
        return super.offer(o);
    }
}
Chris Hinshaw
  • 6,967
  • 2
  • 39
  • 65
1

Well, quite an old question, but I'm confused why a simpler solution hasn't been suggested yet.

Unless I'm missing something, this can be trivially solved using a min-heap (Java's default PriorityQueue implementation) with a slight twist in that the moment the size of the PriorityQueue becomes greater than k(ie if we're trying to store the top k elements), you poll the head.

Here's an example of what I mean

public void storeKLargest(int[] nums, int k) {
    PriorityQueue<Integer> pq = new PriorityQueue<>(k+1);
    for(int num: nums){
        if(pq.size() < k || pq.peek() < num)
            pq.offer(num);
        if(pq.size() == k+1)
            pq.poll();
    }
}

I used a PriorityQueue of Integer, but it's simple enough to replace it with a custom object and feed in a custom Comparator.

Unless I'm missing something obvious, I suppose this is what the OP was looking for.

Ayush
  • 1,510
  • 11
  • 27
  • 1
    Thanks! This is indeed a simple solution. However, if I got you correctly, it may be sub-optimal: true fixed-size priority queue starts rejecting new offered elements very quickly because most of them are lower than elements in the queue. If you additionally keep track of the lowest element, checking a new element will be as easy as one comparison. In your implementation, however, adding a new element will always change the set, which is costly. The obvious optimization to your implementation is to compare the new element to `pq.peek()` (since it's lowest) and offer it only if it's larger. – ffriend Jan 16 '21 at 18:18
  • 2
    @ffriend yup, I've edited the answer to reflect that. – Ayush Jan 18 '21 at 02:45
0

Create a PriorityQueue that has size limit. It stores N max numbers.

import java.util.*;

class Demo
{
    public static <E extends Comparable<E>> PriorityQueue<E> getPq(final int n, Comparator<E> comparator)
    {
        return new PriorityQueue<E>(comparator)
        {
            boolean full()
            {
                return size() >= n;
            }

            @Override 
            public boolean add(E e)
            {
                if (!full())
                {
                    return super.add(e);
                }
                else if (peek().compareTo(e) < 0)
                {
                    poll();
                    return super.add(e);
                }
                return false;
            }

            @Override
            public boolean offer(E e)
            {
                if (!full())
                {
                    return super.offer(e);
                }
                else if (peek().compareTo(e) < 0)
                {
                    poll();
                    return super.offer(e);
                }
                return false;
            }
        };
    }

    public static void printq(PriorityQueue pq)
    {
        Object o = null;
        while ((o = pq.poll()) != null)
        {
            System.out.println(o);
        }
    }

    public static void main (String[] args)
    {
        PriorityQueue<Integer> pq = getPq(2, new Comparator<Integer>(){
        @Override
        public int compare(Integer i1, Integer i2)
        {
            return i1.compareTo(i2);
        }
        });
        pq.add(4);
        pq.add(1);
        pq.add(5);
        pq.add(2);
        printq(pq);
    }
}