Here is my requirement. In spring batch I need to read from single source(file reader or from DB), process records comes from reader as multiple processers and write each processed items in separate writers.
I have tried with CompositeItemProcessorBuilder<I,O> but it seems not working for my solution.
I have gone through the documentation it has this Chaining ItemProcessors.
my code sample as below.
@MongoBatchJobApisLoggable
public Job mongoJob() {
return jobBuilderFactory.get("mongoJob")
.start(chunkStep())
.incrementer(new RunIdIncrementer())
.listener(new ExecutionTimeJobListener())
.build();
}
@Bean
@MongoBatchJobApisLoggable
public Step chunkStep() {
return stepBuilderFactory.get("chunkStep")
.<MongoReader, CompositeResult>chunk(Integer.parseInt(Objects.requireNonNull(env.getProperty("chunk-size"))))
.reader(fileReader.itemReader())
.listener(new ExecutionTimeItemReadListener())
.processor(fileProcessor.compositeItemProcessor())
.listener(new ExecutionTimeItemProcessListener())
.writer(fileWriter.compositeItemWriter())
.listener(new ExecutionTimeItemWriteListener())
.taskExecutor(asyncExecutor)
.listener(new ExecutionTimeTaskletStepListener())
.listener(new ExecutionTimeStepListener())
.build();
}
processer as follows.
@Bean
@StepScope
public ItemProcessor<MongoReader, CompositeResult> compositeItemProcessor() {
return new CompositeItemProcessorBuilder<MongoReader, CompositeResult>()
.delegates(Arrays.asList(productItemProcessor(), variantsItemProcessor(), categoriesItemProcessor()))
.build();
}
@Bean
public ItemProcessor<MongoReader, Product> productItemProcessor() {
return mongoReader -> Product.builder()
.id("test")
.build();
}
@Bean
public ItemProcessor<MongoReader, Variants> variantsItemProcessor() {
return new ItemProcessor<>() {
Variants variants;
@Override
public Variants process(MongoReader mongoReader) {
mongoReader.getVariants().forEach(data -> variants = Variants.builder()
.id(data.getId())
.variantName(data.getVariantName())
.build());
return variants;
}
};
}
@Bean
public ItemProcessor<MongoReader, Categories> categoriesItemProcessor() {
return new ItemProcessor<>() {
Categories categories;
@Override
public Categories process(MongoReader mongoReader) {
mongoReader.getCategories().forEach(data -> categories = Categories.builder()
.id(data.getId())
.categoryName(data.getCategoryName())
.build());
return categories;
}
};
}
my writer sample as below.
@Bean
@StepScope
public ItemWriter<CompositeResult> compositeItemWriter() {
return new CompositeItemWriterBuilder<CompositeResult>()
.delegates(Arrays.asList(productItemWriter(), variantsItemWriter(), categoriesItemWriter()))
.build();
}
@Bean
@StepScope
public ItemWriter<? super CompositeResult> productItemWriter() {
return list -> list.forEach(data -> System.out.println(data.getProduct().getId()));
}
@Bean
@StepScope
public ItemWriter<? super CompositeResult> variantsItemWriter() {
return list -> list.forEach(data -> System.out.println(data.getVariants().getVariantName()));
}
@Bean
@StepScope
public ItemWriter<? super CompositeResult> categoriesItemWriter() {
return list -> list.forEach(data -> System.out.println(data.getCategories().getCategoryName()));
}
CompositeItemProcessor is not processing the read data, getting casting exception when processing. I need to follow the above workflow is there any way to do this?