I have created a CloseableBlockingQueue extending ArrayBlockingQueue:
private static class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
// Flag indicates closed state.
private volatile boolean closed = false;
public CloseableBlockingQueue(int queueLength) {
super(queueLength);
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public boolean offer(E e) {
return closed ? true : super.offer(e);
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public void put(E e) throws InterruptedException {
if (!closed) {
super.put(e);
}
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public E poll() {
return closed ? null : super.poll();
}
/***
* Shortcut to do nothing if closed.
* @throws InterruptedException
*/
@Override
public E poll(long l, TimeUnit tu) throws InterruptedException {
return closed ? null : super.poll(l, tu);
}
/***
* Mark me as closed and clear the queue.
*/
void close() {
closed = true;
// There could be more threads blocking on my queue than there are slots
// in the queue. We therefore MUST loop.
do {
// so keep clearing
clear();
/* Let others in ... more specifically, any collectors blocked on the
* queue should now unblock and finish their put.
*
* Subsequent puts should shortcut but it is the waiting ones I need
* to clear.
*/
Thread.yield();
/* Debug code.
// Did yield achieve?
if (!super.isEmpty()) {
* This DOES report success.
log("! Thread.yield() successful");
}
*
*/
// Until all are done.
} while (!super.isEmpty());
}
/***
* isClosed
*
* @return true if closed.
*/
boolean isClosed() {
return closed;
}
}
My concern is with the close method where it is trying to kick back into life any threads that are blocked on the queue. I use Thread.yield() to attempt that but I have seen references that suggest that this technique may not always work because there is no guarantee that any other blocked threads will be woken up during a yield.
The queue is used to concentrate the output of multiple threads into a single stream. There can easily be many more threads feeding it than there are slots in the queue so it is quite possible for the queue to be full AND several threads are blocking on it when it is closed.
I welcome your thoughts.
Added
Thanks to Tom's suggestion below I have refactored to:
- Hold a collection of all threads that may be blocking.
- On close, interrupt all of them.
BTW: Since the thread collection is used mostly for adding an object and almost immediately removing the same object I took a copy of Doug Lea's impressive ConcurrentDoublyLinkedList from http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm and added a couple of methods to allow me to keep hold of the added node. Removal should then be O(1) instead of O(n).
Paul