1

I implemented my custom BlockingQueue<T> and compared it with java.util.concurrent ArrayBlockingQueue.

Here is my implementation:

public class CustomBlockingQueue<T> implements BlockingQueue<T> {

    private final T[] table;

    private final int capacity;

    private int head = 0;

    private int tail = 0;

    private volatile int size;

    private final Lock lock = new ReentrantLock();

    private final Condition notEmpty = lock.newCondition();

    private final Condition notFull = lock.newCondition();

    @SuppressWarnings("unchecked")
    public CustomBlockingQueue(final int capacity) {
        this.capacity = capacity;
        this.table = (T[]) new Object[this.capacity];
        size = 0;
    }

    @Override
    public void add(final T item) throws InterruptedException {
        lock.lock();
        try {
            while (size >= table.length) {
                notFull.await();
            }
            if (tail == table.length) {
                tail = 0;
            }
            table[tail] = item;
            size++;
            tail++;
            if (size == 1) {
                notEmpty.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public T poll() throws InterruptedException {
        lock.lock();
        try {
            while (size == 0) {
                notEmpty.await();
            }
            if (head == table.length) {
                head = 0;
            }
            final T result = table[head];
            table[head] = null;
            size--;
            head++;
            if (size == capacity - 1) {
                notFull.signalAll();
            }
            return result;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public int size() {
        return size;
    }

}

My implementation is based on the array.

I don't ask you to review the code but help me to clarify the difference between my one and Java's.

In my code I do notEmpty.signalAll() or notFull.signalAll() inside the if clause but java.util.concurrent one simply invokes signal() in each case?

What is the reason for notifying another thread each time even when there's no necessary in it?

Pasha
  • 1,768
  • 6
  • 22
  • 43
  • Yes, exactly ArrayBlockingQueue – Pasha Jun 26 '19 at 13:31
  • I have condition.signalAll in if clauses, Java ArrayBlockingQueue simply always signals. If the capacity is 5 and 3 items in place, it does condition.signal – Pasha Jun 26 '19 at 13:41
  • so is there a problem with your code? or you're just asking why use signal every time instead of calling signalAll when transitioning from empty to nonempty and full to nonfull? because if this is just about signal vs signal all see if [this answer I wrote](https://stackoverflow.com/a/43831408/217324) makes sense. – Nathan Hughes Jun 26 '19 at 15:00
  • Hi Nathan, now I'm just asking about the advantages of Java's implementation but maybe the difference can lead to some bugs in future – Pasha Jun 26 '19 at 15:06

1 Answers1

1

If a thread is blocking until it can read from or add to the queue, the best place for it is in the waitset for the applicable condition. That way it isn't contending actively for the lock and isn't getting context-switched into.

If only one item gets added to the queue, we want to signal only one consumer. We don't want to signal more consumers than we have items in the queue, because it makes more work for the system to have to manage and give timeslices to all the threads that can't make progress regardless.

That's why ArrayBlockingQueue signals one at a time for each time an item is enqueued or dequeued, in order to avoid unnecessary wakeups. In your implementation everybody in the waitset gets woken up on the transition (from empty to non-empty, or from full to not full), regardless of how many of those threads will be able to get their work accomplished.

This gets more significant as more threads are hitting this concurrently. Imagine a system with 100 threads waiting to consume something from the queue, but only one item is added every 10 seconds. It would be better not to kick out 100 threads from the waitset just to have 99 of them have to go back in.

Nathan Hughes
  • 94,330
  • 19
  • 181
  • 276
  • Could you please walk through my question once more again? I have also different conditions but signalAll within if clause – Pasha Jun 26 '19 at 13:34
  • 2
    @Pavel you are signalling all threads at a point when you know that only one thread can proceed, so all other threads wake up, just to find the condition not fulfilled anymore and go back to waiting. – Holger Jun 26 '19 at 16:05