As an answer to the question about pausing a BlockingQueue, I came with the idea of using an existing blocking structure blockingQueue2
and guarding the state with two different locks.
public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 184661285402L;
private Object lock1 = new Object();//used in pause() and in take()
private Object lock2 = new Object();//used in pause() and unpause()
//@GuardedBy("lock1")
private volatile boolean paused;
private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();
public void pause() {
if (!paused) {
synchronized (lock1) {
synchronized (lock2) {
if (!paused) {
paused = true;
blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty
}
}
}
}
}
public void unpause() throws InterruptedException {
if (paused) {
synchronized (lock2) {
paused = false;
blockingQueue2.put(new Object());//will release waiting thread, if there is one
}
}
}
@Override
public E take() throws InterruptedException {
E result = super.take();
if (paused) {
synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
if (paused) {
blockingQueue2.take();
}
}
}
return result;
}
//TODO override similarly the poll() method.
}
I need two different locks, otherwise the unpause()
could wait for the lock1
held already in take()
by a consumer thread.
My questions:
- Could this come to a deadlock?
- Does it work at all?
- How often did you see such code, as I myself don't find it readable?
- How should I annotate the
paused
flag: with@GuardedBy("lock1, locks2")
?
PS: Any improvements are welcome (beside that I could have used a binary semaphore instead of blockingQueue2
).