I'm trying to model a simple batch process that reads from a file and dumps the records to Kafka.
I was testing how to restart a step after manually killing the process. The job parameters include only a file-path parameter, and I am guaranteed that only one job per file will run at a time.
I'm getting a JobExecutionAlreadyRunningException when I try to restart a JobExecution, even though I've updated its status to FAILED. How would I go about restarting a job so that it picks up from the last read record?
The basic logic is:
- If there is no previous execution, start a new one.
- If there is already an execution and the same file is detected again, stop the current execution and restart it.
try {
    JobParameters params = new JobParametersBuilder()
            .addString("input.file.path", filePath)
            .toJobParameters();
    JobExecution lastJobExecution = jobRepository.getLastJobExecution(ingestJob.getName(), params);
    if (lastJobExecution == null) {
        jobLauncher.run(ingestJob, params);
        return;
    }
    if (lastJobExecution.isRunning()) {
        if (lastJobExecution.getStatus() == BatchStatus.STARTED
                || lastJobExecution.getStatus() == BatchStatus.STARTING) {
            jobOperator.stop(lastJobExecution.getId());
        }
        lastJobExecution.setStatus(BatchStatus.FAILED);
        jobRepository.update(lastJobExecution);
    }
    // throws JobExecutionAlreadyRunningException
    jobOperator.restart(lastJobExecution.getId());
} catch (Exception e) {
    // logging omitted for brevity
}
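For reference, a variant I've been experimenting with also closes out the step executions and sets an end time before calling restart, on the assumption that the repository still considers the execution "running" until those fields are updated. I'm not sure this is the intended approach, so treat it as a sketch:

```java
// Hypothetical variant: mark any still-running step executions as FAILED
// and set end times on both the steps and the job execution, so the
// repository no longer sees the execution as in-flight before restarting.
for (StepExecution stepExecution : lastJobExecution.getStepExecutions()) {
    if (stepExecution.getStatus().isRunning()) {
        stepExecution.setStatus(BatchStatus.FAILED);
        stepExecution.setEndTime(new Date());
        jobRepository.update(stepExecution);
    }
}
lastJobExecution.setStatus(BatchStatus.FAILED);
lastJobExecution.setEndTime(new Date());
jobRepository.update(lastJobExecution);
jobOperator.restart(lastJobExecution.getId());
```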
As for the configuration, it's very simple:
@Configuration
@EnableBatchProcessing
public class BatchJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaProps kafkaProps;

    @Bean
    public Job ingestFile(Step ingestRecords) {
        return jobBuilderFactory.get("ingestFile")
                .incrementer(new RunIdIncrementer())
                .flow(ingestRecords)
                .end()
                .build();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    @Bean
    public Step ingestRecords(ItemReader<CustomerRecord> reader,
                              ItemWriter<CustomerRecord> writer) {
        return stepBuilderFactory.get("ingestRecords")
                .<CustomerRecord, CustomerRecord>chunk(1000)
                .reader(reader)
                //.processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    @StepScope
    public ItemStreamReader<CustomerRecord> reader(@Value("#{jobParameters['input.file.path']}") String filePath) {
        return new FlatFileItemReaderBuilder<CustomerRecord>()
                .name("customerRecordReader")
                .resource(new FileSystemResource(filePath))
                .linesToSkip(1)
                .delimited()
                .names("id", "name", "email", "profession", "type")
                .fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerRecord>() {{
                    setTargetType(CustomerRecord.class);
                }})
                .build();
    }

    @Bean
    @JobScope
    public ItemStreamWriter<CustomerRecord> writer() {
        return new KafkaTxWriter(kafkaProps);
    }
}
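For completeness, CustomerRecord is just a plain bean. A minimal sketch of what it could look like, assuming simple String fields (BeanWrapperFieldSetMapper needs a no-arg constructor and setters whose names match the column names passed to the reader):

```java
// Minimal sketch of the record bean; the property names must match the
// names configured on the reader: "id", "name", "email", "profession", "type".
public class CustomerRecord {
    private String id;
    private String name;
    private String email;
    private String profession;
    private String type;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getProfession() { return profession; }
    public void setProfession(String profession) { this.profession = profession; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
}
```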