0

I am trying to make a Spring batch job in which
Reader reads one Item from Database
Processor creates List of Items
UnpackingItmeWriter accepts List of Items and sends individual Item to FlatFileItemWriter.
following is my code. I got this from this Answer .I am not doing something right in this configuration because job is not even starting when i run it. Please point me in right direction.

    @EnableBatchProcessing
    public class CreateFile extends ApplicationCommonConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Bean
    @StepScope
    public JdbcCursorItemReader<ItemA> reader(
            @Value("#{jobParameters}") Map<String, JobParameter> jobParameters) {
        JdbcCursorItemReader<ItemA> reader = new JdbcCursorItemReader<ItemA>();

        final String QUERY_SELECT = "SELECT * from table"
        reader.setDataSource(dataSource);
        reader.setSql(QUERY_SELECT);
        reader.setRowMapper(new ItemeDeliveryRowMapper());

        return reader;
    }

    public class ItemRowMapper implements RowMapper<ItemA> {

        @Override
        public ItemAmapRow(ResultSet rs, int rowNum) throws SQLException {

            ItemA item= new ItemA();

            item.setCode(rs.getString("Code"));
            return item;
        }

    }



    @Bean
    @StepScope
    public ItemProcessor<ItemA, List<ItemA>> processor(
            @Value("#{jobParameters}") Map<String, JobParameter> jobParameters, @Value("#{stepExecution}")

            StepExecution stepExecution) {

        return new ItemAProcessor();
    }

    public class ItemAProcessor
            implements ItemProcessor<ItemA, List<ItemA>> {

        @Override
        public List<ItemA> process(ItemA item) throws Exception {
            List<Item> itemList = new ArrayList<ItemA>();
            List<String> IdList = new ArrayList<>();

        // creating List of ItemA from one ItemA

            return itemList;
        }
    }


        public class ListUnpackingItemWriter<ItemA> implements ItemWriter<List<ItemA>>, ItemStream, InitializingBean {

        private ItemWriter<ItemA> delegate;

        @Override
        public void write(final List<? extends List<ItemA>> lists) throws Exception {
             List<ItemA> consolidatedList = new ArrayList<>();
            for ( List<ItemA> list : lists) {
                consolidatedList.addAll(list);
            }
            delegate.write(consolidatedList);
        }

        @Override
        public void afterPropertiesSet() {
            Assert.notNull(delegate, "You must set a delegate!");
        }

        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            if (delegate instanceof ItemStream) {
                ((ItemStream) delegate).open(executionContext);
            }
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException{
            if (delegate instanceof ItemStream) {
                ((ItemStream) delegate).update(executionContext);
            }
        }

        @Override
        public void close() {
            if (delegate instanceof ItemStream) {
                ((ItemStream) delegate).close();
            }
        }

        public void setDelegate(ItemWriter<ItemA> delegate) {
            this.delegate = delegate;
        }


    }


    @Bean
    @StepScope
    public FlatFileItemWriter<ItemA> flatWriter() {

        FlatFileItemWriter<ItemA> writer = new FlatFileItemWriter<ItemA>();

        writer.setResource(new FileSystemResource("C:/dev/doc.txt"));

        writer.setHeaderCallback(new FlatFileHeaderCallback() {

            @Override
            public void writeHeader(Writer writer) throws IOException {

                writer.write("Header");

            }
        });

        writer.setLineAggregator(new DelimitedLineAggregator<Item>() {
            {
                setDelimiter("|");
                setFieldExtractor(new BeanWrapperFieldExtractor<Item>() {
                    {
                        setNames(new String[] { "Code" });
                    }
                });
            }
        });

        writer.setFooterCallback(new FlatFileFooterCallback() {

            @Override
            public void writeFooter(Writer writer) throws IOException {
                writer.write("Footer");

            }
        });

        return writer;

    }



    @Bean
    @StepScope
    public ItemWriter<List<ItemA>> writer(@Value("#{jobParameters}") Map<String, JobParameter> jobParameters) {
        ListUnpackingItemWriter<ItemA> listUnpackingItemWriter = new ListUnpackingItemWriter<ItemA>();
        listUnpackingItemWriter.setDelegate(flatWriter());
        return listUnpackingItemWriter;
    }

    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<ItemA> reader,
            ItemProcessor<ItemA, List<ItemA>> processor,
            ItemWriter<List<ItemA>> flatItemWriter) {
        return stepBuilderFactory.get("step1").<ItemA, List<ItemA>>chunk(1).reader(reader)
                .processor(processor).writer(flatItemWriter).build();
    }

    @Bean
    public Job createFileJob(JobBuilderFactory jobs, Step s1) {

        return jobBuilderFactory.get("createFileJob").incrementer(new RunIdIncrementer())
                .listener(listener()).flow(s1).end().build();
    }



}

Edit: printstack

2018-11-11 22:17:53 [main] INFO  c.t.t.l.Launcher - inside try
2018-11-11 22:17:54 [main] INFO  c.t.t.c.CreateFile - inside JdbcCursorItemReader
2018-11-11 22:17:55 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step step1 in job createJob
org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to
    at org.springframework.batch.item.file.FlatFileItemWriter.write(FlatFileItemWriter.java:255) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at com.stackoverflow.spring.config.CreateFile$ListUnpackingItemWriter.write(CreateFile.java:209) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_181]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133) ~[spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121) ~[spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at com.sun.proxy.$Proxy19.write(Unknown Source) ~[na:na]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:175) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:151) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:274) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:199) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:134) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_181]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
    at com.sun.proxy.$Proxy26.run(Unknown Source) [na:na]
    at com.stackoverflow.spring.launchers.Launcher.Create(Launcher.java:701) [classes/:na]
    at com.stackoverflow.spring.launchers.Launcher.main(Launcher.java:73) [classes/:na]
mikeTysan007
  • 1
  • 1
  • 3

2 Answers2

0

Just called the mehtod open() with argument new ExecutionContext() right before delegate.write(consolidatedList); to open the writer

open (new ExecutionContext ());
delegate.write(consolidatedList);
mikeTysan007
  • 1
  • 1
  • 3
0

Remove ItemStream (and its implementation) from ListUnpackingItemWriter and register flatWriter() as stream using .stream() so SB will manage flat-file stream automatically

Luca Basso Ricci
  • 17,829
  • 2
  • 47
  • 69