I am trying to run tasks asynchronously with CompletableFuture, taking the values to process from a ConcurrentLinkedQueue. Below is the consumer loop that polls the queue and submits each value to an executor.
    ExecutorService executor = Executors.newFixedThreadPool(5);
    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
    jobMsisdn tempMsisdn;
    while (true) {
        if (!msisdnQueue.isEmpty()) {
            tempMsisdn = msisdnQueue.poll();
            logger.info(tempMsisdn.toString());
            completableFuture.runAsync(new Worker(tempMsisdn), executor);
        } else {
            Thread.sleep(5000);
        }
    }
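As a side note on the loop above, here is a minimal sketch of the same idea in a form that terminates, so it can be run in isolation. It assumes plain String items instead of jobMsisdn and a hypothetical helper name drainAsync; neither is part of my application. It calls runAsync as the static method it is, and it relies on poll() returning null for an empty queue instead of checking isEmpty() first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class QueueConsumerSketch {

    // Hypothetical helper: polls until the queue is empty, runs one async
    // task per item, waits for all tasks, and returns what was processed.
    static List<String> drainAsync(Queue<String> queue, ExecutorService executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        Queue<String> processed = new ConcurrentLinkedQueue<>();
        String item;
        // poll() returns null when the queue is empty, so no isEmpty() check is needed.
        while ((item = queue.poll()) != null) {
            final String current = item;
            // runAsync is static; no CompletableFuture instance is required to call it.
            futures.add(CompletableFuture.runAsync(() -> processed.add(current), executor));
        }
        // Block until every submitted task has finished.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            // Order of the result may vary because tasks run concurrently.
            System.out.println(drainAsync(queue, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```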
The queue is static and is initialized in main. I populate it from an endpoint when a user request comes in:
    String tempId = req.getRefId();
    jobMsisdn tempReq = new jobMsisdn();
    CallDialerApplication.jobsMap.put(tempId, req);
    System.out.println(req.toString());
    if (CallDialerApplication.jobs.isEmpty()) {
        CallDialerApplication.jobs.add(req);
        BufferedReader br = new BufferedReader(new FileReader(new File(req.getMsisdnPath())));
        String line;
        try {
            while ((line = br.readLine()) != null) {
                tempReq.setJobRefId(tempId);
                tempReq.setMsisdn(line);
                logger.info(line);
                req.msisdns.put(line, CallState.insideQueue);
                CallDialerApplication.msisdnQueue.add(tempReq);
                logger.info(tempReq.toString());
                count++;
            }
        } finally {
            br.close();
        }
        logger.info("Request Added to Queue with jobID: " + req.getRefId());
    }
The problem is that when I poll the queue, every element holds the last value that was inserted, repeated once for each line that was read. I don't understand what is causing this. Is it related to the thread-safety mechanism of ConcurrentLinkedQueue, or is it something else?
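To narrow this down, here is a minimal single-threaded sketch that reproduces the same symptom outside the application. Item is a hypothetical stand-in for jobMsisdn with only one field, and the method names are made up for illustration. Since no threads are involved, it suggests the repetition may come from adding the same object reference to the queue on every iteration, as happens with tempReq above, rather than from ConcurrentLinkedQueue itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SharedReferenceDemo {

    // Hypothetical stand-in for jobMsisdn; only the field needed here.
    static class Item {
        String msisdn;
    }

    // Reuses ONE Item for every line, like tempReq in the producer code.
    static List<String> enqueueReusingOneObject(List<String> lines) {
        Queue<Item> queue = new ConcurrentLinkedQueue<>();
        Item shared = new Item();
        for (String line : lines) {
            shared.msisdn = line; // overwrites the value seen by earlier entries too
            queue.add(shared);    // adds another reference to the same object
        }
        return drain(queue);
    }

    // Creates a new Item per line, so each queue entry keeps its own value.
    static List<String> enqueueFreshObjectPerLine(List<String> lines) {
        Queue<Item> queue = new ConcurrentLinkedQueue<>();
        for (String line : lines) {
            Item item = new Item();
            item.msisdn = line;
            queue.add(item);
        }
        return drain(queue);
    }

    // Polls until the queue is empty and collects the msisdn of each entry.
    static List<String> drain(Queue<Item> queue) {
        List<String> out = new ArrayList<>();
        Item item;
        while ((item = queue.poll()) != null) {
            out.add(item.msisdn);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("111", "222", "333");
        System.out.println(enqueueReusingOneObject(lines));   // [333, 333, 333]
        System.out.println(enqueueFreshObjectPerLine(lines)); // [111, 222, 333]
    }
}
```

If this is the cause, moving the `new jobMsisdn()` call inside the while loop of the producer should give each queue entry its own object.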