I am implementing an application using Spring Batch, following the ItemReader / ItemProcessor / ItemWriter approach. I have created a Partitioner component that partitions the data. The ItemReader reads the data, the processor processes it, and the ItemWriter writes it back to the DB. After the job finishes, I see that some data is missing from the DB. Sometimes one partition fails, sometimes the whole job succeeds, and sometimes I get exceptions. It seems random. The exceptions I see include:
- `java.lang.RuntimeException: java.lang.reflect.UndeclaredThrowableException`
- `... is mapped to a primary key column in the database. Updates are not allowed.` (the start of this message is truncated)
- `org.eclipse.persistence.exceptions.DatabaseException Internal Exception: java.sql.SQLException: Closed Resultset: getObject`
Is there any thread synchronization or transaction handling that we need to take care of?
For example:
- Total number of records: 1000
- Chunk size: 100
- Partition 1: 500 records
- Partition 2: 500 records
This scenario works fine when I use neither partitioning nor a multi-threaded step.
Sample code: this code sometimes works and commits all the data, and sometimes fails. Sometimes I see that some records are not committed to the DB, even though the commit count in the BATCH_STEP_EXECUTION table is correct. It seems random.
@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
SimpleJobLauncher launcher = new SimpleJobLauncher();
launcher.setJobRepository(jobRepository);
return launcher;
}
@Bean(name = "customerJob")
public Job prepareBatch1() {
return jobBuilderFactory.get("customerJob").incrementer(new RunIdIncrementer()).start(masterStep()).listener(listener())
.build();
}
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep").
partitioner(slaveStep().getName(), partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public BatchListener listener() {
return new BatchListener();
}
@Bean
@JobScope
public BatchPartitioner partitioner() {
return new BatchPartitioner();
}
@Bean
@StepScope
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
taskExecutorPartitionHandler.setGridSize(2);
taskExecutorPartitionHandler.setTaskExecutor(taskExecutor());
taskExecutorPartitionHandler.setStep(slaveStep());
try {
taskExecutorPartitionHandler.afterPropertiesSet();
} catch (Exception e) {
return taskExecutorPartitionHandler;
}
@Bean
@StepScope
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep").<Customer, CustomerWrapperDTO>chunk(100)
.reader(getReader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public BatchWriter writer() {
return new BatchWriter();
}
@Bean
@StepScope
public BatchProcessor processor() {
return new BatchProcessor();
}
@Bean
@StepScope
public BatchReader getReader() {
return new BatchReader();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
/**
 * Carries the result of {@code BatchProcessor} to {@code BatchWriter}: the processed customer and,
 * optionally, an address to persist. Either field may be {@code null}; the writer skips null parts.
 */
class CustomerWrapperDTO {

    private Address address;

    private Customer customer;

    public Address getAddress() {
        return address;
    }

    public void setAddress(Address address) {
        this.address = address;
    }

    public Customer getCustomer() {
        return customer;
    }

    public void setCustomer(Customer customer) {
        this.customer = customer;
    }
}
Entity
class Customer {
String processStatus; // "U" : unprocessed, "C" : completed, "F" : Failed
}
public class BatchListener implements JobExecutionListener {
@Autowired
private CustomerRepo customerRepo;
public BatchListener() {
}
@Override
public void beforeJob(JobExecution jobExecution) {
List<Customer> customers;
try {
customers = customerRepo.getAllUnprocessedCustomer);
} catch (Exception e) {
throw new CustomerException("failed in BatchListener", e);
}
jobExecution.getExecutionContext().put("customers",customers);
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
/**
 * Splits the job-level customer list into one contiguous slice per partition.
 */
public class BatchPartitioner implements Partitioner {

    /** All customers to process, late-bound from the job execution context (key "customers"). */
    @Value("#{jobExecutionContext[customers]}")
    private List<Customer> customers;

    /**
     * Creates {@code gridSize} partitions named {@code partition0 .. partition<gridSize-1>}.
     *
     * <p>Each context holds a label under {@code "name"} and its slice of customers under
     * {@code "customers"}. The slices are disjoint and together cover every customer exactly
     * once. Their sizes differ by at most one, and a slice is empty when there are fewer
     * customers than partitions.
     *
     * @param gridSize number of partitions to create; must be positive
     * @return map from partition name to its execution context
     * @throws IllegalArgumentException if {@code gridSize <= 0}
     * @throws IllegalStateException if no customer list was bound from the job execution context
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive: " + gridSize);
        }
        if (customers == null) {
            throw new IllegalStateException("No 'customers' list in the job execution context");
        }
        int total = customers.size();
        Map<String, ExecutionContext> result = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            // Balanced bounds [i*total/g, (i+1)*total/g). The old fixed slice size of
            // total/gridSize silently dropped the remainder: 1001 items with grid 2 lost the
            // last item. It also divided by zero when total < gridSize.
            int from = (int) ((long) i * total / gridSize);
            int to = (int) ((long) (i + 1) * total / gridSize);
            // Copy the view: subList is backed by the shared list and is not Serializable,
            // but the step execution context is persisted by the JobRepository.
            List<Customer> slice = customers.subList(from, to).stream().collect(Collectors.toList());

            ExecutionContext executionContext = new ExecutionContext();
            executionContext.putString("name", "Thread_" + i);
            executionContext.put("customers", slice);
            result.put("partition" + i, executionContext);
        }
        return result;
    }
}
@Component
@StepScope
Class BatchReader {
private int index;
@Value("#{stepExecutionContext[customers]}")
private List<Customer> customers;
@Override
public Customer read() {
Customer Customer = null;
if (index < customers.size()) {
Customer = customers.get(index);
index++;
} else {
index = 0;
}
return Customer;
}
}
/**
 * Enriches each customer with its address and records whether that succeeded.
 */
@Component
@StepScope
public class BatchProcessor implements ItemProcessor<Customer, CustomerWrapperDTO> {

    public BatchProcessor() {
    }

    /**
     * Looks up the customer's address and marks the customer completed ({@code "C"}). If the
     * lookup throws, the customer is marked failed ({@code "F"}) instead of failing the chunk.
     *
     * @param item customer read from this partition's slice; it is mutated in place
     * @return a wrapper carrying the customer; never {@code null}, so no item is filtered out
     */
    @Override
    public CustomerWrapperDTO process(Customer item) {
        CustomerWrapperDTO customerWrapper = new CustomerWrapperDTO();
        try {
            // TODO(review): the address lookup is elided in this snippet. As written, the
            // comment on the next line swallows the rest of that line, so the following
            // statement becomes the initializer. Fill in the lookup before compiling.
            Address address = // API call or some business logic.
            item.setAddress(address);
            item.setProcessStatus("C"); // Completed
        } catch (Exception e) {
            item.setProcessStatus("F"); // Failed
        }
        // NOTE(review): only the customer is set on the wrapper. Its address stays null, so the
        // writer's address branch never runs. Confirm whether the address is persisted through
        // Customer or should also go via customerWrapper.setAddress(...).
        customerWrapper.setCustomer(item);
        return customerWrapper;
    }
}
/**
 * Persists each processed customer and, when present, its address.
 */
@Component
@StepScope
public class BatchWriter implements ItemWriter<CustomerWrapperDTO> {

    @Autowired
    private CustomerRepo customerRepo;

    @Autowired
    private AddressRepo addressRepo;

    public BatchWriter() {
    }

    /**
     * Merges every non-null customer and saves every non-null address in the chunk.
     *
     * <p>Repository exceptions propagate unchanged, as allowed by {@code ItemWriter.write}'s
     * {@code throws Exception} contract. Spring Batch then rolls back the chunk and can classify
     * the real exception type. The previous version wrapped everything in a bare
     * {@code RuntimeException}, which produced the opaque "RuntimeException:
     * UndeclaredThrowableException" and hid the underlying database error.
     *
     * @param items the chunk produced by the processor
     * @throws Exception any failure raised by the repositories
     */
    @Override
    public void write(List<? extends CustomerWrapperDTO> items) throws Exception {
        for (CustomerWrapperDTO item : items) {
            if (item.getCustomer() != null) {
                customerRepo.merge(item.getCustomer());
            }
            if (item.getAddress() != null) {
                addressRepo.save(item.getAddress());
            }
        }
    }
}