1

I am trying to implement consumer-producer using a ReentrantLock as given below:

class Producer implements Runnable {

    private List<String> data;
    private ReentrantLock lock;

    Producer(List<String> data,ReentrantLock lock)
    {
        this.data = data;
        this.lock = lock;
    }

    @Override
    public void run() {
        int counter = 0;
        synchronized (lock)
        {
            while (true)
            {

                if ( data.size() < 5)
                {
                    counter++;
                    data.add("writing:: "+counter);
                }
                else
                {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }
}

class Consumer implements Runnable{

    private List<String> data;
    private ReentrantLock lock;

    Consumer(List<String> data,ReentrantLock lock)
    {
        this.data = data;
        this.lock = lock;
    }

    @Override
    public void run() {
        int counter = 0;
        synchronized (lock)
        {
            while (true)
            {

                if ( data.size() > 0)
                {
                    System.out.println("reading:: "+data.get(data.size()-1));
                    data.remove(data.size()-1);
                }
                else
                {
                    System.out.println("Notifying..");
                    lock.notify();
                }
            }
        }

    }
}

public class ProducerConsumer  {

    public static void main(String[] args) {
        List<String> str = new LinkedList<>();
        ReentrantLock lock= new ReentrantLock();
        Thread t1 = new Thread(new Producer(str,lock));
        Thread t2 = new Thread(new Consumer(str,lock));
        t1.start();
        t2.start();
    }
}

So, it writes to the list only once, and then the Consumer waits indefinitely. Why is this happening? Why isn't the Producer acquiring the lock?

Andrew Tobilko
learner
  • Have you read a tutorial for this? For example, https://docs.oracle.com/javase/tutorial/essential/concurrency/newlocks.html – Radiodef Jul 05 '18 at 18:24
  • After reading the link, I think I should try to use try `{lock.lock()}` and finally `{lock.unlock()}` for the same? But, I am a little bit confused to use the same. – learner Jul 05 '18 at 18:30

4 Answers

3

@Antoniossss is right. You're not using ReentrantLock correctly and instead you can just replace this with an Object. If you want to use ReentrantLock instead (which is more current) then I would suggest something like:

package Multithreading;


import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class Producer implements Runnable{

  private List<String> data;
  private ReentrantLock lock;

  Producer(List<String> data,ReentrantLock lock)
  {
    this.data = data;
    this.lock = lock;
  }

  @Override
  public void run() {
    int counter = 0;
    while (true)
    {


        lock.lock(); // acquire before the try, so unlock() only runs if we hold the lock
        try {
          if ( data.size() < 5)
          {
            counter++;
            data.add("writing:: " + counter);
          }
        }finally {
          lock.unlock();
        }

      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

}

class Consumer implements Runnable{

  private List<String> data;
  private ReentrantLock lock;

  Consumer(List<String> data,ReentrantLock lock)
  {
    this.data = data;
    this.lock = lock;
  }

  @Override
  public void run() {
    while (true)
    {
      lock.lock(); // acquire before the try, so unlock() only runs if we hold the lock
      try {
          if ( data.size() > 0)
          {
            System.out.println("reading:: "+data.get(data.size()-1));
            data.remove(data.size()-1);
          }
       }finally {
         lock.unlock();
       }
       try
       {
          TimeUnit.SECONDS.sleep(1);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
    }
  }
}

public class ProducerConsumer  {

  public static void main(String[] args) {
    List<String> str = new LinkedList<>();
    ReentrantLock lock= new ReentrantLock();
    Thread t1 = new Thread(new Producer(str,lock));
    Thread t2 = new Thread(new Consumer(str,lock));
    t1.start();
    t2.start();
  }
}

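I removed the wait/notify calls. If you really need one thread to signal the other, note that lock.notify() is not the right tool for a ReentrantLock: create a Condition with lock.newCondition() and call its await() and signal() methods while holding the lock.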
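For cross-thread signalling with a ReentrantLock, the counterpart of wait/notify is a Condition. The following is a minimal sketch, not the original poster's code: the class name ConditionProducerConsumer, the two conditions notFull and notEmpty, and the runDemo helper are all assumptions made for illustration. It keeps the limit of 5 items and the "take the last item" behaviour from the question.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer guarded by a ReentrantLock and two Conditions.
class ConditionProducerConsumer {
    private static final int CAPACITY = 5;

    private final List<String> data = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    void produce(String item) throws InterruptedException {
        lock.lock();
        try {
            while (data.size() >= CAPACITY) {
                notFull.await(); // releases the lock while waiting, reacquires before returning
            }
            data.add(item);
            notEmpty.signal(); // wake a consumer waiting for an item
        } finally {
            lock.unlock();
        }
    }

    String consume() throws InterruptedException {
        lock.lock();
        try {
            while (data.isEmpty()) {
                notEmpty.await();
            }
            String item = data.remove(data.size() - 1); // take the last item, as in the question
            notFull.signal(); // wake a producer waiting for free space
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical driver: moves `count` items through the buffer and returns what was read.
    static List<String> runDemo(int count) {
        ConditionProducerConsumer buffer = new ConditionProducerConsumer();
        List<String> consumed = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.produce("writing:: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(buffer.consume());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        runDemo(10).forEach(s -> System.out.println("reading:: " + s));
    }
}
```

Unlike the sleep-based version, neither thread polls here: each one blocks in await() until the other signals. The order in which items are read depends on thread scheduling.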

Simeon G
1

You acquire and release ReentrantLock with lock() and unlock(), not with synchronized.

As Antoniossss points out, this is a deadlock where one thread is waiting for a lock which the other will never relinquish (while the two are trying to coordinate on that lock). Here is one solution:

import static java.util.Objects.requireNonNull;

import java.util.LinkedList;
import java.util.List;

class Producer implements Runnable {
    private final List<String> data;

    Producer(List<String> data) {
        this.data = requireNonNull(data);
    }

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            synchronized (data) {
                if (data.size() < 5) {
                    counter++;
                    data.add("writing:: " + counter);
                } else {
                    try {
                        data.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }
    }
}

class Consumer implements Runnable {
    private final List<String> data;

    Consumer(List<String> data) {
        this.data = requireNonNull(data);
    }

    @Override
    public void run() {
        while (true) {
            synchronized (data) {
                if (data.size() > 0) {
                    System.out.println("reading:: " + data.get(data.size() - 1));
                    data.remove(data.size() - 1);
                }
                data.notify();
            }
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        List<String> data = new LinkedList<>();
        Thread t1 = new Thread(new Producer(data));
        Thread t2 = new Thread(new Consumer(data));
        t1.start();
        t2.start();
    }
}

We have done away with the lock object and synchronize on the list itself. The producer synchronizes inside the while loop, which has the effect that once there are at least 5 items in the list, the producer will wait, ceding the monitor on the list.

The consumer also synchronizes inside the loop which is critical because in your code, it never relinquished the monitor on the lock once it had acquired it. In fact, if you had started the consumer first (or gotten very unlucky), nothing at all would have been produced. A monitor is released when leaving a synchronized block or method or when a thread holding the monitor wait()s on it, but notify() and notifyAll() do not release the monitor.
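The point that notify() does not release the monitor can be checked with a small experiment. This is only an illustrative sketch; the class name NotifyKeepsMonitorDemo, the event strings, and the 200 ms sleep are assumptions, not part of the original code. The notifying thread keeps holding the monitor for a while after calling notify(), and the waiting thread can only resume once the notifier has left its synchronized block.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class NotifyKeepsMonitorDemo {

    // Returns the recorded events in the order they happened.
    static List<String> runDemo() {
        final Object monitor = new Object();
        final List<String> events = Collections.synchronizedList(new ArrayList<>());
        final boolean[] waiting = {false};  // guarded by monitor
        final boolean[] notified = {false}; // guarded by monitor

        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                waiting[0] = true;
                while (!notified[0]) {
                    try {
                        monitor.wait(); // releases the monitor while waiting
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                events.add("waiter resumed");
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            synchronized (monitor) {
                // If we hold the monitor and waiting[0] is set, the waiter must be inside wait().
                if (waiting[0]) {
                    notified[0] = true;
                    monitor.notify();
                    events.add("notify() called");
                    try {
                        Thread.sleep(200); // still holding the monitor: the waiter cannot proceed
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    events.add("notifier leaving synchronized block");
                    done = true;
                }
            }
            if (!done) {
                Thread.yield(); // give the waiter a chance to enter wait()
            }
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        // Prints the three events: notify() called, notifier leaving synchronized block, waiter resumed
        runDemo().forEach(System.out::println);
    }
}
```

If the notifier never left its synchronized block, as in the question's consumer, "waiter resumed" would never be recorded.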

The consumer reads the last item if there are any, and then immediately notifies the producer and releases the lock. Two notes:

First, it isn't clear what ordering you expect for items. Do you expect the producer to produce 5, the consumer to consume 5, and so on, or do you want the 5 to simply be a limit so too large a backlog cannot form (this is good, it's called backpressure), but the consumer to consume items eagerly whenever they're available? This implementation does the latter.

Second, the consumer tries to reacquire the monitor on the list as soon as it has released it. This is a form of busy waiting: the consumer and producer race to acquire the monitor, and the consumer may often win that race, which is pointless once the list is empty. In Java 9 or later, it might be wise to call Thread.onSpinWait() outside the synchronized block but inside the while loop in the consumer. In earlier versions, Thread.yield() might be appropriate. But in my tests the code does fine without either.
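As a rough sketch of where such a spin-wait hint could go (assuming Java 9 or later), the variant below places Thread.onSpinWait() after the synchronized block in the consumer loop. The class name SpinWaitConsumerDemo and the runDemo helper are hypothetical, and the loops are bounded by a count so that the example terminates, which the original endless loops do not.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class SpinWaitConsumerDemo {

    // Hypothetical helper: produces and consumes `count` items, returns what was read.
    static List<String> runDemo(int count) {
        List<String> data = new LinkedList<>();
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            int produced = 0;
            while (produced < count) {
                synchronized (data) {
                    if (data.size() < 5) {
                        produced++;
                        data.add("writing:: " + produced);
                    } else {
                        try {
                            data.wait(); // cede the monitor until the consumer notifies
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            }
        });

        Thread consumer = new Thread(() -> {
            while (consumed.size() < count) {
                synchronized (data) {
                    if (!data.isEmpty()) {
                        consumed.add(data.remove(data.size() - 1));
                    }
                    data.notify();
                }
                // Outside the synchronized block, inside the loop:
                // a hint to the runtime that this thread is busy-waiting.
                Thread.onSpinWait();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        runDemo(12).forEach(s -> System.out.println("reading:: " + s));
    }
}
```

Thread.onSpinWait() is only a hint; the program behaves the same without it, and whether it helps depends on the platform.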

Antoniossss made another suggestion, to use a LinkedBlockingQueue, but the code as it stands always takes the last item, and using a queue would change that behavior. Instead, we can use a deque (a double-ended queue), putting items on the end and also taking them from the end. Here's what that looks like:

import static java.util.Objects.requireNonNull;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

class Producer implements Runnable {
    private final BlockingDeque<String> data;

    Producer(BlockingDeque<String> data) {
        this.data = requireNonNull(data);
    }

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            counter++;
            try {
                data.put("writing:: " + counter);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}

class Consumer implements Runnable {
    private final BlockingDeque<String> data;

    Consumer(BlockingDeque<String> data) {
        this.data = requireNonNull(data);
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("reading:: " + data.takeLast());
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        BlockingDeque<String> data = new LinkedBlockingDeque<>(5);
        Thread t1 = new Thread(new Producer(data));
        Thread t2 = new Thread(new Consumer(data));
        t1.start();
        t2.start();
    }
}

Because a LinkedBlockingDeque is a concurrent data structure we don't need any synchronized blocks or wait or notify here. We can simply try to put and takeLast from the deque, and it will block if the deque is full or empty, respectively. The deque is created with a capacity of 5, so it applies backpressure to the producer if the producer ever gets that far ahead, just like the original.

There's nothing to stop the producer from producing elements as fast as the consumer can consume them, though, which means the first elements may have to wait an arbitrarily long time to be consumed. It isn't clear to me if that was the intention of your code. There are ways you could achieve that, either by introducing wait() and notify() again, by using Semaphores, or other means, but I'll leave that be since it isn't clear you even wanted it.

One final note on InterruptedException. It will happen if someone calls interrupt() on the thread, but the only one holding onto references to the producer and consumer threads is the main() method, and it never interrupts them. So the exception shouldn't occur here, but in case it somehow does, I simply have the producer or consumer exit. In a more complex scenario interrupting the thread could be used as a way to signal it if it's sleeping or in a blocking method (or even outside of one, if it checks for it explicitly) but we're not using that here.
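To illustrate how interruption could be used as a stop signal in a setup like this, here is a small sketch. The class name InterruptShutdownDemo, the runDemo helper, and the 100 ms run time are assumptions for the example. It relies on put and takeLast throwing InterruptedException when an interrupted thread is waiting, or has to wait, on a full or empty deque.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

class InterruptShutdownDemo {

    // Hypothetical helper: lets both threads run briefly, interrupts them,
    // and reports whether both have stopped.
    static boolean runDemo() {
        BlockingDeque<String> data = new LinkedBlockingDeque<>(5);
        AtomicInteger consumed = new AtomicInteger();

        Thread producer = new Thread(() -> {
            int counter = 0;
            while (true) {
                counter++;
                try {
                    data.put("writing:: " + counter); // blocks while the deque is full
                } catch (InterruptedException e) {
                    break; // treat interruption as a request to stop
                }
            }
        });
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    data.takeLast(); // blocks while the deque is empty
                    consumed.incrementAndGet();
                } catch (InterruptedException e) {
                    break;
                }
            }
        });

        producer.start();
        consumer.start();
        try {
            Thread.sleep(100);       // let them work for a moment
            producer.interrupt();    // ask both threads to stop
            consumer.interrupt();
            producer.join(2000);
            consumer.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("consumed before shutdown: " + consumed.get());
        return !producer.isAlive() && !consumer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("both threads stopped: " + runDemo());
    }
}
```

The joins use a timeout so that the demo itself cannot hang if a thread were to miss the signal.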

David Conrad
  • 2
    This is not wrong but, alone, it's useless and does not address the issue in their code. How do they do cross thread communication with `ReentrantLock`, what's the equivalent of their calls to `wait` and `notify`? And how do they do it correctly? – Sotirios Delimanolis Jul 05 '18 at 18:29
  • @SotiriosDelimanolis I think if an answer must provide a working replacement then the question should simply be closed as *too broad*; otherwise, the asker should fix the problems and ask another question. But I will expand on my answer. – David Conrad Jul 06 '18 at 16:10
1

Since you simply want the Producer to produce values non-stop, and the Consumer to consume those values non-stop and wait for values to be produced when there are none, skip the manual synchronization and locking altogether and use a LinkedBlockingQueue.

In the producer you simply call:

queue.put(value);

And in the consumer you do:

value = queue.take();

Voilà, the consumer will take a value if there is one and will wait while the queue is empty.
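Put together, the queue-based approach might look like the sketch below. The class name BlockingQueueDemo, the runDemo helper, and the capacity of 5 (borrowed from the size check in the question) are assumptions; the loops are bounded so that the example terminates. Note that a queue hands items out in FIFO order, whereas the code in the question always reads the last item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingQueueDemo {

    // Hypothetical helper: passes `count` items through a bounded queue
    // and returns them in the order the consumer received them.
    static List<String> runDemo(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5); // capacity 5 is an assumption
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put("writing:: " + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        runDemo(10).forEach(s -> System.out.println("reading:: " + s));
    }
}
```

With one producer and one consumer, the items come out in the order they were put in.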

As for your code:

  1. You are not using ReentrantLock at all. You can replace it with new Object and the output will be the same. wait and notify are Object's methods.
  2. The reason why you only manage to produce single value lies in your consumer. Effectively you have something like this:

    synchronized(lock){ // acquire lock's monitor
    
    while(true){
    
      lock.notify();
     }
    
    } // release lock's monitor - never doing that, thread never leaves this block
    

The problem here is that your execution never leaves the synchronized block. So you are calling notify, but you are not releasing the lock's monitor by exiting the synchronized block. The producer is "notified", but in order to continue execution it must reacquire the lock's monitor, and it can't since the consumer never releases it. Almost a classic deadlock here.

You can imagine that there are a couple of people in a room, and only the one holding the stick can talk. The first one gets the stick with synchronized(stick), does what he has to do, and decides that he must wait for someone else, so he calls wait and passes the stick back. Now the second person can talk; he does his job and decides that the one who gave him the stick can continue now. He calls notify, and now he must pass the stick on by leaving the synchronized(stick) block. If he doesn't, the first person cannot proceed. This is what happens in your case.

Antoniossss
  • 1
    This is not a deadlock, I don't know what _almost_ means. Your suggestion to use `LinkedBlockingQueue` defeats the purpose of their code. Your explanation of the behavior is fine but doesn't address how to correct it. I don't think the answer is useful. Why does it always have to be _vandalism_ or _abuse_? – Sotirios Delimanolis Jul 05 '18 at 19:02
1

I want to point out two mistakes you have made:

  1. The wrong usage of ReentrantLock.

Every object can be used for an intrinsic lock, so there is no need to find a specific Lock class. Since each synchronized block is bounded by a single method and you don't demand any enhanced means, ReentrantLock is redundant here.

  2. The incorrect position of the synchronized blocks.

Once you enter a synchronized block, no one can enter there until you leave it. Obviously, you will never exit it because of while(true).

I would suggest you remove the ReentrantLocks and do synchronisation on data.

@Override
public void run() {
    int counter = 0;
    while (true) {
        synchronized (data) {
            if (data.size() < 5) {
                data.add("writing:: " + ++counter);
            } else {
                try {
                    data.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // we are out of the synchronized block here
        // to let others use data as a monitor somewhere else
    }
}
Andrew Tobilko