You acquire and release ReentrantLock
with lock()
and unlock()
, not with synchronized
.
As Antoniossss points out, this is a deadlock where one thread is waiting for a lock which the other will never relinquish (while the two are trying to coordinate on that lock). Here is one solution:
import static java.util.Objects.requireNonNull;
import java.util.LinkedList;
import java.util.List;
class Producer implements Runnable {
private final List<String> data;
Producer(List<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
int counter = 0;
while (true) {
synchronized (data) {
if (data.size() < 5) {
counter++;
data.add("writing:: " + counter);
} else {
try {
data.wait();
} catch (InterruptedException e) {
return;
}
}
}
}
}
}
class Consumer implements Runnable {
private final List<String> data;
Consumer(List<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
while (true) {
synchronized (data) {
if (data.size() > 0) {
System.out.println("reading:: " + data.get(data.size() - 1));
data.remove(data.size() - 1);
}
data.notify();
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
List<String> data = new LinkedList<>();
Thread t1 = new Thread(new Producer(data));
Thread t2 = new Thread(new Consumer(data));
t1.start();
t2.start();
}
}
We have done away with the lock object and synchronize on the list itself. The producer synchronizes inside the while loop, which has the effect that once there are at least 5 items in the list, the producer will wait, ceding the monitor on the list.
The consumer also synchronizes inside the loop which is critical because in your code, it never relinquished the monitor on the lock once it had acquired it. In fact, if you had started the consumer first (or gotten very unlucky), nothing at all would have been produced. A monitor is released when leaving a synchronized block or method or when a thread holding the monitor wait()
s on it, but notify()
and notifyAll()
do not release the monitor.
The consumer reads the last item if there are any, and then immediately notifies the producer and releases the lock. Two notes:
First, it isn't clear what ordering you expect for items. Do you expect the producer to produce 5, the consumer to consume 5, and so on, or do you want the 5 to simply be a limit so too large a backlog cannot form (this is good, it's called backpressure), but the consumer to consume items eagerly whenever they're available? This implementation does the latter.
Second, the consumer tries to acquire the monitor on the list as soon as it has released it. This is a form of busy waiting, the consumer and producer are then racing to acquire the lock, and it may be that the consumer often wins that race, which will become pointless once the list is empty. It might be wise to place a call to onSpinWait
outside the synchronized block but inside the while loop in the consumer, in Java 9 or later. In earlier versions, yield
might be appropriate. But in my tests the code does fine without either.
Antoniossss made another suggestion, to use a LinkedBlockingQueue, but the code as it stands always takes the last item, and using a queue would change that behavior. Instead, we can use a deque (a double-ended queue), putting items on the end and also taking them from the end. Here's what that looks like:
import static java.util.Objects.requireNonNull;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
class Producer implements Runnable {
private final BlockingDeque<String> data;
Producer(BlockingDeque<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
int counter = 0;
while (true) {
counter++;
try {
data.put("writing:: " + counter);
} catch (InterruptedException e) {
break;
}
}
}
}
class Consumer implements Runnable {
private final BlockingDeque<String> data;
Consumer(BlockingDeque<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
while (true) {
try {
System.out.println("reading:: " + data.takeLast());
} catch (InterruptedException e) {
break;
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
BlockingDeque<String> data = new LinkedBlockingDeque<>(5);
Thread t1 = new Thread(new Producer(data));
Thread t2 = new Thread(new Consumer(data));
t1.start();
t2.start();
}
}
Because a LinkedBlockingDeque
is a concurrent data structure we don't need any synchronized blocks or wait or notify here. We can simply try to put
and takeLast
from the deque, and it will block if the deque is full or empty, respectively. The deque is created with a capacity of 5, so it applies backpressure to the producer if the producer ever gets that far ahead, just like the original.
There's nothing to stop the producer from producing elements as fast as the consumer can consume them, though, which means the first elements may have to wait an arbitrarily long time to be consumed. It isn't clear to me if that was the intention of your code. There are ways you could achieve that, either by introducing wait()
and notify()
again, by using Semaphore
s, or other means, but I'll leave that be since it isn't clear you even wanted it.
One final note on InterruptedException
. It will happen if someone calls interrupt()
on the thread, but the only one holding onto references to the producer and consumer threads is the main()
method, and it never interrupts them. So the exception shouldn't occur here, but in case it somehow does, I simply have the producer or consumer exit. In a more complex scenario interrupting the thread could be used as a way to signal it if it's sleeping or in a blocking method (or even outside of one, if it checks for it explicitly) but we're not using that here.