I am solving transaction polling case using producer-consumer problem using ExecutorService
and BlockingQueue
. I have a list of transactions, which I want to verify and take action. I am getting a new transaction to verify continuously.
Considering BlockingQueue, I have only one producer and I want to keep 3-5 parallel consumers to speed up the verification.
I might have to wait for a few transactions to get completed. (Say 30 secs). So, I will verify and if it is false and time is greater than 30 sec, I will drop it. Basically, I want to consume only when the data item is consumable.
Firstly, is this approach good? Or I should try some other solutions (which I am not aware of as of now)
Here is the code that I have adapted from this question:
import java.util.concurrent.*;
public class ProducerConsumerWithES {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue,1));
pes.submit(new Producer(sharedQueue,2));
ces.submit(new Consumer(sharedQueue,1));
ces.submit(new Consumer(sharedQueue,2));
// shutdown should happen somewhere along with awaitTermination
/* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
pes.shutdown();
ces.shutdown();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
I know I can do a peek()
and then remove()
if it is required.
But when I tried doing that, all other consumers get stuck to the same transaction. And the other transactions getting produced are never attended.
This is because of the storage being queue (FIFO).
This scenario never happens when I remove the element, do the verification instead of peeking because other consumers get access to the remaining elements.
My question is, Is doing a peek()
followed by remove()
or put()
at the consumer side is okay?