
I am trying to write a simple queue, similar to ArrayBlockingQueue, in which the head of the queue is removed if the queue is full when an element is added. The class should have just the following public methods:

  • To get the size of the queue
  • To get an element from the head of the queue, blocking if no element is available
  • To add an element at the tail of the queue

Can someone review the code below and let me know if there is a better way of doing this?

import java.util.concurrent.ArrayBlockingQueue;

public class CircularArrayNonBlockingQueue<E> {
    private ArrayBlockingQueue<E> blockingQueue;

    public CircularArrayNonBlockingQueue(int size) {
        blockingQueue = new ArrayBlockingQueue<>(size);
    }

    public synchronized int size() {
        return blockingQueue.size();
    }

    public synchronized void add(E element) {
        if(blockingQueue.remainingCapacity() <= 0) {
            blockingQueue.poll();
        }
        blockingQueue.add(element);
    }

    public synchronized E poll() {
        return blockingQueue.poll();
    }
}

EDIT: Based on the discussion in the comments, I don't need to make all the methods synchronized. The updated code looks like this:

import java.util.concurrent.ArrayBlockingQueue;

public class CircularNonBlockingQueue<E> {
    private final ArrayBlockingQueue<E> blockingQueue;

    public CircularNonBlockingQueue(int size) {
        blockingQueue = new ArrayBlockingQueue<>(size);
    }

    public int size() {
        return blockingQueue.size();
    }

    public synchronized void add(E element) {
        if(blockingQueue.remainingCapacity() <= 0) {
            blockingQueue.poll();
        }
        blockingQueue.add(element);
    }

    public E take() throws InterruptedException {
        return blockingQueue.take();
    }
}
tuk
  • Looks good to me :-) – AdamPillingTech Dec 18 '17 at 15:07
  • You might be able to optimize the synchronization, though, by using a ReadWriteLock – AdamPillingTech Dec 18 '17 at 15:10
  • @PillHead - Can you please explain this a bit more? – tuk Dec 18 '17 at 15:18
  • Take a look at the answer to this question https://stackoverflow.com/questions/34611106/synchronized-vs-readwritelock-performance – AdamPillingTech Dec 18 '17 at 15:21
  • 1
    why use a blocking queue? – grape_mao Dec 18 '17 at 17:27
  • Like @grape_mao inferred, if you are synchronizing on all methods accessing the queue then you can use an unsynchronized collection. – Gray Dec 18 '17 at 17:35
  • @Gray - " if you are synchronizing on all methods accessing the queue then you can use an unsynchronized collection" - I did not get you. Can you explain this a bit more? – tuk Dec 18 '17 at 17:37
  • `BlockingQueue`s are by default internally synchronized. If you are protecting all of your accesses with your own `synchronized` methods then you don't need to use (or pay for) an internally synchronized collection. This is not true if you switch to (for example) the `ReadWriteLock` as @PillHead recommended. – Gray Dec 18 '17 at 20:17
  • @gray - I want to make add synchronized as I am doing multiple operations in it. I think if I don't make poll synchronized then we will have two different locks and the methods poll & add will not be properly synchronized. Is my understanding correct? – tuk Dec 19 '17 at 03:53
  • If you look at the implementation of the `ArrayBlockingQueue` class, the `size()`, `poll()` and `add()` methods are guarded by a ReentrantLock (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html). Hence you do not need to synchronize the `size()` and `poll()` methods. – Sajith Edirisinghe Dec 19 '17 at 06:40
  • @SajithDilshan - One doubt: if I don't make `poll` synchronized, then while a thread is inside the `synchronized` `add` method, can't another thread still make a call to `poll`? – tuk Dec 19 '17 at 06:58
  • @grape_mao - I want to use this in a producer-consumer-like pattern in which the consumer thread will be blocked when no data is available, but producers will never be blocked. If there is no space, the head will be removed to make space for the new element. – tuk Dec 19 '17 at 07:03
  • @tuk That is handled within ArrayBlockingQueue. It keeps an internal count variable guarded by a ReentrantLock in both the size() and poll() methods. – Sajith Edirisinghe Dec 19 '17 at 07:03
  • I have updated the question as per my understanding from the discussions here. – tuk Dec 19 '17 at 07:19

1 Answer


Having a thread-safe backing collection does not necessarily make a correct program. Since only your add method is synchronized, take() may run concurrently with it. So it is possible that, after the if(blockingQueue.remainingCapacity() <= 0) test within add, a concurrently running take() removes an element, and the poll() within add then removes an element unnecessarily. There is a perceivable difference from the situation where add() completes before the take(): the consuming thread receives a different item. In other words, the effect is as if add would sometimes not remove the oldest item, but the second-oldest one.
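
To make this concrete, here is a small, deterministic re-enactment of such an interleaving, executed on a single thread against a plain ArrayBlockingQueue (purely illustrative; the class InterleavingDemo and the element names are made up):

import java.util.concurrent.ArrayBlockingQueue;

public class InterleavingDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(2);
        q.add("A");
        q.add("B");

        // Producer thread enters add("C") and observes a full queue:
        boolean full = q.remainingCapacity() <= 0;  // true

        // Consumer thread runs take() in between and receives the head, "A":
        String consumed = q.poll();                 // "A", queue: [B]

        // Producer thread resumes and evicts what it believes is the oldest
        // element, which by now is "B":
        if (full) {
            q.poll();                               // removes "B" unnecessarily
        }
        q.add("C");                                 // queue: [C]

        System.out.println("consumer got: " + consumed);  // A
        System.out.println("queue now: " + q);            // [C]
    }
}

Had add("C") completed before the take(), the consumer would have received "B", and "A" (the oldest element) would have been the one evicted.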

On the other hand, if you use synchronized consistently for all of your methods, there is no need for a thread-safe backing collection:

import java.util.ArrayDeque;

public class CircularBlockingQueue<E> {
    private final ArrayDeque<E> blockingQueue;
    private final int maxSize;

    public CircularBlockingQueue(int size) {
        if(size<1) throw new IllegalArgumentException("size == "+size);
        blockingQueue = new ArrayDeque<>(size);
        maxSize = size;
    }

    public synchronized int size() {
        return blockingQueue.size();
    }

    public synchronized void add(E element) {
        if(blockingQueue.size() == maxSize) {
            // queue is full: drop the oldest element to make room for the new one
            blockingQueue.poll();
        }
        blockingQueue.add(element);
        notify(); // wake up a consumer that may be waiting in take()
    }

    public synchronized E take() throws InterruptedException {
        while(blockingQueue.isEmpty()) wait(); // block until add() supplies an element
        return blockingQueue.remove();
    }
}
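
For illustration, a minimal usage sketch with one producer (the main thread) and one consumer thread (the class CircularBlockingQueueDemo and its names are made up; the producer never blocks, while the consumer blocks in take()):

public class CircularBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        CircularBlockingQueue<Integer> queue = new CircularBlockingQueue<>(16);

        // Consumer: blocks in take() until an element becomes available.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("consumed " + queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: add() never blocks; once the queue is full it evicts the
        // oldest element to make room.
        for (int i = 0; i < 10; i++) {
            queue.add(i);
        }
        consumer.join();
    }
}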

However, if you can live with weaker guarantees regarding the oldest element, you can use a BlockingQueue and don’t need any synchronized:

import java.util.concurrent.ArrayBlockingQueue;

public class CircularBlockingQueue<E> {
    private final ArrayBlockingQueue<E> blockingQueue;

    public CircularBlockingQueue(int size) {
        blockingQueue = new ArrayBlockingQueue<>(size);
    }

    public int size() {
        return blockingQueue.size();
    }

    public void add(E element) {
        while(!blockingQueue.offer(element)) {
            // queue is full: evict the oldest element and retry the offer
            blockingQueue.poll();
        }
    }

    public E take() throws InterruptedException {
        return blockingQueue.take();
    }
}
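
The loop around offer matters when several producers run concurrently: another producer may fill the slot freed by this thread's poll() before the retried offer, so the thread simply evicts again and retries. This is also where the weaker guarantee shows, since a single add may end up evicting more than one old element. A deterministic re-enactment against a plain ArrayBlockingQueue (purely illustrative; the class OfferRetryDemo is made up):

import java.util.concurrent.ArrayBlockingQueue;

public class OfferRetryDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("X");                     // queue: [X], full

        // Producer P1 tries to add "A":
        boolean added = q.offer("A");   // false, queue is full
        q.poll();                       // P1 evicts "X", queue: []

        // Producer P2 sneaks in and takes the freed slot:
        q.offer("B");                   // queue: [B]

        // P1's retried offer fails again, so it evicts once more and retries:
        added = q.offer("A");           // false
        q.poll();                       // evicts "B", queue: []
        added = q.offer("A");           // true, queue: [A]

        System.out.println(added + " " + q);  // true [A]
    }
}

Here two elements ("X" and "B") were dropped for one successful add, which cannot happen in the fully synchronized variant above.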

It must be noted that neither of these solutions provides “fairness”. So if the number of producer and consumer threads is large compared to the queue’s capacity, there is a risk that producers repeatedly remove items without ever reactivating threads blocked in take(). You should therefore always ensure a sufficiently large capacity.

Holger
  • Good answer, but the class name should be 'CircularQueue' instead of 'CircularNonBlockingQueue' because the 'take()' method is blocking. – Shihab Jan 22 '19 at 16:46
  • 1
    @Shihab good point. Even the question describes a blocking behavior while using the contradicting class name containing “NonBlocking”. Didn’t notice when I used the question’s code as starting point. – Holger Jan 22 '19 at 16:56
  • It's better to use `notifyAll()`, because `notify()` is not fair. Additionally take a look at https://stackoverflow.com/a/1006498/821497 – ciamej Jan 22 '19 at 17:23
  • @ciamej using `notifyAll()` instead of `notify()` doesn’t make it more fair. That would just waste CPU cycles while still only one arbitrary thread can win the race. And I take the liberty of assuming that this bug described in a two-decade-old book is not an issue anymore, as otherwise Doug Lea would surely have mentioned it in one of his newer books. – Holger Jan 22 '19 at 17:39
  • @Holger For `notify()` the docs explicitly state that there is no guarantee which thread will be awakened, which means that some thread may wait forever while others are continuously awakened. On the contrary, with `notifyAll()` all the threads are awakened and contend for the lock, and the scheduler determines which one acquires it. The scheduler, even though it need not be fair, won't let any thread be blocked on a lock forever; this is a weaker guarantee than fairness, but still stronger than the no-guarantee of `notify()`. By the way, the default scheduler on Linux is fair. – ciamej Jan 23 '19 at 13:12
  • @ciamej it doesn’t matter whether the scheduler is fair or not. When you call `notifyAll()` when there is only one item available, all threads get the same chance of acquiring the monitor, but only one will get it first and consume the item. All other threads will subsequently find that no item is available and go back to wait state. When now the winner thread comes back to get another element and goes to wait as well, the situation is exactly as before, an arbitrary thread will win when you call `notifyAll()`, which can always be the same thread. In fact, it has the best chances of them all. – Holger Jan 23 '19 at 13:21
  • @Holger fair enough. I still wonder whether you can somehow leverage the OS scheduler's fairness to alleviate the weak guarantees of `notify()`/`notifyAll()`... – ciamej Jan 23 '19 at 15:06
  • @ciamej no, as long as just-arriving threads get a head start over the threads which need to wake up (which is intentional, to get the maximum performance), there’s no way to implement fairness on top of it. You would need a `Lock` with intrinsic fairness for that, but the better choice is to develop applications which do not need fairness at all. After all, fairness costs performance. – Holger Jan 23 '19 at 15:40