
I want to implement a blocking queue with ReentrantLock. I define two conditions, full and empty; the source code is as follows:

@Slf4j
@NotThreadSafe
public class CustomBlockQueue<T> {

    private ReentrantLock lock = new ReentrantLock();

    private Condition full = lock.newCondition();
    private Condition empty = lock.newCondition();

    private Integer maxLength = 1 << 4;

    private Integer putIndex = 0, takeIndex = 0;
    private Integer count = 0;

    private Object[] value;

    public CustomBlockQueue(){
        value = new Object[maxLength];
    }

    public CustomBlockQueue(Integer maxLength){
        this.maxLength = maxLength;
        value = new Object[maxLength];
    }

    public void put(T val) throws InterruptedException {
        lock.lock();
        try {
            if (count.equals(maxLength)){
                log.info("The queue is full!");
                full.await();
            }
            putIndex = putIndex % maxLength;
            value[putIndex++] = val;
            count++;
            empty.signal();
        }finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        Object val;
        try {
            if (count == 0){
                empty.await();
            }
            takeIndex = takeIndex % maxLength;
            val = value[takeIndex++];
            count--;
            full.signal();
        }finally {
           lock.unlock();
        }
        return (T) val;
    }
}

When testing with two consumer threads and one producer thread, count occasionally drops below zero.
Why is this blocking queue not thread-safe? Can anyone give me some guidance? Thank you very much!

Update (2018/10/17)

If I use just one Condition, will it run correctly? Source code as follows:

@Slf4j
@NotThreadSafe
public class CustomBlockQueue<T> {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    ...

    public void put(T val) throws InterruptedException {
        lock.lock();
        try {
            while (count.equals(maxLength)){
                log.info("The queue is full!");
                condition.await();
            }
            putIndex = putIndex % maxLength;
            value[putIndex++] = val;
            count++;
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        Object val;
        try {
            while (count == 0){
                condition.await();
            }
            takeIndex = takeIndex % maxLength;
            val = value[takeIndex++];
            count--;
            condition.signal();
        }finally {
           lock.unlock();
        }
        return (T) val;
    }
}
Shuai Junlan

2 Answers


A similar question: Why should wait() always be called inside a loop

Explanation

Consider this situation:

  1. consumer 1 is blocked on lock.lock();
  2. consumer 2 is blocked on empty.await();
  3. the producer holds the lock, adds one element to the queue (making count = 1), and calls empty.signal();
  4. consumer 2 receives this signal and wakes up from empty.await(), but it has to re-acquire the lock first, and consumer 1 is ahead of it.
  5. consumer 1 gets the lock, finds that count is 1, and decrements count to 0.
  6. consumer 2 gets the lock. Since it has already executed

    if (count == 0){    // consumer 2 will not re-check this condition
        empty.await();  
    }
    

    consumer 2 believes the queue is not empty, so it executes:

    takeIndex = takeIndex % maxLength;
    val = value[takeIndex++];
    count--;
    

    which decrements count from 0 to -1.

Solution

Using while instead of if guarantees that consumer 2 re-checks whether the queue is empty after it wakes up, which guarantees count >= 0.

while (count == 0){
    empty.await();
}

Also, it's better to do the same in the put method:

while (count.equals(maxLength)){
    log.info("The queue is full!");
    full.await();
}
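Putting both loops together, a minimal sketch of the whole queue could look like this. It assumes a fixed capacity passed to the constructor, uses primitive int fields instead of Integer, and drops the Lombok logging and annotations so it compiles on its own; the size() helper is hypothetical and only added for checking. The structure (one ReentrantLock, two Conditions, await in a while loop, signal() on the other Condition) is similar to how java.util.concurrent.ArrayBlockingQueue is organized, but this is a sketch, not that class.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: the question's queue with both waits moved into while loops.
class CustomBlockQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition full = lock.newCondition();  // producers wait here while the queue is full
    private final Condition empty = lock.newCondition(); // consumers wait here while the queue is empty

    private final Object[] items;
    private int putIndex = 0, takeIndex = 0, count = 0; // all guarded by lock

    CustomBlockQueue(int capacity) {
        items = new Object[capacity];
    }

    void put(T val) throws InterruptedException {
        lock.lock();
        try {
            // Re-check after every wakeup: the free slot may already be gone.
            while (count == items.length) {
                full.await();
            }
            items[putIndex] = val;
            putIndex = (putIndex + 1) % items.length;
            count++;
            empty.signal(); // one more item: wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            // Re-check after every wakeup: another consumer may have taken the item.
            while (count == 0) {
                empty.await();
            }
            T val = (T) items[takeIndex];
            items[takeIndex] = null; // drop the reference so it can be garbage-collected
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            full.signal(); // one more free slot: wake one waiting producer
            return val;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical helper for testing: current number of items.
    int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}
```

With the loops in place, the scenario above can no longer push count below zero: a consumer that loses the race simply goes back to empty.await().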
xingbin
  • If it uses just one Condition, will it run correctly? – Shuai Junlan Oct 17 '18 at 09:38
  • @ShuaiJunlan I do not think so. Consider the situation in my answer: using one `Condition` does not help. The point is that `await` should always be called in a `while` loop, so the thread re-checks the condition after receiving the signal. – xingbin Oct 17 '18 at 09:42
  • I changed the code to use a `while` loop and just one `Condition`; see the update. – Shuai Junlan Oct 17 '18 at 09:47
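On the single-Condition version in the update: producers and consumers then share one wait set, so a signal() can wake a thread that is waiting for the other predicate. For example, with two consumers, the signal() at the end of take() may wake the other waiting consumer instead of a waiting producer; that consumer re-checks, finds the queue empty, and waits again, while the producer stays asleep even though there is room. A minimal sketch, assuming the same ring-buffer layout as the question (the class name SingleConditionQueue and the int fields are hypothetical), keeps the while loops and uses signalAll() so every waiter re-checks its own predicate:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-Condition variant: one shared wait set for producers and consumers.
class SingleConditionQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Object[] items;
    private int putIndex = 0, takeIndex = 0, count = 0; // all guarded by lock

    SingleConditionQueue(int capacity) {
        items = new Object[capacity];
    }

    void put(T val) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                condition.await();
            }
            items[putIndex] = val;
            putIndex = (putIndex + 1) % items.length;
            count++;
            // signal() might wake another producer; signalAll() is sure to reach a consumer.
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                condition.await();
            }
            T val = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            // signal() might wake another consumer; signalAll() is sure to reach a producer.
            condition.signalAll();
            return val;
        } finally {
            lock.unlock();
        }
    }
}
```

The trade-off is that every put() and take() wakes all waiting threads, most of which just go back to sleep, which is why two Conditions with signal() is usually preferred.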

One obvious issue is that a Condition can "wake up" without a corresponding call to signal. So instead of using "if", you need to use "while". For example:

while (count == 0) {
    empty.await();
}

See also javadoc here: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html

The lock associated with this Condition is atomically released and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:

  • Some other thread invokes the signal() method for this Condition and the current thread happens to be chosen as the thread to be awakened; or
  • Some other thread invokes the signalAll() method for this Condition; or
  • Some other thread interrupts the current thread, and interruption of thread suspension is supported; or
  • A "spurious wakeup" occurs.
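Because any of those four events ends the wait, a return from await() only means "check again", never "the condition now holds". A minimal sketch of the resulting idiom, using a hypothetical one-shot Gate class (not from either answer): the guarded predicate lives in a field protected by the lock, and both the untimed and the timed wait (via Condition.awaitNanos, which returns the remaining time) loop on that predicate.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot gate: threads block in awaitOpen() until open() is called.
class Gate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean isOpen = false; // the state predicate, guarded by lock

    void open() {
        lock.lock();
        try {
            isOpen = true;
            opened.signalAll();
        } finally {
            lock.unlock();
        }
    }

    void awaitOpen() throws InterruptedException {
        lock.lock();
        try {
            // A wakeup, spurious or not, is only a hint to re-check the predicate.
            while (!isOpen) {
                opened.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Timed variant: awaitNanos returns the remaining wait time, so the loop keeps the overall deadline.
    boolean awaitOpen(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!isOpen) {
                if (nanos <= 0L) {
                    return false; // timed out while still closed
                }
                nanos = opened.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```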
Stas
  • Spurious wakeups are possible in some JVM implementations running under some operating systems, and they are a valid reason why one should always `wait()` in a loop, but spurious wakeups are rare. The answer from 孙兴斌 gives a more likely explanation for the bad behavior: In a multi-consumer situation, consumer A can wake up from a `wait()` call after consumer B has already consumed the whatever-it-was that caused the notification. – Solomon Slow Oct 16 '18 at 13:42
  • I changed the code to use a `while` loop and just one `Condition`; see the `update` section. Do you think it runs correctly? Why? – Shuai Junlan Oct 17 '18 at 10:41
  • @SolomonSlow can't argue with that. It's just that I pointed to the obvious problem. The fact that it's rare doesn't mean it doesn't happen; it therefore has to be reasoned about, and it's often easier to notice than other problems. – Stas Oct 18 '18 at 10:29
  • @Stas, we're answering different questions. You're answering the question, "Why should wait() _always_ be called in a loop?" I'm answering the question, "What's the most likely explanation for what happened in this particular (multi-consumer) case?" – Solomon Slow Oct 18 '18 at 10:34
  • @ShuaiJunlan looks alright, though, can't say for sure it doesn't have bugs ;) – Stas Oct 18 '18 at 10:36
  • @SolomonSlow I'm answering the question "What can be wrong here?". That is one of the answers. Sorry if that one differs from what you would expect. – Stas Oct 18 '18 at 10:40