I am trying to implement Blocking Queue functionality but Thread goes in wait state. Not able to figure out what may be going wrong. I tried some of the implementations online, but none are working. Maybe my executors code is wrong. But if I replace MyBlockingQueue
with ArrayBlockingQueue
everything works fine.
Below are the two main methods.
public synchronized void put(Integer i) throws InterruptedException {
if (a.size() == capacity) {
wait();
}
a.add(i);
notifyAll();
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
wait();
}
a.remove(0);
notifyAll();
}
Code:
public class App {
public static MyBlockingQueue q = new MyBlockingQueue(10);
// public static ArrayBlockingQueue q = new ArrayBlockingQueue(10);
public void method1() throws InterruptedException {
for (int i = 0; i < 20; i++) {
q.put(i);
System.out.println(q);
}
}
public void method2() throws InterruptedException {
for (int i = 0; i < 20; i++) {
q.take();
}
}
public static void main(String[] args) throws InterruptedException {
App a = new App();
ExecutorService executor1 = Executors.newFixedThreadPool(20);
ExecutorService executor2 = Executors.newFixedThreadPool(20);
for (int i = 0; i < 2; i++) {
executor1.submit(new Runnable() {
public void run() {
try {
a.method1();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
for (int i = 0; i < 2; i++) {
executor2.submit(new Runnable() {
public void run() {
try {
a.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
executor1.shutdown();
executor2.shutdown();
executor1.awaitTermination(1, TimeUnit.DAYS);
executor2.awaitTermination(2, TimeUnit.DAYS);
System.out.println();
System.out.println("The final queue is:");
System.out.println(App.q);
}
}
class MyBlockingQueue {
private ArrayList<Integer> a;
private int capacity;
public MyBlockingQueue(Integer cap) {
capacity = cap;
a = new ArrayList<Integer>(capacity);
}
@Override
public String toString() {
String output = "";
for (Integer i : a) {
output += i.toString() + " ";
}
return "[" + output + "]";
}
public synchronized void put(Integer i) throws InterruptedException {
if (a.size() == capacity) {
wait();
}
a.add(i);
notifyAll();
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
wait();
}
a.remove(0);
notifyAll();
}
}