I am trying to create a Spring Boot application that asynchronously processes thousands of records using @Async multithreading. For this I create 8 threads and split the main list into 8 sublists, so that each of the 8 threads processes one sublist asynchronously. I am also using @Scheduled so that the method is called every 2 seconds.
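The splitting step described above can be sketched in plain Java, without Spring. This is only a minimal sketch: the class name `SublistSplitter` and the method `split` are hypothetical names introduced for illustration, and it uses ceiling division instead of the `+ 1` rounding in the code further down, so chunk sizes may differ slightly from what my application produces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the application code in this post.
class SublistSplitter {

    // Splits 'items' into at most 'parts' consecutive chunks of near-equal size.
    static <T> List<List<T>> split(List<T> items, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        if (items.isEmpty()) {
            return chunks; // nothing to dispatch
        }
        // Ceiling division: guarantees no more than 'parts' chunks are produced.
        int chunkSize = (items.size() + parts - 1) / parts;
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(items.size(), from + chunkSize);
            // Copy the subList view so each worker thread owns an independent list.
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            ids.add(i);
        }
        List<List<Integer>> chunks = split(ids, 8);
        System.out.println(chunks.size() + " chunks, first chunk = " + chunks.get(0));
    }
}
```

With a small input the number of chunks can be lower than 8, which also means fewer than 8 threads get work in that run.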
The problem is that, because of the scheduler, the application sometimes processes duplicate records: the method is called every 2 seconds and retrieves data from the database each time. For example, the first call retrieves 72000 records whose flag is 0, and the @Async method then processes all of these records and changes the flag of each processed record from 0 to 1. Two seconds later the method is called again and retrieves the new records whose flag is 0.
In the attached log screenshot you can see that the first scheduler run retrieved 72000 records and multiple threads started processing them. In the meantime the next scheduler run started and retrieved 16000 records, which also include records that are still being processed by the current run.
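To make the overlap concrete, here is a minimal in-memory sketch of one possible way to keep a second poll from picking up records that are still being processed: claim them first by moving the flag to an intermediate value. Everything in it is an assumption for illustration. The `RecordStore` class, the `IN_PROGRESS` value 2, and the map standing in for the table are hypothetical and not part of my schema; against a real database the claim step would presumably be an UPDATE restricted to rows whose flag is 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the table: record id -> flag.
class RecordStore {
    static final int NEW = 0;
    static final int DONE = 1;
    static final int IN_PROGRESS = 2; // assumed intermediate state, not in the original schema

    private final Map<Integer, Integer> flagsById = new ConcurrentHashMap<>();

    void insert(int id) {
        flagsById.put(id, NEW);
    }

    // Moves every NEW record to IN_PROGRESS and returns the ids that were claimed.
    List<Integer> claimNew() {
        List<Integer> claimed = new ArrayList<>();
        for (Integer id : flagsById.keySet()) {
            // replace(key, expected, updated) succeeds only while the flag is still NEW,
            // so two concurrent polls can never claim the same record.
            if (flagsById.replace(id, NEW, IN_PROGRESS)) {
                claimed.add(id);
            }
        }
        return claimed;
    }

    // Called by a worker once its records have been processed.
    void markDone(List<Integer> ids) {
        for (Integer id : ids) {
            flagsById.put(id, DONE);
        }
    }

    int flagOf(int id) {
        return flagsById.get(id);
    }
}
```

In this sketch a second call to `claimNew()` made while the first batch is still in progress returns only records inserted after the first call.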
I am looking for a solution in which the next scheduler run does not start until the first one has completed. Sometimes the first run is still processing records when the next run fires 2 seconds later, and it may then retrieve records that already belong to the first run.
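The behaviour I am after can be sketched with plain JDK classes: each run dispatches its chunks, blocks until all of them have finished, and only then does the delay before the next run begin. This is a minimal sketch under stated assumptions, not a definitive implementation. `NonOverlappingPoller`, `runOnce`, `processChunk`, and `runFor` are hypothetical names, and the 50 ms sleep stands in for the real work. In Spring terms it would presumably correspond to using `@Scheduled(fixedDelay = 2000)` instead of the cron expression and having the `@Async` method return a `CompletableFuture` that the scheduled method waits on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; plain JDK stand-in for the scheduler and the async executor.
class NonOverlappingPoller {
    private final ExecutorService workers = Executors.newFixedThreadPool(8);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    final AtomicInteger inFlightChunks = new AtomicInteger();
    final AtomicInteger overlapsSeen = new AtomicInteger();
    final AtomicInteger completedRuns = new AtomicInteger();

    // One polling cycle: dispatch 8 chunks, then block until every chunk has finished.
    void runOnce() {
        if (inFlightChunks.get() > 0) {
            overlapsSeen.incrementAndGet(); // a previous run is still processing
        }
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int chunk = 0; chunk < 8; chunk++) {
            inFlightChunks.incrementAndGet();
            futures.add(CompletableFuture.runAsync(this::processChunk, workers));
        }
        // join() keeps this run alive until all chunks are done, so the
        // fixed delay of the scheduler only starts counting afterwards.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        completedRuns.incrementAndGet();
    }

    // Placeholder for the real per-chunk work (assumed to take about 50 ms here).
    void processChunk() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlightChunks.decrementAndGet();
        }
    }

    // Polls with the given delay between runs for roughly durationMillis, then shuts down.
    void runFor(long delayMillis, long durationMillis) {
        scheduler.scheduleWithFixedDelay(this::runOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(durationMillis);
            scheduler.shutdown();
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
            workers.shutdown();
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the delay is measured from the end of one run to the start of the next, a short interval still works for the usual 400-500 records, while a large batch simply postpones the following run until it is finished.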
I can't increase the scheduler interval, because most of the time we get only around 400-500 records, and only sometimes do we get thousands.
The code is below.
@SpringBootApplication
@EnableScheduling
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public RestTemplate restTemplate(RestTemplateBuilder builder) {
return builder.build();
}
@Bean("ThreadPoolTaskExecutor")
public TaskExecutor getAsyncExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(8);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
executor.initialize();
return executor;
}
}
@Service
public class Main {
@Autowired
private DemoDao dao;
@Autowired
private Asyn asyn;
static int schedulerCount = 0;
@Scheduled(cron = "0/2 * * * * *")
public void schedule() {
System.out.println("++++++++++++++++++++++++++++++++++++Scheduler started schedulerCount : "+schedulerCount+"+++++++++++++++++++++++++++++++++++++"+ LocalDateTime.now());
List<Json> jsonList = new ArrayList<Json>();
List<List<Json>> smallLi = new ArrayList<List<Json>>();
try {
jsonList = dao.getJsonList();
System.out.println("jsonList size : " + jsonList.size());
int count = jsonList.size();
//Creating 8 sublist (8 sublist because thread pool size is 8) from main list(jsonList)
int limit = count / 8 + 1; // count / 8 is already an integer division, so Math.round is not needed
for (int j = 0; j < count; j += limit) {
smallLi.add(new ArrayList<Json>(jsonList.subList(j, Math.min(count, j + limit))));
}
System.out.println("smallLi : " + smallLi.size());
//After creating 8 sublist, sending sublists with Async method so that 8 threads create and each thread process one sublist asynchronously.
for (int i = 0; i < smallLi.size(); i++) {
asyn.withAsyn(smallLi.get(i), schedulerCount);
}
schedulerCount++;
} catch (Exception e) {
e.printStackTrace();
}
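}
}
// Assumed: withAsyn belongs to the separate Asyn bean that is autowired into Main above;
// @Async only takes effect when the method is called through the Spring proxy of another bean.
@Service
public class Asyn {
@Async("ThreadPoolTaskExecutor")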
public void withAsyn(List<Json> li, int schedulerCount) throws Exception {
System.out.println("with start+++++++++++++ schedulerCount " + schedulerCount + ", name : "
+ Thread.currentThread().getName() + ", time : " + LocalDateTime.now() + ", start index : "
+ li.get(0).getId() + ", end index : " + li.get(li.size() - 1).getId());
try {
XSSFWorkbook workbook = new XSSFWorkbook();
XSSFSheet spreadsheet = workbook.createSheet("Data");
XSSFRow row;
for (int i = 0; i < li.size(); i++) {
row = spreadsheet.createRow(i);
Cell cell9 = row.createCell(0);
cell9.setCellValue(li.get(i).getId());
Cell cell = row.createCell(1);
cell.setCellValue(li.get(i).getName());
Cell cell1 = row.createCell(2);
cell1.setCellValue(li.get(i).getPhone());
Cell cell2 = row.createCell(3);
cell2.setCellValue(li.get(i).getEmail());
Cell cell3 = row.createCell(4);
cell3.setCellValue(li.get(i).getAddress());
Cell cell4 = row.createCell(5);
cell4.setCellValue(li.get(i).getPostalZip());
}
// try-with-resources closes the stream even if write() throws
try (FileOutputStream out = new FileOutputStream(new File("C:\\Users\\RK658\\Desktop\\logs\\generated\\"
+ Thread.currentThread().getName() + "_" + schedulerCount + ".xlsx"))) {
workbook.write(out);
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("with end+++++++++++++ schedulerCount " + schedulerCount + ", name : "
+ Thread.currentThread().getName() + ", time : " + LocalDateTime.now() + ", start index : "
+ li.get(0).getId() + ", end index : " + li.get(li.size() - 1).getId());
}
}