68

I'd like to create some sort of Producer/Consumer threading app. But I'm not sure what the best way to implement a queue between the two.

So I've some up with two ideas (both of which could be entirely wrong). I would like to know which would be better and if they both suck then what would be the best way to implement the queue. It's mainly my implementation of the queue in these examples that I'm concerned about. I'm extending a Queue class that is an in house class and is thread safe. Below are two examples with 4 classes each.

Main class-

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

Consumer class-

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

Producer class-

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

Queue class-

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

OR

Main class-

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

Consumer class-

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

Producer class-

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

Queue class-

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

And go!

Ravindra babu
  • 37,698
  • 11
  • 250
  • 211
Gareth
  • 2,180
  • 5
  • 19
  • 24
  • 1
    Producers enqueue and Consumer dequeue, btw. Not vice versa.. – Enno Shioji Feb 25 '10 at 08:22
  • 4
    Oh and don't start Threads from a constructor!! That thread could observe the object in an inconsistent state. Refer to "Java Concurrency in Practice" for details.. – Enno Shioji Feb 25 '10 at 08:24
  • Thanks Zwei, the enqueue thing was me being unfocused. the starting the Thread from constructor thing should I rather run an intiliazation method and start it there or should it start it from the main method class? – Gareth Feb 25 '10 at 09:05

8 Answers8

82

Java 5+ has all the tools you need for this kind of thing. You will want to:

  1. Put all your Producers in one ExecutorService;
  2. Put all your Consumers in another ExecutorService;
  3. If necessary, communicate between the two using a BlockingQueue.

I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

So the producers submit directly to consumers.

Ravindra babu
  • 37,698
  • 11
  • 250
  • 211
cletus
  • 616,129
  • 168
  • 910
  • 942
  • 4
    Cletus is right on the money for more information to help clarify "where to start" http://java.sun.com/docs/books/tutorial/essential/concurrency/ – edwardsmatt Feb 25 '10 at 09:58
  • "So the producers submit directly to consumers" - Is it safe to call consumers.submit(...) in parallel or should I synchronize around that? – Marsellus Wallace Sep 23 '13 at 21:27
  • If you share a BlockingQueue, could you just use 1 executor for both producers and consumers? – devo Oct 31 '13 at 20:33
  • 4
    It's worth noting that if the 'consumer' process can fail in such a way that the data needs to be reprocessed, it is worthwhile to introduce a BlockingQueue so that the consumer in an error state can throw the data back on the queue for other consumers to reprocess. – mooreds Apr 17 '14 at 23:12
20

OK, as others note, the best thing to do is to use java.util.concurrent package. I highly recommend "Java Concurrency in Practice". It's a great book that covers almost everything you need to know.

As for your particular implementation, as I noted in the comments, don't start Threads from Constructors -- it can be unsafe.

Leaving that aside, the second implementation seem better. You don't want to put queues in static fields. You are probably just loosing flexibility for nothing.

If you want to go ahead with your own implementation (for learning purpose I guess?), supply a start() method at least. You should construct the object (you can instantiate the Thread object), and then call start() to start the thread.

Edit: ExecutorService have their own queue so this can be confusing.. Here's something to get you started.

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

Further EDIT: For producer, instead of while(true), you can do something like:

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

This way you can shutdown the executor by calling .shutdownNow(). If you'd use while(true), it won't shutdown.

Also note that the Producer is still vulnerable to RuntimeExceptions (i.e. one RuntimeException will halt the processing)

simeg
  • 1,889
  • 2
  • 26
  • 34
