2

I am trying to implement Blocking Queue functionality but Thread goes in wait state. Not able to figure out what may be going wrong. I tried some of the implementations online, but none are working. Maybe my executors code is wrong. But if I replace MyBlockingQueue with ArrayBlockingQueue everything works fine.

Below are the two main methods.

public synchronized void put(Integer i) throws InterruptedException {

    if (a.size() == capacity) {
        wait();
    }
    a.add(i);
    notifyAll();
}

public synchronized void take() throws InterruptedException {

    if (a.isEmpty()) {
        wait();
    }
    a.remove(0);
    notifyAll();
}

Code:

public class App {

    public static MyBlockingQueue q = new MyBlockingQueue(10);

    // public static ArrayBlockingQueue q = new ArrayBlockingQueue(10);

    public void method1() throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            q.put(i);
            System.out.println(q);
        }
    }

    public void method2() throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            q.take();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        App a = new App();

        ExecutorService executor1 = Executors.newFixedThreadPool(20);
        ExecutorService executor2 = Executors.newFixedThreadPool(20);

        for (int i = 0; i < 2; i++) {

            executor1.submit(new Runnable() {

                public void run() {

                    try {
                        a.method1();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }

        for (int i = 0; i < 2; i++) {

            executor2.submit(new Runnable() {

                public void run() {

                    try {
                        a.method2();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }

        executor1.shutdown();
        executor2.shutdown();

        executor1.awaitTermination(1, TimeUnit.DAYS);
        executor2.awaitTermination(2, TimeUnit.DAYS);

        System.out.println();
        System.out.println("The final queue is:");
        System.out.println(App.q);

    }

}

class MyBlockingQueue {

private ArrayList<Integer> a;
    private int capacity;

    public MyBlockingQueue(Integer cap) {
        capacity = cap;
        a = new ArrayList<Integer>(capacity);
    }

    @Override
    public String toString() {
        String output = "";
        for (Integer i : a) {
            output += i.toString() + " ";
        }
        return "[" + output + "]";
    }

    public synchronized void put(Integer i) throws InterruptedException {

        if (a.size() == capacity) {
            wait();
        }
        a.add(i);
        notifyAll();
    }

    public synchronized void take() throws InterruptedException {

        if (a.isEmpty()) {
            wait();
        }
        a.remove(0);
        notifyAll();
    }
}
user207421
  • 305,947
  • 44
  • 307
  • 483
garg10may
  • 5,794
  • 11
  • 50
  • 91
  • 2
    You should use while loops in your put and take methods: `while (a.size() == capacity) {` and `while (a.isEmpty()) {`. – Maurice Perry May 23 '17 at 05:26
  • see [whiles in action](https://stackoverflow.com/questions/2536692/a-simple-scenario-using-wait-and-notify-in-java) – Scary Wombat May 23 '17 at 05:27
  • @MauricePerry even with while loop it's not working perfect. Though better than if. Out of 10 only 1 attempt was success. – garg10may May 23 '17 at 05:32
  • 2
    @garg10may one more thing: toString should be synchronized – Maurice Perry May 23 '17 at 05:33
  • 1
    @MauricePerry awesome man, I have been trying this for a day, using various if's, whiles, re-entrant locks and what not. Even all online versions were failing, I thought everyone has given the wrong implementation. And this innocuous `toString` was the culprit. – garg10may May 23 '17 at 05:38
  • Don't fix your question to show the working code. It is of no use to anyone unles it exhibits the original problem. – user207421 May 23 '17 at 06:43

3 Answers3

0

Below is the shy looking culprit sitting there in corner, fixing if to while was also required.

@Override
public String toString() {
    String output = "";
    for (Integer i : a) {
        output += i.toString() + " ";
    }
    return "[" + output + "]";
}

I was adding toString method to solutions and they won't work.

Making it synchronized things work fine. Thanks @Maurice for suggesting this.

garg10may
  • 5,794
  • 11
  • 50
  • 91
0

The problem is that all threads are waiting on the same monitor and will be woken up by notifyAll and run simultaneously.

You can use method notify to wake up single thread and make single insert or remove. But notify() called inside put() can wake up thread waiting to insert on not full condition in put() method:

while (a.size() == capacity) {
        wait(); // many threads waiting
    }

And notify() called inside take() can wake up thread waiting on not empty condition:

while (a.isEmpty()) {
        wait();
    }

Because all threads use one monitor, notify can wake up any waiting thread.

So you need two monitors: one for not full condition and one for not empty.

    Object notFull = new Object();
    Object notEmpty = new Object();

    public synchronized void put(Integer i) throws InterruptedException {

        while (a.size() == capacity) {
            notFull.wait(); 
        }
        a.add(i);
        notEmpty.notify(); //wake up one random thread in take() method
    }

    public synchronized void take() throws InterruptedException {

        if (a.isEmpty()) {
            notEmpty.wait();
        }
        a.remove(0);
        notFull.notify(); // wake up one random thread in put() method
    }

Now notEmpty.notify() and notFull.notify() do not release lock acquired by synchronized keyword in put or take method.

We need to synchronize both methods on the same lock and release or acquire it based on two conditions: not full and not empty. For this there is java.util.concurrent.locks.ReentrantLock class:

A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.

This class represents a lock with conditions. Its method [newCondition][2] creates condition:

Returns a Condition instance for use with this Lock instance.

Condition allows to suspend multiple threads by await() method and wake up single waiting thread by signal() method. It can't be used without ReentrantLock. When Condition.await is called thread releases lock acquired in ReentrantLock.lock method. When Condition.signal is called, waiting thread acquires ReentrantLock. Final code :

/** Main lock guarding all access */
       private final ReentrantLock lock;
       /** Condition for waiting takes */
       private final Condition notEmpty;
       /** Condition for waiting puts */
       private final Condition notFull;

 public void put(Integer i) throws InterruptedException {
          lock.lockInterruptibly();
          try {
                  while (a.size() == capacity)
                      notFull.await();//realease monitor and sleep until notFull.signal is called

             a.add(i);
            notEmpty.signal();// wake up one random thread in take() method
          } finally {
              lock.unlock();
          }
 }

public void take() throws InterruptedException {
          lock.lockInterruptibly();
          try {
                   while (a.isEmpty())
                      notEmpty.await();//realease monitor and sleep until notEmpty.signal is called

                   a.remove(0);
             notFull.signal();// wake up one random thread in put() method
          } finally {
              lock.unlock();
          }       
}

ReentrantLock ensures that only one thread can execute put or take methods at the same time. Condition allows to suspend and resume threads based on condition.

Jay Smith
  • 2,331
  • 3
  • 16
  • 27
-3

Methods put and take in ArrayBlockingQueue should be synchronized with java.util.concurrent.locks.ReentrantLock.

Example:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // your logic 
        } finally {
            lock.unlock();
        }
    }
Jay Smith
  • 2,331
  • 3
  • 16
  • 27