-1

I found this example from SO. Now, I was trying to understand the usage of wait() and notify()/notifyAll(). In which scenario and why do we need this.

class BlockingQueue<T> {

    private Queue<T> queue = new LinkedList<T>();
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T element) throws InterruptedException {
        while (queue.size() == capacity) {
             System.out.println("Waiting...");
            wait();

        }

        queue.add(element);
        notify(); // notifyAll() for multiple producer/consumer threads
    }

    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }

        T item = queue.remove();
        notify(); // notifyAll() for multiple producer/consumer threads
        return item;
    }
}

So, implemented Runnable and overriden run() method like below

 @Override
    public void run() {
        // synchronized (this) {
        BlockingQueue<Integer> s = new BlockingQueue(10);
        for (int i = 0; i < 12; i++) {
            try {
                s.put(i);
                if (i > 9) {
                    System.out.println(Thread.currentThread().getName() + "  : " + s.take());
                }
                System.out.println(Thread.currentThread().getName() + " ExtendsThread : Counter : " + i);
            } //}
            //notify();
            catch (InterruptedException ex) {
                Logger.getLogger(ExtendsThread.class.getName()).log(Level.SEVERE, null, ex);
            }

        }

    }

And, running the thread like below

 ImplementsRunnable rc = new ImplementsRunnable();
        Thread t1 = new Thread(rc, "A");
        t1.start();

When I'm running it, then it is stuck after counter : 9 and keep on waiting for forever. Anyone suggest me what's wrong here ?

Ravi
  • 30,829
  • 42
  • 119
  • 173
  • _what's wrong with my implementation for understanding wait() and notify()_ What is your understanding of those methods? What do you think they do? – Sotirios Delimanolis Jul 17 '15 at 05:36
  • Maybe [this SQ question and the accepted answer][1] helps you [1]: http://stackoverflow.com/questions/886722/how-to-use-wait-and-notify-in-java – bish Jul 17 '15 at 05:37
  • @SotiriosDelimanolis `wait()` will wait and `notify()` will notify waiting threads. Am I correct ? – Ravi Jul 17 '15 at 05:41
  • Create a new `Runnable` which puts values into a `BlockingQueue` and a `Runnable` which takes the values from a `BlockingQueue`, pass both the same instance of `BlockQueue`, start them and let them run... – MadProgrammer Jul 17 '15 at 05:41
  • @MadProgrammer Let me try this one – Ravi Jul 17 '15 at 05:47

2 Answers2

2

Your concept is slightly flawed. The BlockingQueue can act as bridge in a producer/consumer pattern.

That is, it allows one thread to write content to it and another thread to read content from it, but it's doing it in such away that if:

  • There are no items to be taken, it waits until new items arrive
  • If there are too many items, it waits for items to be removed

In this case, the wait and notify are internal messaging for the instance of the BlockingQueue

You can have a look at Intrinsic Locks and Synchronization.

So, instead of using just one thread, you should be using (at least) two, a produce and a consumer...

Producer

This takes an instance of BlockingQueue and adds int values to it. Each time it stops for 1 second before adding the next

public class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int index = 0; index < 10; index++) {
            try {
                System.out.println("Put " + index);
                queue.put(index);
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
            }
        }
    }

}

Consumer

The consumer takes a BlockQueue and reads int values from it, which be blocked until a value exists.

