I am trying to make a Spring batch job in which
Reader reads one Item from Database
Processor creates List of Items
UnpackingItmeWriter accepts List of Items and sends individual Item to FlatFileItemWriter.
following is my code. I got this from this Answer .I am not doing something right in this configuration because job is not even starting when i run it. Please point me in right direction.
@EnableBatchProcessing
public class CreateFile extends ApplicationCommonConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Bean
@StepScope
public JdbcCursorItemReader<ItemA> reader(
@Value("#{jobParameters}") Map<String, JobParameter> jobParameters) {
JdbcCursorItemReader<ItemA> reader = new JdbcCursorItemReader<ItemA>();
final String QUERY_SELECT = "SELECT * from table"
reader.setDataSource(dataSource);
reader.setSql(QUERY_SELECT);
reader.setRowMapper(new ItemeDeliveryRowMapper());
return reader;
}
public class ItemRowMapper implements RowMapper<ItemA> {
@Override
public ItemAmapRow(ResultSet rs, int rowNum) throws SQLException {
ItemA item= new ItemA();
item.setCode(rs.getString("Code"));
return item;
}
}
@Bean
@StepScope
public ItemProcessor<ItemA, List<ItemA>> processor(
@Value("#{jobParameters}") Map<String, JobParameter> jobParameters, @Value("#{stepExecution}")
StepExecution stepExecution) {
return new ItemAProcessor();
}
public class ItemAProcessor
implements ItemProcessor<ItemA, List<ItemA>> {
@Override
public List<ItemA> process(ItemA item) throws Exception {
List<Item> itemList = new ArrayList<ItemA>();
List<String> IdList = new ArrayList<>();
// creating List of ItemA from one ItemA
return itemList;
}
}
public class ListUnpackingItemWriter<ItemA> implements ItemWriter<List<ItemA>>, ItemStream, InitializingBean {
private ItemWriter<ItemA> delegate;
@Override
public void write(final List<? extends List<ItemA>> lists) throws Exception {
List<ItemA> consolidatedList = new ArrayList<>();
for ( List<ItemA> list : lists) {
consolidatedList.addAll(list);
}
delegate.write(consolidatedList);
}
@Override
public void afterPropertiesSet() {
Assert.notNull(delegate, "You must set a delegate!");
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).open(executionContext);
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException{
if (delegate instanceof ItemStream) {
((ItemStream) delegate).update(executionContext);
}
}
@Override
public void close() {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).close();
}
}
public void setDelegate(ItemWriter<ItemA> delegate) {
this.delegate = delegate;
}
}
@Bean
@StepScope
public FlatFileItemWriter<ItemA> flatWriter() {
FlatFileItemWriter<ItemA> writer = new FlatFileItemWriter<ItemA>();
writer.setResource(new FileSystemResource("C:/dev/doc.txt"));
writer.setHeaderCallback(new FlatFileHeaderCallback() {
@Override
public void writeHeader(Writer writer) throws IOException {
writer.write("Header");
}
});
writer.setLineAggregator(new DelimitedLineAggregator<Item>() {
{
setDelimiter("|");
setFieldExtractor(new BeanWrapperFieldExtractor<Item>() {
{
setNames(new String[] { "Code" });
}
});
}
});
writer.setFooterCallback(new FlatFileFooterCallback() {
@Override
public void writeFooter(Writer writer) throws IOException {
writer.write("Footer");
}
});
return writer;
}
@Bean
@StepScope
public ItemWriter<List<ItemA>> writer(@Value("#{jobParameters}") Map<String, JobParameter> jobParameters) {
ListUnpackingItemWriter<ItemA> listUnpackingItemWriter = new ListUnpackingItemWriter<ItemA>();
listUnpackingItemWriter.setDelegate(flatWriter());
return listUnpackingItemWriter;
}
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<ItemA> reader,
ItemProcessor<ItemA, List<ItemA>> processor,
ItemWriter<List<ItemA>> flatItemWriter) {
return stepBuilderFactory.get("step1").<ItemA, List<ItemA>>chunk(1).reader(reader)
.processor(processor).writer(flatItemWriter).build();
}
@Bean
public Job createFileJob(JobBuilderFactory jobs, Step s1) {
return jobBuilderFactory.get("createFileJob").incrementer(new RunIdIncrementer())
.listener(listener()).flow(s1).end().build();
}
}
Edit: printstack
2018-11-11 22:17:53 [main] INFO c.t.t.l.Launcher - inside try
2018-11-11 22:17:54 [main] INFO c.t.t.c.CreateFile - inside JdbcCursorItemReader
2018-11-11 22:17:55 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step step1 in job createJob
org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to
at org.springframework.batch.item.file.FlatFileItemWriter.write(FlatFileItemWriter.java:255) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at com.stackoverflow.spring.config.CreateFile$ListUnpackingItemWriter.write(CreateFile.java:209) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_181]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133) ~[spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121) ~[spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at com.sun.proxy.$Proxy19.write(Unknown Source) ~[na:na]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:175) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:151) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:274) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:199) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:134) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_181]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_181]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) [spring-aop-4.0.5.RELEASE.jar:4.0.5.RELEASE]
at com.sun.proxy.$Proxy26.run(Unknown Source) [na:na]
at com.stackoverflow.spring.launchers.Launcher.Create(Launcher.java:701) [classes/:na]
at com.stackoverflow.spring.launchers.Launcher.main(Launcher.java:73) [classes/:na]