Enno Shioji
  • 26,542
  • 13
  • 70
  • 109
  • So I should add a start() method to the Consumer and Producer? Are you saying I should put something like this in my main method rather? consumer = new Consumer(); consumer.start(readQ); or this? consumer = new Comsumer(readQ); consumer.start(); – Gareth Feb 25 '10 at 09:35
  • 1
    You'd normally do new Comsumer(readQ); consumer.start();. In your case, it is advisable to declare the queue private final, and if you do that, you need to set the queue in the constructor. If this is production code, I strongly advice you to go with cletus' answer. If you absolutely need to use your in-house queue, then you should use ExecutorService executor = Executors.newSingleThreadExecutor() instead of a raw thread instead. This will among other things, protect you from RuntimeException halting you system. – Enno Shioji Feb 25 '10 at 18:41
  • Thanks. very helpful. I've gone with the BlockingQueue like cletus suggested over the in-house queue. Still trying to get my head around the ExecutorService class but when I do I will definitely use it. Thanks for your help. – Gareth Feb 26 '10 at 08:32
15

I have extended cletus proposed answer to working code example.

  1. One ExecutorService (pes) accepts Producer tasks.
  2. One ExecutorService (ces) accepts Consumer tasks.
  3. Both Producer and Consumer shares BlockingQueue.
  4. Multiple Producer tasks generates different numbers.
  5. Any of Consumer tasks can consume number generated by Producer

Code:

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

output:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1

Note. If you don't need multiple Producers and Consumers, keep single Producer and Consumer. I have added multiple Producers and Consumers to showcase capabilities of BlockingQueue among multiple Producers and Consumers.

Ravindra babu
  • 37,698
  • 11
  • 250
  • 211
  • This doesn't take care of the race condition when multiple producers and consumers are there. Each see the capacity to be 0 and try to add. With Single Producer and Single consumer no need to synchronize on the BlockingQueue, if it's more than one still Synchronize is required. – Cleonjoys Sep 16 '17 at 17:30
  • You could do one thing, comment out the consumers, then set the Fixed size for the BlockingQueue, you would see yourself. I tried your code with new LinkedBlockingQueue(2); Then Output was as below: Produced:11:by thread:1 Produced:21:by thread:2 Produced:22:by thread:2 Produced:12:by thread:1 How can more values be inserted when the set capacity of the Queue was 2 – Cleonjoys Sep 17 '17 at 18:01
  • That is the nature of BlockingQueue. Unless capacity is available, it will be blocked. I am using Unbounded Blocking Queue and above case does not arise. Even if it arises due to bounded BlockingQueue , it's the way Java implemented it. Check https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html#put-E-. Code snippet in my post does not have any issues. – Ravindra babu Sep 17 '17 at 18:19
  • How do you want to set the race condition check in your code then ? If I set the BlockingQueue to be of size 2 then it shouldn't accept more than 2 elements, but your code accepts more than 2 elements. BlockingQueue gives inherent waits for underflow and overflow, but wont guarantee you the synchronization across multiple threads. – Cleonjoys Sep 17 '17 at 18:33
  • You are terribly wrong. Move debug statement after put : System.out.println("Produced:" + number + ":by thread:"+ threadNo);. Only two elements are inserted. My output now: Produced:11:by thread:1 Produced:21:by thread:2. Blocking Queues are thread safe. – Ravindra babu Sep 17 '17 at 18:48
  • This post does not deserve down-voting since your assumptions are false. Race condition is different from BlockingQueue.wait() when capacity is not available. My BlockingQueue is un-bounded and code does not have any issues. Run this program for 1000 times and you never get race condition. – Ravindra babu Sep 18 '17 at 12:35
7

You are reinventing the wheel.

If you need persistence and other enterprise features use JMS (I'd suggest ActiveMq).

If you need fast in-memory queues use one of the impementations of java's Queue.

If you need to support java 1.4 or earlier, use Doug Lea's excellent concurrent package.

Ravindra babu
  • 37,698
  • 11
  • 250
  • 211
flybywire
  • 261,858
  • 191
  • 397
  • 503
  • 7
    You can still be asked to implement Producer Consumer at a job interview :) – Martin Konicek Nov 06 '12 at 16:43
  • I do find the utilities in java.util.concurrent useful, but I find it hard to call it "excellent" while it is still forcing me to pass two parameters just to specify a timeout. Would it have killed Doug to make a class called Duration? – Hakanai Mar 03 '14 at 22:24