public class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer value = queue.take();
                System.out.println("Took " + value);
            }
        } catch (InterruptedException ex) {
            Logger.getLogger(JavaApplication220.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

}

You can get these start using something like...

BlockingQueue bq = new BlockingQueue(10);
Thread p = new Thread(new Producer(bq));
Thread c = new Thread(new Consumer(bq));
c.setDaemon(true);
c.start();
p.start();

You should note that there is a small delay between the put messages, but almost no delay between the took messages. This is the queue in action. The Consumer is blocking/waiting on the queue to have something to give it.

You can play around with the Producer and Consumer, maybe changing their times (having a longer delay in the Consumer before taking an item for example) to see how this might cause different effects

When I'm running it, then it is stuck after counter : 9 and keep on waiting for forever

This is likely because you've exceeded the capacity of the queue and it's put method is blocking until you take something from it (you essentially have a dead lock, where the queue is waiting for you to take something from it, but you can't because you're locked on the put)

Things to remember:

  • For two or more threads to work with monitor locks, they MUST share the same instance of the monitor/object lock. In this case, the same instance of BlockingQueue
  • notify will wake one object whose is waiting on the same instance of the monitor lock's wait method. There is no way to know which one. This can be useful if you have multiple consumers, but don't care about the order in which the data is processed, for example

Updated with additional example

So, this takes the Thread.sleep out of the Producer (and allows the producer to produce 100 values) and adds a Thread.sleep to the Consumer.

This way, the Producer will reach it's capacity before the Consumer can drain it, forcing it to wait until the Consumer can take values from it...

public class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int index = 0; index < 100; index++) {
            try {
                System.out.println("Put " + index);
                queue.put(index);
            } catch (InterruptedException ex) {
            }
        }
    }

}

public class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer value = queue.take();
                System.out.println("Took " + value);
                Thread.sleep(1000);
            }
        } catch (InterruptedException ex) {
        }
    }

}
MadProgrammer
  • 343,457
  • 22
  • 230
  • 366
  • why is it adding and removing at same time ? Please correct my understanding. I was expecting to work this code like, it should keep on adding elements till it reaches the limit and once it reached then it should wait and then it should remove one elements and add one elements. That's the way I was expecting to work `wait` and `notify`. – Ravi Jul 17 '15 at 06:14
  • `wait` and `notify` work through different threads, if you tried this on the same thread, you'd end up with the problem you have, where it blocks indefinitely. Try this, put a `Thread.sleep(5000)` in the consumer after each take, see what that does – MadProgrammer Jul 17 '15 at 06:15
  • Exactly.. This is perfect. Let me understand the code flow. I will let you know for the same :) Thanks. – Ravi Jul 17 '15 at 06:19
  • Check the update, it moves the `Thread.sleep` to the `Consumer`, which allows the `Producer` to run as fast as it can. It also fills the `Producer` with 100 values instead of 10, making sure we hit that capacity limit ;) – MadProgrammer Jul 17 '15 at 06:21
  • Why did you use `while (true)` in `Consumer` ? Doesn't it mean `infinite` loop ? – Ravi Jul 17 '15 at 06:24
  • Yeah it does, but I generally had no idea how many times it might need to loop and I did have the `Consumer` set as a `daemon` thread in the first example, which would allow the JVM to exit once the `Producer` thread existed ;) – MadProgrammer Jul 17 '15 at 06:28
  • Ok Thanks. You explanation helps me and will definitely helps other especially beginner to understand the `Thread`. – Ravi Jul 17 '15 at 06:30
1

Add printlns here

public synchronized void put(T element) throws InterruptedException {
    while (queue.size() == capacity) {
        System.out.println("blocked");
        wait();
    }
    queue.add(element);
    notify(); // notifyAll() for multiple producer/consumer threads
    System.out.println("put "+ element);
}

public synchronized T take() throws InterruptedException {
    while (queue.isEmpty()) {
        wait();
    }
    T item = queue.remove();
    notify(); // notifyAll() for multiple producer/consumer threads
    System.out.println("removed " + item);
    return item;
}

and run this test

public static void main(String argv[]) throws Exception {
    final BlockingQueue q = new BlockingQueue(2);
    new Thread() {
        public void run() {
            try {
                Thread.sleep(5000);
                q.take();
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    }.start();
    q.put(1);
    q.put(2);  // will block here until tread 2 takes an element and reduces the capacity
    q.put(3);   
}

it will print

put 1
put 2
blocked
removed 1
put 3
Evgeniy Dorofeev
  • 133,369
  • 30
  • 199
  • 275
  • Your code works, exactly what I was expected. But, I have one doubt, when I added another element to the queue then it get stuck and keep on waiting again. why ? – Ravi Jul 17 '15 at 06:16
  • @jWeaver The queue capacity is `2`, so it's probably blocking, waiting for something to take something from the queue, but the "take" thread only removes a single item – MadProgrammer Jul 17 '15 at 06:22