Here is Circular Bounded Queue which is (supposed to be)thread safe and provides a blocking take
operation.
public class CircularQueue<T> {
private final int MAX_SIZE;
private final AtomicReferenceArray<T> buffer;
private final AtomicInteger start;
private final AtomicInteger end;
private final AtomicInteger len;
private final ReentrantLock rwlock;
private final Condition readCondition;
public CircularQueue(int size) {
MAX_SIZE = size;
buffer = new AtomicReferenceArray<T>(size);
start = new AtomicInteger(0);
end = new AtomicInteger(0);
len = new AtomicInteger(0);
rwlock = new ReentrantLock(true);
readCondition = rwlock.newCondition();
}
/**
* Adds to tail of the queue
*/
public void put(T val) {
try {
rwlock.lock();
buffer.set(end.get(), val);
end.set((end.get() + 1) % MAX_SIZE);
if (len.get() == MAX_SIZE) { // overwrite
start.set((start.get() + 1) % MAX_SIZE);
} else {
len.incrementAndGet();
}
readCondition.signal();
} finally {
rwlock.unlock();
}
}
/**
* Blocking removeFront operation
* @return
*/
public T take() {
T val = null;
try {
rwlock.lock();
while (len.get() == 0) {
try {
readCondition.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
val = buffer.get(start.get());
buffer.set(start.get(), null);
start.set((start.get() + 1) % MAX_SIZE);
len.decrementAndGet();
} finally {
rwlock.unlock();
}
return val;
}
public int size() {
int curLen = 0;
try {
rwlock.lock();
curLen = len.get();
} finally {
rwlock.unlock();
}
return curLen;
}
}
There are many operations which are yet to be added like poll
, offer
etc. But you can test this out with some threads :
It is going to hang your JVM if it runs correctly.
public static void main(String[] args) {
final int MAX_QUEUE_SIZE = 4;
final CircularQueue<Integer> q = new CircularQueue<Integer>(MAX_QUEUE_SIZE);
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Putting: from " + Thread.currentThread().getName() + " " + i);
q.put(i);
}
for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take());
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 10; i < 10 + MAX_QUEUE_SIZE; ++i) {
try {
Thread.sleep(1001);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Putting: from " + Thread.currentThread().getName() + " " + i);
q.put(i);
}
for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take());
}
}
}).start();
}
Your output should probably match
Putting: from Thread-0 0
Putting: from Thread-1 10
Putting: from Thread-0 1
Putting: from Thread-1 11
Putting: from Thread-0 2
Putting: from Thread-1 12
Putting: from Thread-0 3
Trying to get from Thread-0 11
Trying to get from Thread-0 2
Trying to get from Thread-0 12
Trying to get from Thread-0 3
Putting: from Thread-1 13
Trying to get from Thread-1 13
The other take operations from Thread-1 are waiting for a corresponding put operation since Thread-1 is slightly slower than Thread-0.