I have the following FlatFileItemWriter defined in a multi-threaded step.
public FlatFileItemWriter<School> writer() throws Exception {
FlatFileItemWriter<School> flatFileWriter = new FlatFileItemWriter<School>();
flatFileWriter.setResource(new FileSystemResource("C:\\u01\\SchoolDetails.txt"));
flatFileWriter.setName("School-File-Writer");
flatFileWriter.setAppendAllowed(true);
flatFileWriter.setLineSeparator("\n");
flatFileWriter.setHeaderCallback(writer -> writer.write(columnHeaders()));
flatFileWriter.setLineAggregator(new DelimitedLineAggregator<School>() {
{
setDelimiter("^");
setFieldExtractor((FieldExtractor<School>) schoolFieldExtractor());
}
});
return flatFileWriter;
}
private BeanWrapperFieldExtractor<School> schoolFieldExtractor() {
return new BeanWrapperFieldExtractor<School>() {
{
String[] columnValuesMapper = new String[] {
"schoolName", "schoolAddress"
};
setNames(columnValuesMapper);
}
};
}
The ItemWriter generates the files on most days. But once a while it throws the following error:
2022-02-14 22:07:46.652 [SimpleAsyncTaskExecutor-25] INFO SpringBatchConfiguration:703 - Item Reader
2022-02-14 22:07:46.653 [SimpleAsyncTaskExecutor-25] INFO PagingItemReader:80 - reading records 1 to 10
2022-02-14 22:07:46.657 [SimpleAsyncTaskExecutor-28] INFO PagingItemReader:80 - reading records 11 to 20
2022-02-14 22:07:46.661 [SimpleAsyncTaskExecutor-27] INFO PagingItemReader:80 - reading records 21 to 30
2022-02-14 22:07:46.665 [SimpleAsyncTaskExecutor-26] INFO PagingItemReader:80 - reading records 31 to 40
2022-02-14 22:07:46.998 [SimpleAsyncTaskExecutor-25] INFO o.s.batch.core.step.AbstractStep:272 - Step: [childStep:partition1] executed in 350ms
2022-02-14 22:07:47.005 [SimpleAsyncTaskExecutor-28] INFO o.s.batch.core.step.AbstractStep:272 - Step: [childStep:partition3] executed in 357ms
2022-02-14 22:07:47.033 [SimpleAsyncTaskExecutor-27] ERROR o.s.batch.core.step.AbstractStep:237 - Encountered an error executing step childStep in School-Job-Process
org.springframework.batch.item.ItemStreamException: Output file was not created: [/u01/TotalRecordsFound-20220214.txt]
at org.springframework.batch.item.util.FileUtils.setUpOutputFile(FileUtils.java:76)
at org.springframework.batch.item.support.AbstractFileItemWriter$OutputState.initializeBufferedWriter(AbstractFileItemWriter.java:553)
at org.springframework.batch.item.support.AbstractFileItemWriter$OutputState.access$000(AbstractFileItemWriter.java:385)
at org.springframework.batch.item.support.AbstractFileItemWriter.doOpen(AbstractFileItemWriter.java:319)
at org.springframework.batch.item.support.AbstractFileItemWriter.open(AbstractFileItemWriter.java:309)
at org.springframework.batch.item.support.AbstractFileItemWriter$$FastClassBySpringCGLIB$$f2d35c3.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.file.FlatFileItemWriter$$EnhancerBySpringCGLIB$$294bdfee.open(<generated>)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:138)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:135)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
The error occurs intermittenly. The error occurs when two or more threads collides to create and write the data to the file. I can avoid it by delegating my FlatFileItemWriter to a SynchronizedItemStreamWriter. But the Spring docs suggest otherwise. The docs suggest that using a FlatFileItemWriter in a multi-threaded step does NOT require synchronizing writes.
So, I am not sure on how I can avoid these errors and also according to the logs the first two partitions successfully completed running which means the file is created successfully and data is written to it (if exists). So, how is the third partition telling that the file is not created when its already created by the first two paritions.
Any help would be appreciated. Thanks in advance.