Your concept is slightly flawed. The BlockingQueue
can act as bridge in a producer/consumer pattern.
That is, it allows one thread to write content to it and another thread to read content from it, but it's doing it in such away that if:
- There are no items to be taken, it waits until new items arrive
- If there are too many items, it waits for items to be removed
In this case, the wait
and notify
are internal messaging for the instance of the BlockingQueue
You can have a look at Intrinsic Locks and Synchronization.
So, instead of using just one thread, you should be using (at least) two, a produce and a consumer...
Producer
This takes an instance of BlockingQueue
and adds int
values to it. Each time it stops for 1 second before adding the next
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int index = 0; index < 10; index++) {
try {
System.out.println("Put " + index);
queue.put(index);
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}
}
}
Consumer
The consumer takes a BlockQueue
and reads int
values from it, which be blocked until a value exists.
public class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer value = queue.take();
System.out.println("Took " + value);
}
} catch (InterruptedException ex) {
Logger.getLogger(JavaApplication220.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
You can get these start using something like...
BlockingQueue bq = new BlockingQueue(10);
Thread p = new Thread(new Producer(bq));
Thread c = new Thread(new Consumer(bq));
c.setDaemon(true);
c.start();
p.start();
You should note that there is a small delay between the put
messages, but almost no delay between the took
messages. This is the queue in action. The Consumer
is blocking/waiting on the queue to have something to give it.
You can play around with the Producer
and Consumer
, maybe changing their times (having a longer delay in the Consumer
before taking an item for example) to see how this might cause different effects
When I'm running it, then it is stuck after counter : 9 and keep on waiting for forever
This is likely because you've exceeded the capacity of the queue and it's put
method is blocking until you take something from it (you essentially have a dead lock, where the queue is waiting for you to take something from it, but you can't because you're locked on the put
)
Things to remember:
- For two or more threads to work with monitor locks, they MUST share the same instance of the monitor/object lock. In this case, the same instance of
BlockingQueue
notify
will wake one object whose is waiting on the same instance of the monitor lock's wait
method. There is no way to know which one. This can be useful if you have multiple consumers, but don't care about the order in which the data is processed, for example
Updated with additional example
So, this takes the Thread.sleep
out of the Producer
(and allows the producer to produce 100 values) and adds a Thread.sleep
to the Consumer
.
This way, the Producer
will reach it's capacity before the Consumer
can drain it, forcing it to wait until the Consumer
can take values from it...
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int index = 0; index < 100; index++) {
try {
System.out.println("Put " + index);
queue.put(index);
} catch (InterruptedException ex) {
}
}
}
}
public class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer value = queue.take();
System.out.println("Took " + value);
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
}
}
}