
I'd like to use a CircularFifoQueue as the work queue of an ExecutorService in a Spring application.

The following does not compile, because CircularFifoQueue is not of type BlockingQueue. But it shows what I'm trying to achieve:

int threads = 10;
int queueSize = 500;
new java.util.concurrent.ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, 
                new CircularFifoQueue(queueSize));

With:

package org.apache.commons.collections4.queue;

public class CircularFifoQueue<E> extends AbstractCollection<E>
       implements Queue<E>, BoundedCollection<E>, Serializable

Question: does the code above provide thread safety (as CircularFifoQueue itself is not threadsafe)? If not, how can I make it threadsafe?

membersound

3 Answers

4

You can make the queue synchronized with QueueUtils.synchronizedQueue from commons-collections:

Queue<Runnable> queue = QueueUtils.synchronizedQueue(new CircularFifoQueue<>());

According to the Javadoc, however, iteration still requires manual synchronization:

In order to guarantee serial access, it is critical that all access to the backing queue is accomplished through the returned queue.

It is imperative that the user manually synchronize on the returned queue when iterating over it:

Queue queue = QueueUtils.synchronizedQueue(new CircularFifoQueue());
...
synchronized(queue) {
    Iterator i = queue.iterator(); // Must be in synchronized block
    while (i.hasNext()) {
        foo(i.next());
    }
}

Failure to follow this advice may result in non-deterministic behavior.

piet.t
7er
  • Note: this method is available only since version 4.2 of commons-collections. – Flame239 Oct 24 '18 at 14:52
  • If you need to use `synchronized` anyway, why bother wrapping with `QueueUtils.synchronizedQueue` at all? – Ilya Serbis Apr 30 '21 at 12:42
  • The synchronized block is required only for serial access (iterating over the queue), which is not a common way to use a queue. Add/poll/peek are already synchronized by QueueUtils.synchronizedQueue. – 7er May 04 '21 at 06:50
2

Work queues are meant to be blocking, so you would need to add a decorator that turns the CircularFifoQueue into a BlockingQueue.

class BlockingCircularFifoQueue<E> implements BlockingQueue<E>{
  private CircularFifoQueue<E> backingCollection;
  ...
} 

Delegate to the backing collection where needed. You would, of course, need to get the Lock and Condition handling right.
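To make the idea concrete, here is a minimal sketch of such a decorator. It is not taken from any library: the class name EvictingBlockingQueue is hypothetical, and a plain java.util.ArrayDeque stands in for the CircularFifoQueue so that the example needs only the JDK. It assumes the desired "circular" behaviour is that offer never blocks or fails and instead drops the oldest element when the queue is full, while take blocks until something is available.

```java
import java.util.AbstractQueue;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: a bounded queue that evicts its oldest element when full. */
class EvictingBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
    // Assumption: ArrayDeque stands in for CircularFifoQueue to stay JDK-only.
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    EvictingBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    /** Never blocks and never rejects: drops the oldest element when full. */
    @Override public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        lock.lock();
        try {
            if (items.size() == capacity) items.pollFirst(); // circular overwrite
            items.addLast(e);
            notEmpty.signal(); // wake one waiting consumer
            return true;
        } finally { lock.unlock(); }
    }
    @Override public void put(E e) { offer(e); }
    @Override public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); }

    /** Blocks until an element is available. */
    @Override public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) notEmpty.await();
            return items.pollFirst();
        } finally { lock.unlock(); }
    }
    /** Waits up to the given timeout; returns null if nothing arrived in time. */
    @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                if (nanos <= 0L) return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return items.pollFirst();
        } finally { lock.unlock(); }
    }
    @Override public E poll() {
        lock.lock();
        try { return items.pollFirst(); } finally { lock.unlock(); }
    }
    @Override public E peek() {
        lock.lock();
        try { return items.peekFirst(); } finally { lock.unlock(); }
    }
    @Override public int size() {
        lock.lock();
        try { return items.size(); } finally { lock.unlock(); }
    }
    @Override public boolean remove(Object o) {
        lock.lock();
        try { return items.remove(o); } finally { lock.unlock(); }
    }
    @Override public int remainingCapacity() {
        lock.lock();
        try { return capacity - items.size(); } finally { lock.unlock(); }
    }
    @Override public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); }
    @Override public int drainTo(Collection<? super E> c, int max) {
        if (c == this) throw new IllegalArgumentException();
        lock.lock();
        try {
            int n = 0;
            while (n < max && !items.isEmpty()) { c.add(items.pollFirst()); n++; }
            return n;
        } finally { lock.unlock(); }
    }
    /** Iterates over a snapshot, so callers need no external locking. */
    @Override public Iterator<E> iterator() {
        lock.lock();
        try { return new ArrayList<E>(items).iterator(); } finally { lock.unlock(); }
    }
}
```

One caveat with this design: because offer always returns true, a ThreadPoolExecutor using such a queue would never reject a task; the oldest queued task would simply disappear without running and without any notification.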

Sleiman Jneidi
1

You will practically have to write your own queue implementation. Since CircularFifoQueue uses an underlying array to hold its elements, I would use ArrayBlockingQueue from the java.util.concurrent package as a starting point.

Example:

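import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections4.queue.CircularFifoQueue;

class ThreadSafeCircularFifoQueue<T> extends CircularFifoQueue<T> implements BlockingQueue<T> {

    /** Main lock guarding all access */
    private final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

    ThreadSafeCircularFifoQueue(int size) {
        super(size);
    }

    @Override
    public int size() {
        lock.lock();
        try {
            return super.size();
        } finally {
            lock.unlock();
        }
    }

    // and so forth: every other Queue and BlockingQueue method needs the same locking
}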
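
If the goal is only that the oldest pending task is dropped when the queue is full, it may be possible to avoid a custom queue altogether. The sketch below is an alternative, not the approach above: it assumes that a standard ArrayBlockingQueue combined with the JDK's ThreadPoolExecutor.DiscardOldestPolicy is acceptable. The class name DiscardOldestDemo and the tiny sizes (one thread, two queue slots) are made up for illustration only.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DiscardOldestDemo {
    /** Submits tasks 1..3 to a busy single-thread pool with a 2-slot queue; returns the ids that ran. */
    static List<Integer> run() {
        int threads = 1;   // illustrative values, much smaller than in the question
        int queueSize = 2;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        List<Integer> executed = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        // Occupy the only worker so that the following tasks pile up in the queue.
        executor.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (int i = 1; i <= 3; i++) {
            final int id = i;
            // Tasks 1 and 2 fill the queue; task 3 is rejected, so the policy
            // drops the oldest queued task (1) and retries task 3.
            executor.execute(() -> executed.add(id));
        }

        gate.countDown(); // release the worker
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }
}
```

Note that the discarded task is dropped silently, so this only fits workloads where losing old tasks is acceptable.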
Dexter