
I am implementing an application using Spring Batch, following the ItemReader, ItemProcessor, ItemWriter approach. I have created a Partitioner component that partitions the data. I read the data through the ItemReader, process it, and write it back to the DB. Once the job finishes, I observe that some data is missing in the DB. Sometimes the execution of one partition fails, sometimes the job executes successfully, and sometimes I get exceptions. It's random.

  • java.lang.RuntimeException: java.lang.reflect.UndeclaredThrowableException
  • ... is mapped to a primary key column in the database. Updates are not allowed.
  • org.eclipse.persistence.exceptions.DatabaseException Internal Exception: java.sql.SQLException: Closed Resultset: getObject

Is there any thread synchronization or transaction handling we need to maintain?

E.g.

  • Total number of records - 1000
  • Chunk size - 100
  • Partition1 - 500
  • Partition2 - 500

This scenario works fine without using partitioning or a multi-threaded step.

Sample code: This code sometimes works and commits all data, sometimes it fails, and sometimes I observe that a few records are not committed to the DB (even though the commit count in the BATCH_STEP_EXECUTION table is correct). It is kind of random.

@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }

  
    @Bean(name = "customerJob")
    public Job prepareBatch1() {
        return jobBuilderFactory.get("customerJob")
                .incrementer(new RunIdIncrementer())
                .start(masterStep())
                .listener(listener())
                .build();
    }

    @Bean
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep")
                .partitioner(slaveStep().getName(), partitioner())
                .partitionHandler(partitionHandler())
                .build();
    }

    @Bean
    public BatchListener listener() {
        return new BatchListener();
    }

    @Bean
    @JobScope
    public BatchPartitioner partitioner() {
        return new BatchPartitioner();
    }

    @Bean
    @StepScope
    public PartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
        taskExecutorPartitionHandler.setGridSize(2);
        taskExecutorPartitionHandler.setTaskExecutor(taskExecutor());
        taskExecutorPartitionHandler.setStep(slaveStep());
        try {
            taskExecutorPartitionHandler.afterPropertiesSet();
        } catch (Exception e) {
            throw new IllegalStateException("failed to initialize partition handler", e);
        }
        return taskExecutorPartitionHandler;
    }

    @Bean
    @StepScope
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep").<Customer, CustomerWrapperDTO>chunk(100)
                .reader(getReader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public BatchWriter writer() {
        return new BatchWriter();
    }

    @Bean
    @StepScope
    public BatchProcessor processor() {
        return new BatchProcessor();
    }

    @Bean
    @StepScope
    public BatchReader getReader() {
        return new BatchReader();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}
class CustomerWrapperDTO {
    private Address address;
    private Customer customer;
    // getters and setters for address and customer
}

Entity

class Customer {
    String processStatus; // "U" : unprocessed, "C" : completed, "F" : Failed
}
public class BatchListener implements JobExecutionListener {

    @Autowired
    private CustomerRepo customerRepo;
   
    public BatchListener() {
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
      
        List<Customer> customers;
        try {
            customers = customerRepo.getAllUnprocessedCustomer();
        } catch (Exception e) {
            throw new CustomerException("failed in BatchListener", e);
        }
        jobExecution.getExecutionContext().put("customers",customers);
        
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        
    }
}
public class BatchPartitioner implements Partitioner {
   
    @Value("#{jobExecutionContext[customers]}")
    private List<Customer> customers;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        
        Map<String, ExecutionContext> result = new HashMap<>();
        int size = customers.size() / gridSize;
        List<List<Customer>> lists = IntStream.range(0, customers.size()).boxed()
                .collect(Collectors.groupingBy(i -> i / size,
                        Collectors.mapping(customers::get, Collectors.toList())))
                .values().stream().collect(Collectors.toList());
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.putString("name", "Thread_" + i);
            executionContext.put("customers", lists.get(i));
            result.put("partition" + i, executionContext);
        }
        
        return result;
    }
}
@Component
@StepScope
class BatchReader implements ItemReader<Customer> {
        private int index;

        @Value("#{stepExecutionContext[customers]}")
        private List<Customer> customers;


        @Override
        public Customer read() {
            Customer customer = null;
            if (index < customers.size()) {
                customer = customers.get(index);
                index++;
            } else {
                index = 0;
            }
            return customer;
        }
}
@Component
@StepScope
public class BatchProcessor implements ItemProcessor<Customer, CustomerWrapperDTO> {

    public BatchProcessor() {
    }

    @Override
    public CustomerWrapperDTO process(Customer item) {
        CustomerWrapperDTO customerWrapper = new CustomerWrapperDTO();
        try {
            // logic to get address
            Address address = null; // placeholder: API call or some business logic
            item.setAddress(address);
            item.setProcessStatus("C"); // completed
        } catch (Exception e) {
            item.setProcessStatus("F"); // failed
        }
        customerWrapper.setCustomer(item);
        return customerWrapper;
    }
}
@Component
@StepScope
public class BatchWriter implements ItemWriter<CustomerWrapperDTO> {
 

    @Autowired
    private CustomerRepo customerRepo;
    @Autowired
    private AddressRepo addressRepo;

    public BatchWriter() {
    }

    @Override
    public void write(List<? extends CustomerWrapperDTO> items) {

        items.forEach(item -> {
            try {
                if(item.getCustomer() != null) {
                    customerRepo.merge(item.getCustomer());
                }

                if(item.getAddress() != null) {
                    addressRepo.save(item.getAddress());
                }

            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        
    }
}
vaibhav
  • Please share your code to be able to help you in an efficient way: https://stackoverflow.com/help/minimal-reproducible-example. – Mahmoud Ben Hassine Sep 19 '22 at 12:53
  • @MahmoudBenHassine I have updated the question with sample code.. I hope this will help.. Thank you. This code sometimes works and commits all data... Sometimes one of the partitions fails.. Sometimes the data is not consistent in the DB. One more observation: instead of reading one by one, if I read all partition data at once, wrap it in an object, and pass it to the processor, then it works – vaibhav Sep 19 '22 at 13:40
  • Your `BatchProcessor` and `BatchItemWriter` don't need to be scoped. One thing I find curious (and probably dangerous) is that you are storing JPA-managed entities in your `ExecutionContext`, which gets serialized to the DB and distributed over your network. Not sure if that is the wisest thing to do; especially when data grows, this might (and will) lead to issues. – M. Deinum Sep 19 '22 at 13:53
  • Fetch size has nothing to do with chunk size; it only concerns the number of records that are fetched when the JDBC result set needs to obtain more records (see the sketch below). – M. Deinum Sep 20 '22 at 12:25
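To make that distinction concrete, here is a minimal sketch of a cursor-based reader (the SQL and bean names are hypothetical): setFetchSize is only a JDBC driver hint for how many rows are pulled per network round trip, while the commit interval is whatever is passed to chunk(...) on the step.

@Bean
@StepScope
public JdbcCursorItemReader<Customer> customerReader(DataSource dataSource) {
    JdbcCursorItemReader<Customer> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    reader.setSql("SELECT * FROM customer WHERE process_status = 'U'"); // hypothetical query
    reader.setRowMapper(new BeanPropertyRowMapper<>(Customer.class));
    reader.setFetchSize(500); // JDBC hint only; the step would still commit every chunk(100) items
    return reader;
}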

2 Answers


Spring Batch processes items in chunks. If a chunk fails (meaning at least one item failed to process), the whole chunk's transaction is rolled back.
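One way to observe those per-chunk transactions is to register a ChunkListener on the worker step; a minimal sketch (the listener name is illustrative):

public class LoggingChunkListener implements ChunkListener {

    @Override
    public void beforeChunk(ChunkContext context) {
        // called after the transaction for this chunk has been started
    }

    @Override
    public void afterChunk(ChunkContext context) {
        // called after the chunk transaction has been committed
        System.out.println("Chunk committed in " + context.getStepContext().getStepName());
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        // called after the chunk transaction has been rolled back
        System.out.println("Chunk rolled back in " + context.getStepContext().getStepName());
    }
}

It can be attached to the worker step with .listener(new LoggingChunkListener()) before .build().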

  • In my scenario, I observed that once the job is completed, the data is not consistent in the DB. I see some missing records.. It's random. Sometimes the job fails due to a DB exception.. – vaibhav Sep 19 '22 at 10:23
  • Spring will commit after each chunk is processed. Take a look at [This post](https://stackoverflow.com/questions/58918512/spring-batch-send-notification-after-a-chunk-is-processed) – Leonardo Emmanuel de Azevedo Sep 19 '22 at 10:32

The issue is with your item reader:

  • The implementation is not thread-safe, yet it is used in a multi-threaded step. You should synchronize the read method or wrap your reader in a SynchronizedItemStreamReader (see the sketch after this list).
  • The execution context is not safe to share between threads, and you seem to be sharing items between threads through the execution context. By the way, storing items in the execution context is not recommended even for single-threaded cases, because the context is persisted (possibly several times) during the job execution.
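For the first point, a minimal sketch of the wrapper approach, assuming the delegate implements ItemStreamReader<Customer> (the BatchReader from the question only implements read, so it would either need to be adapted, or its read method can simply be declared synchronized):

@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> synchronizedReader(ItemStreamReader<Customer> delegate) {
    SynchronizedItemStreamReader<Customer> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(delegate); // all read() calls are now serialized across threads
    return reader;
}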
Mahmoud Ben Hassine
  • thank you.. I tried synchronizing the read method, but it didn't work.. why do we need to synchronize the read method? I am partitioning data using BatchPartitioner, so each thread is assigned to one partition.. – vaibhav Sep 21 '22 at 06:29
  • Because I understood that your worker step does not work as expected when it is multi-threaded: `This scenario works fine without using partitioning or using MultiThreaded Step`. If it still does not work, please share a minimal complete example that reproduces the issue to be able to help you efficiently. – Mahmoud Ben Hassine Sep 21 '22 at 07:24
  • I cannot share the actual code.. I have shared a sample scenario in the question.. – vaibhav Sep 21 '22 at 07:43
  • is there any Spring Batch component I can debug to see where the transaction begins and commits?.. I can share one more scenario: Partition1 - 10, Partition2 - 10, chunk size - 5. At the end of the job, randomly a few records are not committed. – vaibhav Sep 23 '22 at 06:46
  • Yes, the tasklet transaction is executed here: https://github.com/spring-projects/spring-batch/blob/main/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java#L255 – Mahmoud Ben Hassine Sep 23 '22 at 07:32
  • one more insight: my Spring Batch schema is in a different database from my business entities.. does that make any difference while committing business data? – vaibhav Sep 23 '22 at 09:54
  • found the issue.. I needed to join my transaction with a JTA transaction.. thank you – vaibhav Sep 27 '22 at 12:35
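For completeness, a rough sketch of what routing Spring Batch onto a JTA transaction manager might look like, assuming an XA-capable setup (e.g. Atomikos) that provides the UserTransaction and TransactionManager, so that the batch metadata database and the business database commit together:

public class JtaBatchConfigurer extends DefaultBatchConfigurer {

    private final JtaTransactionManager jtaTransactionManager;

    public JtaBatchConfigurer(UserTransaction userTransaction, TransactionManager transactionManager) {
        // both databases must be registered with the JTA provider via XA datasources
        this.jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        // Spring Batch will use this manager for step (chunk) transactions
        return jtaTransactionManager;
    }
}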