1

I have created a CloseableBlockingQueue extending ArrayBlockingQueue:

private static class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
// Flag indicates closed state.
private volatile boolean closed = false;

public CloseableBlockingQueue(int queueLength) {
  super(queueLength);
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public boolean offer(E e) {
  return closed ? true : super.offer(e);
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public void put(E e) throws InterruptedException {
  if (!closed) {
    super.put(e);
  }
}

/***
 * Shortcut to do nothing if closed.
 */
@Override
public E poll() {
  return closed ? null : super.poll();
}

/***
 * Shortcut to do nothing if closed.
 * @throws InterruptedException 
 */
@Override
public E poll(long l, TimeUnit tu) throws InterruptedException {
  return closed ? null : super.poll(l, tu);
}

/***
 * Mark me as closed and clear the queue.
 */
void close() {
  closed = true;
  // There could be more threads blocking on my queue than there are slots 
  // in the queue. We therefore MUST loop.
  do {
    // so keep clearing
    clear();
    /* Let others in ... more specifically, any collectors blocked on the 
     * queue should now unblock and finish their put.
     * 
     * Subsequent puts should shortcut but it is the waiting ones I need
     * to clear.
     */
    Thread.yield();
    /* Debug code.
    // Did yield achieve?
    if (!super.isEmpty()) {
     * This DOES report success.
      log("! Thread.yield() successful");
    }
     * 
     */
    // Until all are done.
  } while (!super.isEmpty());
}

/***
 * isClosed
 * 
 * @return true if closed.
 */
boolean isClosed() {
  return closed;
}
}

My concern is with the close method where it is trying to kick back into life any threads that are blocked on the queue. I use Thread.yield() to attempt that but I have seen references that suggest that this technique may not always work because there is no guarantee that any other blocked threads will be woken up during a yield.

The queue is used to concentrate the output of multiple threads into a single stream. There can easily be many more threads feeding it than there are slots in the queue so it is quite possible for the queue to be full AND several threads are blocking on it when it is closed.

I welcome your thoughts.

Added

Thanks to Tom's suggestion below I have refactored to:

  • Hold a collection of all threads that may be blocking.
  • On close, interrupt all of them.

BTW: Since the thread collection is used mostly for adding an object and almost immediately removing the same object I took a copy of Doug Lea's impressive ConcurrentDoublyLinkedList from http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm and added a couple of methods to allow me to keep hold of the added node. Removal should then be O(1) instead of O(n).

Paul

OldCurmudgeon
  • 64,482
  • 16
  • 119
  • 213

3 Answers3

1

I don't think yield() would affect the threads blocked on the queue at all.

If you can keep track of the waiting threads (should be straightforward given you're wrapping the blocking methods). You could call interrupt() on them when you close.

See this question: ArrayBlockingQueue - How to "interrupt" a thread that is wating on .take() method

Community
  • 1
  • 1
Tom Elliott
  • 1,908
  • 1
  • 19
  • 39
  • I was just about to suggest something like that myself. :D An AtomicInteger counter of all in-progress puts should resolve to a count of blocked threads. This would certainly stop me prematurely exiting from close while threads are still blocked, which I think is the main danger of using yield. – OldCurmudgeon Oct 26 '11 at 11:24
  • What structure would you use to keep track of blocked threads? I suspect a LinkedList structure would be most efficient as adding and removing are both O(1). – OldCurmudgeon Oct 26 '11 at 11:45
  • LinkedList should be fine, as you'll only need to access it's contents once (assuming the structure can't re-open after you close it). I'd also synchronize your offer, put, poll and close methods in case someone tried to modify the queue while you were in the process of closing it. – Tom Elliott Oct 26 '11 at 14:01
0

Instead of the yield-while-check loop, use wait/notifyAll() or preferably one of the synchronization primitives from the util.concurrent package such as CountDownLatch. Put an object at the end of the queue that, when processed, triggers the notification. Make the calling thread (of the close method) await this notification. It will sleep until the queue has been drained.

Barend
  • 17,296
  • 2
  • 61
  • 80
  • Using wait/notifyall() ... interesting idea but surely the whole point of using a BlockingQueue is that you don't need to use wait. – OldCurmudgeon Oct 26 '11 at 10:19
  • Using CountDownLatch ... I could use a CountDownLatch to count the number of blocked threads instead of using super.isEmpty()!! Let me try that. ... I can't seem to get that to work. – OldCurmudgeon Oct 26 '11 at 10:24
0

I would place a poison pill into the queue. e.g. null. When a waiting thread gets the pill, it places it back in the queue.

E pill = null;

void close() {
   closed = true;
   clear();
   while(!offer(pill)); // will wake one thread.
}

public E poll() {
   if (closed) return null;
   E e = super.poll();
   if (e == pill) add(e); // will wake one thread.
   return e;
}
Peter Lawrey
  • 525,659
  • 79
  • 751
  • 1,130
  • A waiting thread is blocking on put. It cannot receive a poison pill. – OldCurmudgeon Oct 26 '11 at 10:28
  • The `clear()` should release it. Your queue needs to allow more entries than you have blocking put threads. – Peter Lawrey Oct 26 '11 at 10:30
  • But that is exactly the problem. The clear may indeed remove enough entries to release the block on a blocked thread but the block will not release until the blocked thread is given time to discover that fact and yield may not give ANY other threads time, even if they ARE blocked. – OldCurmudgeon Oct 26 '11 at 10:44
  • Which is why I wouldn't use yeild() and instead trust that the threads will wake quickly enough that it won't matter. You can never guarentee when, if ever, a thread will wake up even with notify and unpack. – Peter Lawrey Oct 26 '11 at 10:48
  • 1
    There may be a notification issue: close() probably returns before the asynchronous close mechanism has been completed, (which may, or may not, matter). If this is an issue, the pill could be a bit cleverer and contain a count-down int and an event instance that close() waits on. The last blocked thread to die counts down to 0 and so signals the event before committing suicide. The queued objects may need a boolean or enumeration so that the threads can recognise a pill when they get one. – Martin James Oct 26 '11 at 10:51
  • I cannot restrict the user to a specific number of threads. There could be just 10 slots in the queue and 1000 blocked threads waiting on put, thus my loop to keep clearing the queue and yield to unblock the waiting threads. Attempting to add a poison pill will just block in this scenario, or am I wrong. – OldCurmudgeon Oct 26 '11 at 11:07
  • No, you are not wrong. Either use a clear/sleep, (sleep, not yeild), loop to encourage your 1000 blocked producers to empty themselves into the black hole or use a different flow-control mechansim to avoid bounded queues altogether, (bounded instead of unbounded queue=double your deadlock possibilities). – Martin James Oct 26 '11 at 11:22
  • Another thought - temporarily raise the priority of the caller in close() to try to ensure that, once the, (now high priority), clear/sleep loop has reduced the queue count to 0, the pill will get pushed on before the massed herd of other blocked producers get in the way. – Martin James Oct 26 '11 at 11:30