0

I am facing a problem in producer - consumer. My requirement is:

Producer produces 100 objects together and wait for consumer to consume. Then Consumer consumes those 100 objects and wait for producer to produce. And this process repeats.

The condition is that, producer should not produce until objects size is 0, and consumer should not consume until objects size is 100. ie . producing and consuming in batches of size 100 only.

class Producer extends Thread {
private Queue<Integer> queue;
private int maxSize;

public Producer(Queue<Integer> queue, int maxSize, String name) {
    super(name);
    this.queue = queue;
    this.maxSize = maxSize;
}

@Override
public void run() {
    while (true) {
        synchronized (queue) {
            while (queue.size() == maxSize) {
                try {
                    System.out.println("Queue is full, "
                            + "Producer thread waiting for "
                            + "consumer to take something from queue");
                    queue.wait();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            Random random = new Random();
            int i = random.nextInt();
            System.out.println("Producing value : " + i);
            queue.add(i);
            queue.notifyAll();
        }
    }
}

}

class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;

public Consumer(Queue<Integer> queue, int maxSize, String name) {
    super(name);
    this.queue = queue;
    this.maxSize = maxSize;
}

@Override
public void run() {
    while (true) {
        synchronized (queue) {
            while (queue.isEmpty()) {
                System.out.println("Queue is empty,"
                        + "Consumer thread is waiting"
                        + " for producer thread to put something in queue");
                try {
                    queue.wait();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            System.out.println("Consuming value : " + queue.remove());
            queue.notifyAll();
        }
    }
}

}

public class ProdConsReference {
public static void main(String args[]) { 
    Queue<Integer> buffer = new LinkedList<Integer>(); 
    int maxSize = 10; 
    Thread producer = new Producer(buffer, maxSize, "PRODUCER"); 
    Thread consumer = new Consumer(buffer, maxSize, "CONSUMER"); 
    producer.start(); 
    consumer.start(); 
    }
}

output :

      Queue is empty,Consumer thread is waiting for producer thread to put                             something in queue
      Producing value : 52648529
      Consuming value : 52648529
      Queue is empty,Consumer thread is waiting for producer thread to put something in queue
      Producing value : -2128028718
      Consuming value : -2128028718

Can anybody point out what I am exactly missing. Thanks in advance

Dominic Philip
  • 108
  • 2
  • 10
  • 1
    show us the code where you initialise and start the threads. – Tschallacka Jun 27 '16 at 10:54
  • 1
    `queue.add(i); queue.notifyAll();` You are notifying prematurely. Either in Consumer check queue's length after being notified or notify later (when queue is actually full) - or both. You may also consider using [Conditions](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html). – Fildor Jun 27 '16 at 10:57
  • @MichaelDibbets, I have edited. – Dominic Philip Jun 27 '16 at 10:59
  • 4
    Make it a `Queue>` and send only batches of 100 as a single `List`. And unless you're doing an exercise on concurrency, don't use `LinkedList`. – zapl Jun 27 '16 at 11:06
  • 2
    your requirements, as you desctribed, order consumer and producer to work sequentially. Why did you tagged the question with "multithreading"? – Alexei Kaigorodov Jun 27 '16 at 11:19

2 Answers2

1

I am assuming an excercise, so this is my 2 cents:

You are notifying the other thread after each add/remove. You should not do that.

What you want to do is:

Producer:

  1. Check if Queue is empty.
  2. If yes : Produce 100 items (not 1!) , then notify consumers, jump to 4.
  3. If not: Wait to be notified
  4. Loop to 1. (not 2.!)

Consumer:

  1. Check if Queue is full.
  2. If yes: Consume until Queue is empty , then notify producers. jump to 4.
  3. If not: Wait to be notified
  4. Loop to 1. (not 2.!)

You may want to use Conditions.

If this is not an exercise / assignment

Then you should take a look at zapl's approach, he gave in comment: Use a Queue of Lists which represent batches of 100 items.

Shared: Have work queue (threadsafe Datastructure, I suggest a blocking one).

Producer:

  • Take Batchsize and total of "Tasks" or Items to be processed. I am assuming that total is divisible be batchsize. Otherwise you'll have to take that into account when producing.
  • Produce <batchsize> Items into a List (=batch)
  • Enqueue the batch (List of items) in workQueue
  • Repeat the former 2 steps until total is reached.

Consumer(s):

  • Take batch from workqueue (if/as soon as available)
  • Process all items in batch

Mind that if you must keep the order you can either only use one consumer or take extra effort to enforce sorting of the result.

Fildor
  • 14,510
  • 4
  • 35
  • 67
0

Thnx for your valuable comments and suggestions. I cannot understand why LinkedList usage is discouraged here. In my real time project, my productionCount will be more than millions, and this I have to process in batches of 10000. Below link was also helpful. How to know if other threads have finished?

Here is my implementation using only 2 threads. One Producer & other main thread itself as Consumer

            package threading;

            import java.util.LinkedList;
            import java.util.Queue;
            import java.util.Set;
            import java.util.concurrent.CopyOnWriteArraySet;


            public class ProdConsumerApp implements ThreadCompleteListener{
                boolean isProductionOver;
                public static void main(String args[]) {
                    ProdConsumerApp prodConsumerApp = new ProdConsumerApp();
                    prodConsumerApp.procudeAndConsume();
                }

                private void procudeAndConsume(){
                    Queue<Integer> buffer = new LinkedList<Integer>(); 
                    int maxSize = 100; 
                    int productionCount = 1000;

                    Producer producer = new Producer(buffer, maxSize, "PRODUCER", productionCount); 
                    producer.addListener(this);
                    producer.start(); 
                    consume(buffer);

                    System.out.println("Bye");
                }

                public void consume(Queue<Integer> queue){
                    while(!isProductionOver){//check whether production completed?
                        synchronized (queue) {
                            //when queue size is 0, notify and wait.
                            while(queue.isEmpty()){
                                try {
                                    queue.notify();
                                    queue.wait();
                                } catch (Exception ex) {
                                    ex.printStackTrace();
                                }
                            }
                            //consume until queue is empty.
                            while(!queue.isEmpty()){
                                System.out.println("Consuming value : " + queue.remove());
                            }
                        }
                    }
                }

                @Override
                public void notifyOfThreadComplete(Thread thread) {
                    System.out.println("notified");
                    isProductionOver = true;
                }

            }

            class Producer extends Thread {
                private Queue<Integer> queue;
                private int maxSize;
                private int productionCount;

                public Producer(Queue<Integer> queue, int maxSize, String name, int productionCount) {
                    super(name);
                    this.queue = queue;
                    this.maxSize = maxSize;
                    this.productionCount = productionCount;
                }

                private final Set<ThreadCompleteListener> listeners = new CopyOnWriteArraySet<ThreadCompleteListener>();

                public final void addListener(final ThreadCompleteListener listener) {
                    listeners.add(listener);
                }
                public final void removeListener(final ThreadCompleteListener listener) {
                    listeners.remove(listener);
                }
                private final void notifyListeners() {
                    for (ThreadCompleteListener listener : listeners) {
                        listener.notifyOfThreadComplete(this);
                    }
                }

                @Override
                public void run() {
                    synchronized (queue) {
                        for(int i=1;i<=productionCount;i++){
                            System.out.println("Producing value : " + i);
                            queue.add(i);

                            //when queue size is maxSize, notify and wait.
                            while(queue.size() == maxSize){
                                try {
                                    queue.notify();
                                    if(i==productionCount){
                                        //if last item is produced, notify listeners that production is completed.
                                        notifyListeners();
                                        //exit from while() and later for() and thereby terminating Producer.
                                        break;
                                    }
                                    queue.wait();
                                } catch (Exception ex) {
                                    ex.printStackTrace();
                                }
                            }
                        }           
                    }
                }

            }


            interface ThreadCompleteListener {
                void notifyOfThreadComplete(final Thread thread);
            }
Community
  • 1
  • 1
Dominic Philip
  • 108
  • 2
  • 10
  • 1
    As for the LinkedList: It is not inherently Threadsafe. I guess, that is the reasoning behind that suggestion. You could use datastructures that are more convenient to achieve the requirements. – Fildor Jul 13 '16 at 07:08
  • 1
    Mind that `wait` could return without the expected condition you are waiting for is met. In Producer you loop after wait but you check for queue.size == maxSize. What if it is maxSize/2 ? – Fildor Jul 13 '16 at 07:18
  • Hi Fildor, here both thread have different branches of execution. so no need to worry about concurrency ryt?Moreover we are synchronizing the buffer that is a linkedlist. I think I can go ahead with LinkedList. Is there any problem in the condition queue.size = maxSize? – Dominic Philip Jul 13 '16 at 07:32
  • "Is there any problem in the condition queue.size = maxSize?" Yes. If queue size is > 0 but < maxSize, you go on to add Items. So you'll break your "work in batches" requirement. – Fildor Jul 13 '16 at 08:00
  • Hi Fildor,It seems I am not breaking the requirement. what you pointed is exactly what I need. Producer produces item 1-> queue.size() == maxSize -> no -> Producer produces item 2 -> queue.size() == maxSize -> no ..................................Producer produces item 100->queue.size() == maxSize ->yes->producer notify->producer wait->........consumer consumes all items and notify back..................Are you telling about spurious Thread wake up while waiting? In this case I dont know how to proceed. – Dominic Philip Jul 13 '16 at 09:55
  • Yes I am talking about spurious wakeup. See my updated answer. – Fildor Jul 13 '16 at 10:13