Hi I have multithread environment. Its like a producer and subscriber problem. but the challenge is i will have multiple producers and consumers also in multi number. I tried two approaches. Approach1 with BlockingQueue and approach2 with Synchronized List. approach2 is giving me faster results where as i need to get the approach1 faster. I found the problem as well. In approach2 i am waiting for the producer to finish producing first. and started consuming it. as i have a full prepared list, i have freedom to process list in partition and i am processing 2 items at a time. where as in BlockingQueue approach, which is ideal for these problems, i can take one item at a time, hence processing is slow.
approach1 is taking 15310 milli seconds where as approach2 is taking 9755 only i tried calling take method twice in BlockingQueue approach, but realized latch.countDown wont work perfectly with that.
Can some one help me approach1 much better than approach2, i have put time outs based on my real scenarios.
Code i tried here
package testjavathread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
class Consumer {
private BlockingQueue<Integer> sharedQueue;
private int numThreads;
private ExecutorService executor;
private CountDownLatch latch;
public void consume() {
for (int i = 0; i < numThreads; i++) {
executor.submit(() -> {
try {
while (true) {
Integer integer = sharedQueue.take();
// if(!sharedQueue.isEmpty()) {
// Integer integer2 = sharedQueue.take();
// latch.countDown();
// }
System.out.println("Consumer " + integer);
Thread.sleep(3000);
latch.countDown();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
public ExecutorService getExecutor() {
return executor;
}
public void setExecutor(ExecutorService executor) {
this.executor = executor;
}
public int getNumThreads() {
return numThreads;
}
public void setNumThreads(int numThreads) {
this.numThreads = numThreads;
}
public BlockingQueue<Integer> getSharedQueue() {
return sharedQueue;
}
public void setSharedQueue(BlockingQueue<Integer> sharedQueue) {
this.sharedQueue = sharedQueue;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
}
public class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueueTest testThread = new BlockingQueueTest();
testThread.test();
}
public void test() {
long startTime = System.currentTimeMillis();
List<Integer> inputNumbers = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
inputNumbers.add(i);
}
BlockingQueue<Integer> insertData = new LinkedBlockingQueue<Integer>();
int threadCount = 20;
CountDownLatch latch = new CountDownLatch(inputNumbers.size());
// Consumer started first
Consumer consumer = new Consumer();
consumer.setSharedQueue(insertData);
consumer.setNumThreads(threadCount);
consumer.setExecutor(Executors.newFixedThreadPool(threadCount));
consumer.setLatch(latch);
consumer.consume();
// Producing the data
if (!inputNumbers.isEmpty()) {
final int parallelism = 40;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Boolean> output = (List<Boolean>) forkJoinPool
.submit(() -> inputNumbers.parallelStream().forEach(inputNumber -> {
produce(inputNumber, insertData);
})).get();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(System.currentTimeMillis() - startTime);
}
public void produce(Integer inputNumber, BlockingQueue<Integer> insertData) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Producer " + inputNumber);
insertData.add(inputNumber);
}
}
other approach
package testjavathread;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import com.google.common.collect.Lists;
public class TestThreadWithList {
public static void main(String[] args) {
TestThreadWithList testThread = new TestThreadWithList();
testThread.test();
}
private void test() {
long startTime = System.currentTimeMillis();
List<Integer> inputNumbers = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
inputNumbers.add(i);
}
List<Integer> insertData = Collections.synchronizedList(new ArrayList<>());
// Producing the data
if (!inputNumbers.isEmpty()) {
final int parallelism = 40;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Boolean> output = (List<Boolean>) forkJoinPool
.submit(() -> inputNumbers.parallelStream().forEach(inputNumber -> {
produce(inputNumber, insertData);
})).get();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
// Consumer
final int parallelism = 20;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
List<Boolean> finalListOutput = (List<Boolean>) forkJoinPool
.submit(() -> Lists.partition(insertData, 2).parallelStream().forEach(eachNumber -> {
System.out.println("Consumer " + eachNumber.get(0));
System.out.println("Consumer " + eachNumber.get(1));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
})).get();
} catch (Exception e) {
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
System.out.println(System.currentTimeMillis() - startTime);
}
private void produce(Integer inputNumber, List<Integer> insertData) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Producer " + inputNumber);
insertData.add(inputNumber);
}
}