
How can I create a loop (or something else, if there is a better way) that creates several new threads? So far I have two threads, one producer and one consumer. I would like to create, for example, 5 producers and 5 consumers, where each thread produces or consumes a different "product"; no two threads should handle the same one.

I'd like it to be something like this:

Produced thread0 produce 0
Consume thread0 consume 0
....
Produced thread4 produce 4
Consume thread4 consume 4

Thank you for every hint.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ProducerConsumer {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);

        Thread producerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    int value = 0;
                    while (true) {
                        blockingQueue.put(value);

                        System.out.println("Produced " + value);

                        value++;

                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        int value = blockingQueue.take();

                        System.out.println("Consume " + value);

                        Thread.sleep(1000);
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        producerThread.start();
        consumerThread.start();
        producerThread.join();
        consumerThread.join();
    }
}

1 Answer

  • Use a thread pool (Executors.newFixedThreadPool or Executors.newCachedThreadPool).
  • Don't forget to synchronize access to shared resources, for example with synchronized blocks.
  • Use the volatile keyword for values that are read and written by several threads at the same time (see "What is the volatile keyword useful for?").
  • I've used lambda syntax to redefine your runnables for clarity.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerConsumer {

    private static volatile int prodValue = 0;
    private static final Object valueSync = new Object();

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);

        final ExecutorService threadPool = Executors.newCachedThreadPool();

        final Runnable producer = () -> {
            try {
                while (true) {
                    synchronized(valueSync) {
                        blockingQueue.put(prodValue);
                        System.out.println(Thread.currentThread().getId() + " Produced " + prodValue);
                        prodValue++;
                    }
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        final Runnable consumer = () -> {
            try {
                while (true) {
                    int value = blockingQueue.take();
                    System.out.println(Thread.currentThread().getId() + " Consumed " + value);
                    Thread.sleep(1200);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 5; i++) { //Create 5 threads of each
            System.out.println("Loop " + i);
            threadPool.execute(producer);
            threadPool.execute(consumer);
            Thread.sleep(500); //Wait a little
        }
        System.out.println("Loop done");
        //No shutdown has been requested yet, so this simply lets the threads run for 15 seconds
        threadPool.awaitTermination(15, TimeUnit.SECONDS);
        System.out.println("STOP !");
        //Forcefully shut down all threads by interrupting them (needed because every thread runs a while(true) loop)
        threadPool.shutdownNow();
    }
}

About synchronization: here you want each value to be added to the queue and the counter to be incremented as one atomic step. The synchronized block prevents several threads from running this piece of code at the same time. Without it, the same value could be added to the queue multiple times and the counter then incremented multiple times. You can observe this by removing the synchronized block and decreasing the Thread.sleep values to something close to 0.
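
To look at the effect of the synchronized block in isolation, here is a minimal sketch that leaves the queue out. The class UniqueValueDemo and the methods claimNext and countDistinct are hypothetical names, not part of the code above. Several threads claim values through one synchronized block, and the number of distinct values is counted afterwards.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class UniqueValueDemo {

    private static int nextValue = 0;
    private static final Object valueSync = new Object();

    // Reads and increments the shared counter as one step, so no two callers get the same value.
    static int claimNext() {
        synchronized (valueSync) {
            int claimed = nextValue;
            nextValue++;
            return claimed;
        }
    }

    // Starts `threads` workers that each claim `perThread` values; returns how many distinct values were seen.
    static int countDistinct(int threads, int perThread) {
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen.add(claimNext());
                }
            });
        }
        pool.shutdown(); // accept no new tasks, let the submitted ones finish
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(countDistinct(5, 10_000) + " distinct values claimed");
    }
}
```

With the synchronized block in place, the count should equal threads multiplied by perThread. If you remove it, duplicates may appear and the count may drop, although a race is never guaranteed to show up on any given run.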

I could have used blockingQueue as the argument to synchronized, but I chose a dedicated lock object to make the intent more obvious.
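
If the goal is the output format from the question, where thread i only ever handles product i, one possible approach is to give each producer/consumer pair its own index and its own queue. This is only a sketch under that assumption and differs from the shared-counter code above; the class IndexedPairs, the method run and the log list are hypothetical names introduced for illustration. Each pair exchanges a single value so that the example terminates by itself.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class IndexedPairs {

    // Runs `pairs` producer/consumer pairs; pair i exchanges only the product i.
    static List<String> run(int pairs) {
        List<String> log = new CopyOnWriteArrayList<>();
        // Two threads per pair, so every producer and consumer can run at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(pairs * 2);
        for (int i = 0; i < pairs; i++) {
            final int id = i; // lambdas can only capture effectively final locals
            final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
            pool.execute(() -> {
                try {
                    // Log before put() so this line always precedes the matching consume line.
                    log.add("Produced thread" + id + " produce " + id);
                    queue.put(id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> {
                try {
                    int value = queue.take(); // blocks until producer `id` has delivered
                    log.add("Consume thread" + id + " consume " + value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

Lines from different pairs can interleave in any order; only the order within one pair (produce, then consume) is fixed.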

julien.giband