2

This is a very simple code.

import java.util.*;

// @author : rootTraveller, June 2017

class ProducerConsumer {
    public static void main(String[] args) throws Exception {
        Queue<Integer> queue = new LinkedList<>();
        Integer buffer = new Integer(10);  //Important buffer or queue size, change as per need.

        Producer producerThread = new Producer(queue, buffer, "PRODUCER");
        Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");

        producerThread.start();  
        consumerThread.start();
    }   
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int queueSize ;

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super(ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.size() == queueSize){
                    System.out.println(Thread.currentThread().getName() + " FULL         : waiting...\n");
                    try{
                        queue.wait();   //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue empty then produce one, add and notify  
                int randomInt = new Random().nextInt(); 
                System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
                queue.add(randomInt); 
                queue.notifyAll();  //Important
            } //synchronized ends here : NOTE
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private int queueSize;

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super (ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    System.out.println(Thread.currentThread().getName() + " Empty        : waiting...\n");
                    try {
                        queue.wait();  //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue not empty then consume one and notify
                System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
                queue.notifyAll();
            } //synchronized ends here : NOTE
        }
    }
}
roottraveller
  • 7,942
  • 7
  • 60
  • 65
1
  1. Java code "BlockingQueue" which has synchronized put and get method.
  2. Java code "Producer" , producer thread to produce data.
  3. Java code "Consumer" , consumer thread to consume the data produced.
  4. Java code "ProducerConsumer_Main", main function to start the producer and consumer thread.

BlockingQueue.java

public class BlockingQueue 
{
    int item;
    boolean available = false;

    public synchronized void put(int value) 
    {
        while (available == true)
        {
            try 
            {
                wait();
            } catch (InterruptedException e) { 
            } 
        }

        item = value;
        available = true;
        notifyAll();
    }

    public synchronized int get()
    {
        while(available == false)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e){
            }
        }

        available = false;
        notifyAll();
        return item;
    }
}

Consumer.java

package com.sukanya.producer_Consumer;

public class Consumer extends Thread
{
    blockingQueue queue;
    private int number;
    Consumer(BlockingQueue queue,int number)
    {
        this.queue = queue;
        this.number = number;
    }

    public void run()
    {
        int value = 0;

        for (int i = 0; i < 10; i++) 
        {
            value = queue.get();
            System.out.println("Consumer #" + this.number+ " got: " + value);
        }
    }
}

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer;

public class ProducerConsumer_Main 
{
    public static void main(String args[])
    {
        BlockingQueue queue = new BlockingQueue();
        Producer producer1 = new Producer(queue,1);
        Consumer consumer1 = new Consumer(queue,1);
        producer1.start();
        consumer1.start();
    }
}
Kasthuri
  • 11
  • 2
0

Use this typesafe pattern with poison pills:

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}
Eldar Agalarov
  • 4,849
  • 4
  • 30
  • 38
0
  public class QueueHandler
{
 //winstead of Queue<Object> will replace  BlockingQueue <String> queue = new LinkedBlockingQueue <> ();
public static Queue<Object> readQ = new Queue<Object>(100);

public static void enqueue(Object object)
{
  
    readQ.add(object);
}

public static Object dequeue()
{
   
    return readQ.get();
}
}

When

public static BlockingQueue <String> queue = new LinkedBlockingQueue <> (); 

it is static it works, but when it is non-static it doesn't work properly. How to fix it?

Yoshi
  • 1
  • If you have a new question, please ask it by clicking the [Ask Question](https://stackoverflow.com/questions/ask) button. Include a link to this question if it helps provide context. - [From Review](/review/late-answers/30934736) – Procrastinator Feb 02 '22 at 07:49