
I'm trying to model a simple batch process that reads from a file and writes to Kafka.
I was testing how to restart a step after I manually kill the process. The job parameters include only a file path, and I am guaranteed that only one job per file will run.

I'm getting a JobExecutionAlreadyRunningException when I try to restart a JobExecution, even though I've updated its status to FAILED. How would I go about restarting a job and picking up from the last read record?

The basic logic is:

  1. If there are no executions yet, start a new execution.
  2. If there is already an execution and the same file shows up again, stop the current execution and restart it.

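try {
    JobParameters params = new JobParametersBuilder()
            .addString("input.file.path", filePath)
            .toJobParameters();

    JobExecution lastJobExecution = jobRepository.getLastJobExecution(ingestJob.getName(), params);
    if (lastJobExecution == null) {
        // No previous execution for this file: start a fresh one
        jobLauncher.run(ingestJob, params);
        return;
    }

    if (lastJobExecution.isRunning()) {
        if (lastJobExecution.getStatus() == BatchStatus.STARTED || lastJobExecution.getStatus() == BatchStatus.STARTING) {
            jobOperator.stop(lastJobExecution.getId());
        }
        // Mark the killed execution as FAILED, hoping to make it restartable
        lastJobExecution.setStatus(BatchStatus.FAILED);
        jobRepository.update(lastJobExecution);
    }

    // causes error
    jobOperator.restart(lastJobExecution.getId());

} catch (JobExecutionException e) {
    // The checked exceptions of the launcher and operator calls above extend JobExecutionException
    throw new IllegalStateException("Could not launch or restart the job for " + filePath, e);
}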

The configuration is very simple:

@Configuration
@EnableBatchProcessing
public class BatchJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaProps kafkaProps;

    @Bean
    public Job ingestFile( Step ingestRecords) {
        return jobBuilderFactory.get("ingestFile")
                .incrementer(new RunIdIncrementer())
                .flow(ingestRecords)
                .end()
                .build();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    @Bean
    public Step ingestRecords(ItemReader<CustomerRecord> reader
            , ItemWriter<CustomerRecord> writer) {
        return stepBuilderFactory.get("ingestRecords")
                .<CustomerRecord, CustomerRecord> chunk(1000)
                .reader(reader)
                //.processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    @StepScope
    public ItemStreamReader<CustomerRecord> reader(@Value("#{jobParameters['input.file.path']}") String filePath) {
        return new FlatFileItemReaderBuilder<CustomerRecord>()
                .name("customerRecordReader")
                .resource(new FileSystemResource(filePath))
                .linesToSkip(1)
                .delimited()
                .names("id", "name", "email", "profession", "type")
                .fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerRecord>() {{
                    setTargetType(CustomerRecord.class);
                }})
                .build();
    }

    @Bean
    @JobScope
    public ItemStreamWriter<CustomerRecord> writer() {
        return new KafkaTxWriter(kafkaProps);
    }

}
Prasanth Ravi
  • Setting the status of JobExecution to FAILED is not enough. You need to set the end time to a non null value as well (and do the same for the BATCH_STEP_EXECUTION table, see duplicate question). – Mahmoud Ben Hassine Nov 30 '20 at 09:29
  • @MahmoudBenHassine Yes, it does. Thanks. I am able to resume the job. As an aside, I couldn't find this in the documentation (unless I missed it), might be good to include in official docs? – Prasanth Ravi Nov 30 '20 at 20:56
  • Yes, the docs mention to change the status to FAILED here: https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/job.html#aborting-a-job, but does not mention the detail about setting the end time to a non null value. Please open an issue and we will add it to the docs. – Mahmoud Ben Hassine Dec 01 '20 at 08:59
  • @MahmoudBenHassine, done. – Prasanth Ravi Dec 02 '20 at 05:39
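
To illustrate the point made in the comments, here is a minimal sketch in plain Java. It does not use Spring Batch's classes: `RestartSketch`, `Execution`, `JobRun`, `markAsFailed` and `canRestart` are hypothetical stand-ins, and the sketch assumes (as the first comment implies) that an execution counts as still running for as long as its end time is null, whatever its status says. In a real application the same idea would presumably translate to setting the end time on the JobExecution and on each of its StepExecutions before updating them through the JobRepository.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Simplified stand-in model, NOT Spring Batch's real classes.
final class RestartSketch {

    enum Status { STARTED, FAILED }

    // Hypothetical stand-in for one row of a job or step execution table.
    static class Execution {
        Status status = Status.STARTED;
        Date startTime = new Date();
        Date endTime; // stays null when the process is killed mid-run

        // Assumption: "running" is derived from the timestamps, not from the status.
        boolean isRunning() {
            return startTime != null && endTime == null;
        }
    }

    // A job execution together with its step executions.
    static class JobRun extends Execution {
        final List<Execution> steps = new ArrayList<>();
    }

    // Sets FAILED plus a non-null end time on the job and on every unfinished step.
    static void markAsFailed(JobRun job, Date now) {
        for (Execution step : job.steps) {
            if (step.endTime == null) {
                step.status = Status.FAILED;
                step.endTime = now;
            }
        }
        job.status = Status.FAILED;
        job.endTime = now;
    }

    // A restart is refused while the job or any of its steps still looks running.
    static boolean canRestart(JobRun job) {
        if (job.isRunning()) {
            return false;
        }
        for (Execution step : job.steps) {
            if (step.isRunning()) {
                return false;
            }
        }
        return job.status == Status.FAILED;
    }

    public static void main(String[] args) {
        JobRun killed = new JobRun();
        killed.steps.add(new Execution());

        killed.status = Status.FAILED;           // status only, as in the question
        System.out.println(canRestart(killed));  // prints "false"

        markAsFailed(killed, new Date());        // status and end time, job and steps
        System.out.println(canRestart(killed));  // prints "true"
    }
}
```

In this model, changing only the status leaves the end time null, so the execution still looks like it is running and the restart is refused. Once the end time is set on both the job and its steps, the restart check passes.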
