Here is my requirement. In Spring Batch, I need to read from a single source (a file reader or a DB), run each record from the reader through multiple processors, and write each processor's output with a separate writer.

[Image: workflow diagram]

I have tried CompositeItemProcessorBuilder<I,O>, but it does not seem to work for my use case.

I have gone through the documentation, which has a section called Chaining ItemProcessors.
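
For context, "chaining" in that section means a pipeline: each delegate receives the previous delegate's output, not the original item. A minimal plain-Java sketch of that behaviour follows. `Function` stands in for `ItemProcessor`, and `ChainSketch` and `runChain` are hypothetical names for illustration, not Spring Batch API.

```java
import java.util.List;
import java.util.function.Function;

// Plain-Java stand-in for a processor chain; Function plays the role of ItemProcessor.
class ChainSketch {

    // Feeds the item through each delegate in order:
    // the output of one delegate becomes the input of the next.
    @SuppressWarnings("unchecked")
    static Object runChain(Object item, List<Function<?, ?>> delegates) {
        Object current = item;
        for (Function<?, ?> delegate : delegates) {
            current = ((Function<Object, Object>) delegate).apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        Function<String, Integer> length = String::length;    // String -> Integer
        Function<Integer, String> describe = n -> "len=" + n; // Integer -> String

        // The types line up (String -> Integer -> String), so the chain works.
        System.out.println(runChain("abc", List.of(length, describe)));

        // Here both delegates expect a String, but the second one
        // is handed the Integer produced by the first.
        try {
            runChain("abc", List.of(length, length));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException in the second delegate");
        }
    }
}
```

Delegates that all expect the same input type, as in the code below, fit the second case rather than the first.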

My code sample is below.

    @MongoBatchJobApisLoggable
    public Job mongoJob() {
        return jobBuilderFactory.get("mongoJob")
                .start(chunkStep())
                .incrementer(new RunIdIncrementer())
                .listener(new ExecutionTimeJobListener())
                .build();
    }

    @Bean
    @MongoBatchJobApisLoggable
    public Step chunkStep() {

        return stepBuilderFactory.get("chunkStep")
                .<MongoReader, CompositeResult>chunk(Integer.parseInt(Objects.requireNonNull(env.getProperty("chunk-size"))))
                .reader(fileReader.itemReader())
                .listener(new ExecutionTimeItemReadListener())
                .processor(fileProcessor.compositeItemProcessor())
                .listener(new ExecutionTimeItemProcessListener())
                .writer(fileWriter.compositeItemWriter())
                .listener(new ExecutionTimeItemWriteListener())
                .taskExecutor(asyncExecutor)
                .listener(new ExecutionTimeTaskletStepListener())
                .listener(new ExecutionTimeStepListener())
                .build();
    }

The processors are as follows.

    @Bean
    @StepScope
    public ItemProcessor<MongoReader, CompositeResult> compositeItemProcessor() {
        return new CompositeItemProcessorBuilder<MongoReader, CompositeResult>()
                .delegates(Arrays.asList(productItemProcessor(), variantsItemProcessor(), categoriesItemProcessor()))
                .build();
    }

    @Bean
    public ItemProcessor<MongoReader, Product> productItemProcessor() {
        return mongoReader -> Product.builder()
                .id("test")
                .build();
    }

    @Bean
    public ItemProcessor<MongoReader, Variants> variantsItemProcessor() {
        return new ItemProcessor<>() {
            Variants variants;

            @Override
            public Variants process(MongoReader mongoReader) {
                mongoReader.getVariants().forEach(data -> variants = Variants.builder()
                        .id(data.getId())
                        .variantName(data.getVariantName())
                        .build());
                return variants;
            }
        };
    }

    @Bean
    public ItemProcessor<MongoReader, Categories> categoriesItemProcessor() {
        return new ItemProcessor<>() {
            Categories categories;

            @Override
            public Categories process(MongoReader mongoReader) {
                mongoReader.getCategories().forEach(data -> categories = Categories.builder()
                        .id(data.getId())
                        .categoryName(data.getCategoryName())
                        .build());
                return categories;
            }
        };
    }

My writer sample is below.

    @Bean
    @StepScope
    public ItemWriter<CompositeResult> compositeItemWriter() {
        return new CompositeItemWriterBuilder<CompositeResult>()
                .delegates(Arrays.asList(productItemWriter(), variantsItemWriter(), categoriesItemWriter()))
                .build();
    }

    @Bean
    @StepScope
    public ItemWriter<? super CompositeResult> productItemWriter() {
        return list -> list.forEach(data -> System.out.println(data.getProduct().getId()));
    }

    @Bean
    @StepScope
    public ItemWriter<? super CompositeResult> variantsItemWriter() {
        return list -> list.forEach(data -> System.out.println(data.getVariants().getVariantName()));
    }

    @Bean
    @StepScope
    public ItemWriter<? super CompositeResult> categoriesItemWriter() {
        return list -> list.forEach(data -> System.out.println(data.getCategories().getCategoryName()));
    }

CompositeItemProcessor is not processing the data that was read; I get a casting exception during processing. I need to follow the above workflow. Is there any way to do this?

praneeth
  • Since the reader is not transactional (a simple file reader, as far as I can tell), and since processing is different for {A,B,C}, a solution would be to write 3 independent jobs reading from the same source. It would definitely be easier to debug such a thing. – Michail Alexakis May 14 '22 at 21:13
  • Thanks for the answer, but I need to follow the above-mentioned workflow. Is there any way to get this done? – praneeth May 15 '22 at 01:54
  • The `CompositeItemProcessor` composes processors in a serial line (a chain): the output of the first is the input of the second and so forth. So, it is not applicable to your case since you need 3 different processors working on the initial input. A solution would be to form a single processor that emits a record of (B,C,D). Then a composite writer, consisting of the 3 aforementioned writers, can be invoked: each member writer handles the relevant part (e.g B) and ignores the others – Michail Alexakis May 15 '22 at 10:03
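
The approach from the last comment (one processor that fans the item out and emits a combined record, then a composite writer whose members each handle their own part) can be sketched roughly as below. This is plain Java rather than Spring Batch code: `Function` stands in for `ItemProcessor`, `Consumer<List<...>>` for `ItemWriter`, and `FanOutSketch`, `Source`, and the string-valued `CompositeResult` are hypothetical, simplified stand-ins for the question's types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java sketch: Function stands in for ItemProcessor,
// Consumer<List<...>> for ItemWriter. All names here are hypothetical.
class FanOutSketch {

    // Simplified stand-in for the item produced by the reader.
    static final class Source {
        final String productId;
        final String variantName;
        final String categoryName;

        Source(String productId, String variantName, String categoryName) {
            this.productId = productId;
            this.variantName = variantName;
            this.categoryName = categoryName;
        }
    }

    // Simplified stand-in for the record that bundles all three results.
    static final class CompositeResult {
        final String product;
        final String variant;
        final String category;

        CompositeResult(String product, String variant, String category) {
            this.product = product;
            this.variant = variant;
            this.category = category;
        }
    }

    // Each delegate takes the ORIGINAL item, never another delegate's output.
    static final Function<Source, String> PRODUCT = s -> "product:" + s.productId;
    static final Function<Source, String> VARIANT = s -> "variant:" + s.variantName;
    static final Function<Source, String> CATEGORY = s -> "category:" + s.categoryName;

    // The single processor: fan the item out to all delegates, bundle the results.
    static CompositeResult process(Source item) {
        return new CompositeResult(
                PRODUCT.apply(item), VARIANT.apply(item), CATEGORY.apply(item));
    }

    // The composite writer: pass the same chunk to every delegate writer in order.
    static void write(List<CompositeResult> chunk,
                      List<Consumer<List<CompositeResult>>> writers) {
        for (Consumer<List<CompositeResult>> writer : writers) {
            writer.accept(chunk);
        }
    }

    public static void main(String[] args) {
        List<CompositeResult> chunk = new ArrayList<>();
        chunk.add(process(new Source("p1", "red", "shoes")));

        // Each writer reads only its own part of the record and ignores the rest.
        List<Consumer<List<CompositeResult>>> writers = List.of(
                items -> items.forEach(r -> System.out.println(r.product)),
                items -> items.forEach(r -> System.out.println(r.variant)),
                items -> items.forEach(r -> System.out.println(r.category)));
        write(chunk, writers);
    }
}
```

In the question's setup, the equivalent would presumably be a single `ItemProcessor<MongoReader, CompositeResult>` that calls the three existing processors itself and builds the `CompositeResult`, while the existing composite writer stays as it is.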
