0

Hi I have multithread environment. Its like a producer and subscriber problem. but the challenge is i will have multiple producers and consumers also in multi number. I tried two approaches. Approach1 with BlockingQueue and approach2 with Synchronized List. approach2 is giving me faster results where as i need to get the approach1 faster. I found the problem as well. In approach2 i am waiting for the producer to finish producing first. and started consuming it. as i have a full prepared list, i have freedom to process list in partition and i am processing 2 items at a time. where as in BlockingQueue approach, which is ideal for these problems, i can take one item at a time, hence processing is slow.

approach1 is taking 15310 milli seconds where as approach2 is taking 9755 only i tried calling take method twice in BlockingQueue approach, but realized latch.countDown wont work perfectly with that.

Can some one help me approach1 much better than approach2, i have put time outs based on my real scenarios.

Code i tried here

package testjavathread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;

class Consumer {

    private BlockingQueue<Integer> sharedQueue;

    private int numThreads;

    private ExecutorService executor;

    private CountDownLatch latch;

    public void consume() {
        for (int i = 0; i < numThreads; i++) {
            executor.submit(() -> {
                try {
                    while (true) {
                        Integer integer = sharedQueue.take();
//                      if(!sharedQueue.isEmpty()) {
//                          Integer integer2 = sharedQueue.take();
//                          latch.countDown();
//                      }
                        System.out.println("Consumer " + integer);
                        Thread.sleep(3000);
                        latch.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    public ExecutorService getExecutor() {
        return executor;
    }

    public void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    public int getNumThreads() {
        return numThreads;
    }

    public void setNumThreads(int numThreads) {
        this.numThreads = numThreads;
    }

    public BlockingQueue<Integer> getSharedQueue() {
        return sharedQueue;
    }

    public void setSharedQueue(BlockingQueue<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }
}

public class BlockingQueueTest {

    public static void main(String[] args) {
        BlockingQueueTest testThread = new BlockingQueueTest();

        testThread.test();

    }

    public void test() {
        long startTime = System.currentTimeMillis();

        List<Integer> inputNumbers = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            inputNumbers.add(i);
        }

        BlockingQueue<Integer> insertData = new LinkedBlockingQueue<Integer>();
        int threadCount = 20;
        CountDownLatch latch = new CountDownLatch(inputNumbers.size());

        // Consumer started first
        Consumer consumer = new Consumer();
        consumer.setSharedQueue(insertData);
        consumer.setNumThreads(threadCount);
        consumer.setExecutor(Executors.newFixedThreadPool(threadCount));
        consumer.setLatch(latch);
        consumer.consume();

        // Producing the data

        if (!inputNumbers.isEmpty()) {
            final int parallelism = 40;
            ForkJoinPool forkJoinPool = null;
            try {
                forkJoinPool = new ForkJoinPool(parallelism);
                final List<Boolean> output = (List<Boolean>) forkJoinPool
                        .submit(() -> inputNumbers.parallelStream().forEach(inputNumber -> {
                            produce(inputNumber, insertData);
                        })).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (forkJoinPool != null) {
                    forkJoinPool.shutdown();
                }
            }
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(System.currentTimeMillis() - startTime);

    }

    public void produce(Integer inputNumber, BlockingQueue<Integer> insertData) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Producer " + inputNumber);
        insertData.add(inputNumber);

    }

}

other approach

package testjavathread;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import com.google.common.collect.Lists;

public class TestThreadWithList {

    public static void main(String[] args) {

        TestThreadWithList testThread = new TestThreadWithList();

        testThread.test();

    }

    private void test() {
        long startTime = System.currentTimeMillis();

        List<Integer> inputNumbers = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            inputNumbers.add(i);
        }

        List<Integer> insertData = Collections.synchronizedList(new ArrayList<>());

        // Producing the data

        if (!inputNumbers.isEmpty()) {
            final int parallelism = 40;
            ForkJoinPool forkJoinPool = null;
            try {
                forkJoinPool = new ForkJoinPool(parallelism);
                final List<Boolean> output = (List<Boolean>) forkJoinPool
                        .submit(() -> inputNumbers.parallelStream().forEach(inputNumber -> {
                            produce(inputNumber, insertData);
                        })).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (forkJoinPool != null) {
                    forkJoinPool.shutdown();
                }
            }
        }

        // Consumer

        final int parallelism = 20;
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            List<Boolean> finalListOutput = (List<Boolean>) forkJoinPool
                    .submit(() -> Lists.partition(insertData, 2).parallelStream().forEach(eachNumber -> {
                        System.out.println("Consumer " + eachNumber.get(0));
                        System.out.println("Consumer " + eachNumber.get(1));

                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    })).get();
        } catch (Exception e) {
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }

        System.out.println(System.currentTimeMillis() - startTime);
    }

    private void produce(Integer inputNumber, List<Integer> insertData) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Producer " + inputNumber);
        insertData.add(inputNumber);

    }

}

user2555212
  • 165
  • 1
  • 14

0 Answers0